You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/04 01:01:05 UTC
[3/4] incubator-systemml git commit: Cleanup lop piggybacking (unused code, visibility, logging, iterators)
Cleanup lop piggybacking (unused code, visibility, logging, iterators)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3ea3cdbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3ea3cdbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3ea3cdbd
Branch: refs/heads/master
Commit: 3ea3cdbd4cd83b0dc144f232a69b51cc25bc82a9
Parents: ba08b4c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 16:54:33 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 16:54:33 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/lops/compile/Dag.java | 872 ++++---------------
1 file changed, 149 insertions(+), 723 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ea3cdbd/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index 3b4cfdb..f5693ef 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -92,7 +91,6 @@ import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat;
*/
public class Dag<N extends Lop>
{
-
private static final Log LOG = LogFactory.getLog(Dag.class.getName());
private static final int CHILD_BREAKS_ALIGNMENT = 2;
@@ -106,7 +104,7 @@ public class Dag<N extends Lop>
private int total_reducers = -1;
private String scratch = "";
- private String scratchFilePath = "";
+ private String scratchFilePath = null;
private double gmrMapperFootprint = 0;
@@ -185,7 +183,7 @@ public class Dag<N extends Lop>
}
private String getFilePath() {
- if ( scratchFilePath.equalsIgnoreCase("") ) {
+ if ( scratchFilePath == null ) {
scratchFilePath = scratch + Lop.FILE_SEPARATOR
+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
@@ -209,6 +207,20 @@ public class Dag<N extends Lop>
}
/**
+ * Method to add a node to the DAG.
+ *
+ * @param node
+ * @return true if node was not already present, false if not.
+ */
+
+ public boolean addNode(N node) {
+ if (nodes.contains(node))
+ return false;
+ nodes.add(node);
+ return true;
+ }
+
+ /**
*
* @param config
* @return
@@ -262,83 +274,6 @@ public class Dag<N extends Lop>
}
- /**
- * Method to remove transient reads that do not have a transient write
- *
- * @param nodeV
- * @param inst
- * @throws DMLUnsupportedOperationException
- * @throws DMLRuntimeException
- */
- @SuppressWarnings("unused")
- private void deleteUnwantedTransientReadVariables(ArrayList<N> nodeV,
- ArrayList<Instruction> inst) throws DMLRuntimeException,
- DMLUnsupportedOperationException {
- HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
-
- LOG.trace("In delete unwanted variables");
-
- // first capture all transient read variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
-
- if (node.getExecLocation() == ExecLocation.Data
- && ((Data) node).isTransient()
- && ((Data) node).getOperationType() == OperationTypes.READ) {
- labelNodeMapping.put(node.getOutputParameters().getLabel(),
- node);
- }
- }
-
- // generate delete instructions for all transient read variables without
- // a transient write
- // first capture all transient write variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
-
- if (node.getExecLocation() == ExecLocation.Data
- && ((Data) node).isTransient()
- && ((Data) node).getOperationType() == OperationTypes.WRITE ) {
- if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data) {
- // this transient write is NOT a result of actual computation, but is simple copy.
- // in such a case, preserve the input variable so that appropriate copy instruction (cpvar or GMR) is generated
- // therefore, remove the input label from labelNodeMapping
- labelNodeMapping.remove(node.getInputs().get(0).getOutputParameters().getLabel());
- }
- else {
- if(labelNodeMapping.containsKey(node.getOutputParameters().getLabel()))
- // corresponding transient read exists (i.e., the variable is updated)
- // in such a case, generate rmvar instruction so that OLD data gets deleted
- labelNodeMapping.remove(node.getOutputParameters().getLabel());
- }
- }
- }
-
- // generate RM instructions
- Instruction rm_inst = null;
- for( Entry<String, N> e : labelNodeMapping.entrySet() ) {
- String label = e.getKey();
- N node = e.getValue();
-
- if (((Data) node).getDataType() == DataType.SCALAR) {
- // if(DEBUG)
- // System.out.println("rmvar" + Lops.OPERAND_DELIMITOR + label);
- // inst.add(new VariableSimpleInstructions("rmvar" +
- // Lops.OPERAND_DELIMITOR + label));
-
- } else {
- rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
- rm_inst.setLocation(node);
-
- if( LOG.isTraceEnabled() )
- LOG.trace(rm_inst.toString());
- inst.add(rm_inst);
-
- }
- }
-
- }
-
private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
ArrayList<Instruction> inst) throws DMLRuntimeException,
DMLUnsupportedOperationException {
@@ -348,20 +283,6 @@ public class Dag<N extends Lop>
LOG.trace("In delete updated variables");
- /*
- Set<String> in = sb.liveIn().getVariables().keySet();
- Set<String> out = sb.liveOut().getVariables().keySet();
- Set<String> updated = sb.variablesUpdated().getVariables().keySet();
-
- Set<String> intersection = in;
- intersection.retainAll(out);
- intersection.retainAll(updated);
-
- for (String var : intersection) {
- inst.add(VariableCPInstruction.prepareRemoveInstruction(var));
- }
- */
-
// CANDIDATE list of variables which could have been updated in this statement block
HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
@@ -371,8 +292,7 @@ public class Dag<N extends Lop>
HashMap<String, N> updatedLabelsLineNum = new HashMap<String, N>();
// first capture all transient read variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
+ for ( N node : nodeV ) {
if (node.getExecLocation() == ExecLocation.Data
&& ((Data) node).isTransient()
@@ -399,8 +319,7 @@ public class Dag<N extends Lop>
}
// capture updated transient write variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
+ for ( N node : nodeV ) {
if (node.getExecLocation() == ExecLocation.Data
&& ((Data) node).isTransient()
@@ -422,8 +341,8 @@ public class Dag<N extends Lop>
rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
rm_inst.setLocation(updatedLabelsLineNum.get(label));
-
- LOG.trace(rm_inst.toString());
+ if( LOG.isTraceEnabled() )
+ LOG.trace(rm_inst.toString());
inst.add(rm_inst);
}
@@ -479,23 +398,6 @@ public class Dag<N extends Lop>
}*/
}
- /**
- * Method to add a node to the DAG.
- *
- * @param node
- * @return true if node was not already present, false if not.
- */
-
- public boolean addNode(N node) {
- if (nodes.contains(node))
- return false;
- else {
- nodes.add(node);
- return true;
- }
-
- }
-
private ArrayList<ArrayList<N>> createNodeVectors(int size) {
ArrayList<ArrayList<N>> arr = new ArrayList<ArrayList<N>>();
@@ -508,8 +410,8 @@ public class Dag<N extends Lop>
}
private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
- for (int i = 0; i < arr.size(); i++) {
- arr.get(i).clear();
+ for (ArrayList<N> tmp : arr) {
+ tmp.clear();
}
}
@@ -519,9 +421,10 @@ public class Dag<N extends Lop>
int base = jt.getBase();
for (int i = from; i < to; i++) {
- if ((nodes.get(i).getCompatibleJobs() & base) == 0) {
+ N node = nodes.get(i);
+ if ((node.getCompatibleJobs() & base) == 0) {
if( LOG.isTraceEnabled() )
- LOG.trace("Not compatible "+ nodes.get(i).toString());
+ LOG.trace("Not compatible "+ node.toString());
return false;
}
}
@@ -665,7 +568,6 @@ public class Dag<N extends Lop>
* @param jobNodes
* @throws LopsException
*/
-
private void handleSingleOutputJobs(ArrayList<N> execNodes,
ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
throws LopsException {
@@ -685,9 +587,7 @@ public class Dag<N extends Lop>
if (!jobNodes.get(jindex).isEmpty()) {
ArrayList<N> vec = jobNodes.get(jindex);
- // first find all nodes with more than one parent that is not
- // finished.
-
+ // first find all nodes with more than one parent that is not finished.
for (int i = 0; i < vec.size(); i++) {
N node = vec.get(i);
if (node.getExecLocation() == ExecLocation.MapOrReduce
@@ -705,24 +605,10 @@ public class Dag<N extends Lop>
}
}
}
- /*
- // Following condition will not work for MMRJ because execNodes may contain non-MapAndReduce
- // lops that are compatible with MMRJ (e.g., Data-WRITE)
- else if (!(node.getExecLocation() == ExecLocation.MapAndReduce
- && node.getType() == lopTypes[jobi])) {
- throw new LopsException(
- "Error in handleSingleOutputJobs(): encountered incompatible execLocation='"
- + node.getExecLocation() + "' for lop (ID="
- + node.getID() + ").");
- }
- */
}
- // need to redo all nodes in nodesWithOutput as well as their
- // children
-
- for (int i = 0; i < vec.size(); i++) {
- N node = vec.get(i);
+ // need to redo all nodes in nodesWithOutput as well as their children
+ for ( N node : vec ) {
if (node.getExecLocation() == ExecLocation.MapOrReduce
|| node.getExecLocation() == ExecLocation.Map) {
if (nodesWithUnfinishedOutputs.contains(node))
@@ -798,14 +684,6 @@ public class Dag<N extends Lop>
}
}
- /*private String getOutputFormat(Data node) {
- if(node.getFileFormatType() == FileFormatTypes.TEXT)
- return "textcell";
- else if ( node.getOutputParameters().isBlocked_representation() )
- return "binaryblock";
- else
- return "binarycell";
- }*/
/**
* Determine whether to send <code>node</code> to MR or to process it in the control program.
@@ -922,7 +800,6 @@ public class Dag<N extends Lop>
* @throws DMLUnsupportedOperationException
* @throws DMLRuntimeException
*/
-
@SuppressWarnings("unchecked")
private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
throws LopsException, IOException, DMLRuntimeException,
@@ -940,7 +817,6 @@ public class Dag<N extends Lop>
ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
-
// list of instructions
ArrayList<Instruction> inst = new ArrayList<Instruction>();
@@ -949,9 +825,6 @@ public class Dag<N extends Lop>
ArrayList<Instruction> deleteInst = new ArrayList<Instruction>();
ArrayList<Instruction> endOfBlockInst = new ArrayList<Instruction>();
- // delete transient variables that are no longer needed
- //deleteUnwantedTransientReadVariables(node_v, deleteInst);
-
// remove files for transient reads that are updated.
deleteUpdatedTransientReadVariables(sb, node_v, writeInst);
@@ -1004,9 +877,6 @@ public class Dag<N extends Lop>
if( LOG.isTraceEnabled() )
LOG.trace(indent + "Queueing node "
+ node.toString() + " (code 2)");
- //for(N q: queuedNodes) {
- // LOG.trace(indent + " " + q.getType() + "," + q.getID());
- //}
queuedNodes.add(node);
// if node has more than two inputs,
@@ -1043,22 +913,6 @@ public class Dag<N extends Lop>
}
}
- /*if (node.getInputs().size() == 2) {
- int j1 = jobType(node.getInputs().get(0), jobNodes);
- int j2 = jobType(node.getInputs().get(1), jobNodes);
- if (j1 != -1 && j2 != -1 && j1 != j2) {
- LOG.trace(indent + "Queueing node "
- + node.toString() + " (code 3)");
-
- queuedNodes.add(node);
-
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
-
- continue;
- }
- }*/
-
// See if this lop can be eliminated
// This check is for "aligner" lops (e.g., group)
boolean eliminate = false;
@@ -1267,13 +1121,6 @@ public class Dag<N extends Lop>
// reduce node, make sure no parent needs reduce, else queue
if (node.getExecLocation() == ExecLocation.MapAndReduce) {
-
- // boolean eliminate = false;
- // eliminate = canEliminateLop(node, execNodes);
- // if (eliminate || (!hasChildNode(node, execNodes,
- // ExecLocation.MapAndReduce)) &&
- // !hasMRJobChildNode(node,execNodes)) {
-
// TODO: statiko -- keep the middle condition
// discuss about having a lop that is MapAndReduce but does
// not define a job
@@ -1282,14 +1129,7 @@ public class Dag<N extends Lop>
execNodes.add(node);
finishedNodes.add(node);
addNodeByJobType(node, jobNodes, execNodes, eliminate);
-
- // } else {
- // if (DEBUG)
- // System.out.println("Queueing -" + node.toString());
- // queuedNodes.add(node);
- // removeNodesForNextIteration(node, finishedNodes,
- // execNodes, queuedNodes, jobNodes);
- // }
+
continue;
}
@@ -1319,10 +1159,10 @@ public class Dag<N extends Lop>
// add Scalar to execNodes if it has no child in exec nodes
// that will be executed in a MR job.
if (node.getExecLocation() == ExecLocation.ControlProgram) {
- for (int j = 0; j < node.getInputs().size(); j++) {
- if (execNodes.contains(node.getInputs().get(j))
- && !(node.getInputs().get(j).getExecLocation() == ExecLocation.Data)
- && !(node.getInputs().get(j).getExecLocation() == ExecLocation.ControlProgram)) {
+ for ( Lop lop : node.getInputs() ) {
+ if (execNodes.contains(lop)
+ && !(lop.getExecLocation() == ExecLocation.Data)
+ && !(lop.getExecLocation() == ExecLocation.ControlProgram)) {
if( LOG.isTraceEnabled() )
LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
@@ -1396,11 +1236,8 @@ public class Dag<N extends Lop>
// next generate MR instructions
if (!execNodes.isEmpty())
generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes);
-
handleSingleOutputJobs(execNodes, jobNodes, finishedNodes);
-
}
-
}
// add write and delete inst at the very end.
@@ -1415,9 +1252,7 @@ public class Dag<N extends Lop>
}
private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
- for(int i=0; i < execNodes.size(); i++)
- {
- N tmpNode = execNodes.get(i);
+ for( N tmpNode : execNodes ) {
// for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID
// we should not consider such lops in this check
if (isChild(tmpNode, node, IDMap)
@@ -1466,11 +1301,6 @@ public class Dag<N extends Lop>
else {
processConsumers((N)in, inst, delteInst, null);
}
-
- /*if ( in.removeConsumer() == 0 ) {
- String label = in.getOutputParameters().getLabel();
- inst.add(VariableCPInstruction.prepareRemoveInstruction(label));
- }*/
}
}
@@ -1506,7 +1336,6 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
-
@SuppressWarnings("unchecked")
private void generateControlProgramJobs(ArrayList<N> execNodes,
ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
@@ -1568,8 +1397,6 @@ public class Dag<N extends Lop>
else {
var_deletions.add(node.getOutputParameters().getLabel());
var_deletionsLineNum.put(node.getOutputParameters().getLabel(), node);
-
- //System.out.println(" --> skipping " + out.getLastInstructions() + " while processing node " + node.getID());
}
}
@@ -1768,10 +1595,9 @@ public class Dag<N extends Lop>
}
// delete all marked nodes
- for (int i = 0; i < markedNodes.size(); i++) {
- execNodes.remove(markedNodes.get(i));
+ for ( N node : markedNodes ) {
+ execNodes.remove(node);
}
-
}
/**
@@ -1784,7 +1610,6 @@ public class Dag<N extends Lop>
* @param queuedNodes
* @throws LopsException
*/
-
private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
ArrayList<N> execNodes, ArrayList<N> queuedNodes,
ArrayList<ArrayList<N>> jobvec) throws LopsException {
@@ -1794,10 +1619,9 @@ public class Dag<N extends Lop>
return;
//if all children are queued, then there is nothing to do.
- int numInputs = node.getInputs().size();
boolean allQueued = true;
- for(int i=0; i < numInputs; i++) {
- if( !queuedNodes.contains(node.getInputs().get(i)) ) {
+ for( Lop input : node.getInputs() ) {
+ if( !queuedNodes.contains(input) ) {
allQueued = false;
break;
}
@@ -1811,8 +1635,8 @@ public class Dag<N extends Lop>
// Determine if <code>node</code> has inputs from the same job or multiple jobs
int jobid = Integer.MIN_VALUE;
boolean inputs_in_same_job = true;
- for(int idx=0; idx < node.getInputs().size(); idx++) {
- int input_jobid = jobType(node.getInputs().get(idx), jobvec);
+ for( Lop input : node.getInputs() ) {
+ int input_jobid = jobType(input, jobvec);
if ( jobid == Integer.MIN_VALUE )
jobid = input_jobid;
else if ( jobid != input_jobid ) {
@@ -1824,8 +1648,7 @@ public class Dag<N extends Lop>
// Determine if there exist any unassigned inputs to <code>node</code>
// Evaluate only those lops that execute in MR.
boolean unassigned_inputs = false;
- for(int i=0; i < numInputs; i++) {
- Lop input = node.getInputs().get(i);
+ for( Lop input : node.getInputs() ) {
//if ( input.getExecLocation() != ExecLocation.ControlProgram && jobType(input, jobvec) == -1 ) {
if ( input.getExecType() == ExecType.MR && !execNodes.contains(input)) { //jobType(input, jobvec) == -1 ) {
unassigned_inputs = true;
@@ -1835,8 +1658,8 @@ public class Dag<N extends Lop>
// Determine if any node's children are queued
boolean child_queued = false;
- for(int i=0; i < numInputs; i++) {
- if (queuedNodes.contains(node.getInputs().get(i)) ) {
+ for( Lop input : node.getInputs() ) {
+ if (queuedNodes.contains(input) ) {
child_queued = true;
break;
}
@@ -1940,14 +1763,16 @@ public class Dag<N extends Lop>
} // for i
// we also need to delete all parent nodes of marked nodes
- for (int i = 0; i < execNodes.size(); i++) {
- LOG.trace(" Checking for removal - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
-
- if (hasChildNode(execNodes.get(i), markedNodes) && !markedNodes.contains(execNodes.get(i))) {
- markedNodes.add(execNodes.get(i));
- LOG.trace(" Removing for next iteration (code 6) (" + execNodes.get(i).getID() + ") " + execNodes.get(i).toString());
+ for ( N enode : execNodes ) {
+ if( LOG.isTraceEnabled() ) {
+ LOG.trace(" Checking for removal - ("
+ + enode.getID() + ") " + enode.toString());
+ }
+
+ if (hasChildNode(enode, markedNodes) && !markedNodes.contains(enode)) {
+ markedNodes.add(enode);
+ if( LOG.isTraceEnabled() )
+ LOG.trace(" Removing for next iteration (code 6) (" + enode.getID() + ") " + enode.toString());
}
}
@@ -1963,193 +1788,6 @@ public class Dag<N extends Lop>
queuedNodes.add(n);
}
}
- /*for (int i = 0; i < markedNodes.size(); i++) {
- finishedNodes.remove(markedNodes.elementAt(i));
- execNodes.remove(markedNodes.elementAt(i));
- removeNodeByJobType(markedNodes.elementAt(i), jobvec);
- queuedNodes.add(markedNodes.elementAt(i));
- }*/
- }
-
- @SuppressWarnings("unused")
- private void removeNodesForNextIterationOLD(N node, ArrayList<N> finishedNodes,
- ArrayList<N> execNodes, ArrayList<N> queuedNodes,
- ArrayList<ArrayList<N>> jobvec) throws LopsException {
- // only queued nodes with two inputs need to be handled.
-
- // TODO: statiko -- this should be made == 1
- if (node.getInputs().size() != 2)
- return;
-
-
- //if both children are queued, nothing to do.
- if (queuedNodes.contains(node.getInputs().get(0))
- && queuedNodes.contains(node.getInputs().get(1)))
- return;
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
-
-
-
- boolean inputs_in_same_job = false;
- // TODO: handle tertiary
- if (jobType(node.getInputs().get(0), jobvec) != -1
- && jobType(node.getInputs().get(0), jobvec) == jobType(node
- .getInputs().get(1), jobvec)) {
- inputs_in_same_job = true;
- }
-
- boolean unassigned_inputs = false;
- // here.. scalars shd be ignored
- if ((node.getInputs().get(0).getExecLocation() != ExecLocation.ControlProgram && jobType(
- node.getInputs().get(0), jobvec) == -1)
- || (node.getInputs().get(1).getExecLocation() != ExecLocation.ControlProgram && jobType(
- node.getInputs().get(1), jobvec) == -1)) {
- unassigned_inputs = true;
- }
-
- boolean child_queued = false;
-
- // check if atleast one child was queued.
- if (queuedNodes.contains(node.getInputs().get(0))
- || queuedNodes.contains(node.getInputs().get(1)))
- child_queued = true;
-
- // nodes to be dropped
- ArrayList<N> markedNodes = new ArrayList<N>();
-
- for (int i = 0; i < execNodes.size(); i++) {
-
- N tmpNode = execNodes.get(i);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Checking for removal (" + tmpNode.getID()
- + ") " + tmpNode.toString());
- LOG.trace("Inputs in same job " + inputs_in_same_job);
- LOG.trace("Unassigned inputs " + unassigned_inputs);
- LOG.trace("Child queued " + child_queued);
-
- }
-
- //if(node.definesMRJob() && isChild(tmpNode,node) && (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
- // continue;
-
- // TODO: statiko -- check if this is too conservative?
- if (child_queued) {
- // if one of the children are queued,
- // remove some child nodes on other leg that may be needed later on.
- // For e.g. Group lop.
-
- if((tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) &&
- tmpNode.isAligner())
- {
- markedNodes.add(tmpNode);
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
- }
- else {
- if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node)
- && isChild(tmpNode, node, IDMap) && branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-
-
- if(
- //e.g. MMCJ
- (node.getExecLocation() == ExecLocation.MapAndReduce &&
- branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes) && !tmpNode.definesMRJob() )
- ||
- //e.g. Binary
- (node.getExecLocation() == ExecLocation.Reduce && branchCanBePiggyBackedReduce(tmpNode, node, execNodes, finishedNodes)) )
- {
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
-
-
- markedNodes.add(tmpNode);
- }
- }
- }
- } else {
- /*
- * "node" has no other queued children.
- *
- * If inputs are in the same job and "node" is of type
- * MapAndReduce, then remove nodes of all types other than
- * Reduce, MapAndReduce, and the ones that define a MR job as
- * they can be piggybacked later.
- *
- * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
- * come from Rand job, and they should not be removed.
- *
- * Other examples: -- MMCJ whose children are of type
- * MapAndReduce (say GMR) -- Inputs coming from two different
- * jobs .. GMR & REBLOCK
- */
- if ((inputs_in_same_job || unassigned_inputs)
- && node.getExecLocation() == ExecLocation.MapAndReduce
- && !hasOtherMapAndReduceParentNode(tmpNode, execNodes,
- node)
- && isChild(tmpNode, node, IDMap) &&
- branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes)
- && !tmpNode.definesMRJob()) {
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration:: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
-
- markedNodes.add(tmpNode);
- }
-
- // as this node has inputs coming from different jobs, need to
- // free up everything
- // below and include the closest MapAndReduce lop if this is of
- // type Reduce.
- // if it is of type MapAndReduce, don't need to free any nodes
-
- if (!inputs_in_same_job && !unassigned_inputs
- && isChild(tmpNode, node, IDMap) &&
- (tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) &&
- tmpNode.isAligner())
- {
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration ("
- + tmpNode.getID()
- + ") "
- + tmpNode.toString());
-
- markedNodes.add(tmpNode);
-
- }
-
- }
- }
-
- // we also need to delete all parent nodes of marked nodes
- for (int i = 0; i < execNodes.size(); i++) {
- if( LOG.isTraceEnabled() )
- LOG.trace("Checking for removal - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
-
- if (hasChildNode(execNodes.get(i), markedNodes)) {
- markedNodes.add(execNodes.get(i));
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
- }
- }
-
- // delete marked nodes from finishedNodes and execNodes
- // add to queued nodes
- for (int i = 0; i < markedNodes.size(); i++) {
- finishedNodes.remove(markedNodes.get(i));
- execNodes.remove(markedNodes.get(i));
- removeNodeByJobType(markedNodes.get(i), jobvec);
- queuedNodes.add(markedNodes.get(i));
- }
}
private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
@@ -2162,9 +1800,7 @@ public class Dag<N extends Lop>
return false;
}
- for(int i=0; i < execNodes.size(); i++) {
- N n = execNodes.get(i);
-
+ for( N n : execNodes ) {
if(n.equals(node))
continue;
@@ -2177,10 +1813,6 @@ public class Dag<N extends Lop>
&& n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
return false;
}
- /*if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap)
- && !node.getInputs().contains(tmpNode)
- && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
- return false;*/
}
return true;
}
@@ -2258,9 +1890,7 @@ public class Dag<N extends Lop>
return false;
JobType jt = JobType.findJobTypeFromLop(node);
- for (int i = 0; i < execNodes.size(); i++) {
- N n = execNodes.get(i);
-
+ for ( N n : execNodes ) {
if (n.equals(node))
continue;
@@ -2274,16 +1904,6 @@ public class Dag<N extends Lop>
else if (!isCompatible(n, jt))
return false;
}
-
- /*
- * if(n.equals(tmpNode) && n.getExecLocation() != ExecLocation.Map
- * && n.getExecLocation() != ExecLocation.MapOrReduce) return false;
- *
- * if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap) &&
- * n.getExecLocation() != ExecLocation.Map && n.getExecLocation() !=
- * ExecLocation.MapOrReduce) return false;
- */
-
}
return true;
}
@@ -2305,10 +1925,7 @@ public class Dag<N extends Lop>
}
//check to see if any node between node and tmpNode has more than one unfinished output
- for(int i=0; i < execNodes.size(); i++)
- {
- N n = execNodes.get(i);
-
+ for( N n : execNodes ) {
if(n.equals(node) || n.equals(tmpNode))
continue;
@@ -2328,36 +1945,6 @@ public class Dag<N extends Lop>
return true;
}
- /**
- * Method to check of there is a node of type MapAndReduce between a child
- * (tmpNode) and its parent (node)
- *
- * @param tmpNode
- * @param execNodes
- * @param node
- * @return
- */
-
- @SuppressWarnings({ "unchecked", "unused" })
- private boolean hasMapAndReduceParentNode(N tmpNode, ArrayList<N> execNodes, N node)
- {
- for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
- N n = (N) tmpNode.getOutputs().get(i);
-
- if (execNodes.contains(n)
- && n.getExecLocation() == ExecLocation.MapAndReduce
- && isChild(n, node, IDMap)) {
- return true;
- } else {
- if (hasMapAndReduceParentNode(n, execNodes, node))
- return true;
- }
-
- }
-
- return false;
- }
-
/**
* Method to return the job index for a lop.
*
@@ -2366,7 +1953,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
for ( JobType jt : JobType.values()) {
int i = jt.getId();
@@ -2386,7 +1972,6 @@ public class Dag<N extends Lop>
* @param node
* @return
*/
-
@SuppressWarnings("unchecked")
private boolean hasOtherMapAndReduceParentNode(N tmpNode,
ArrayList<N> nodeList, N node) {
@@ -2425,10 +2010,9 @@ public class Dag<N extends Lop>
long nodeid = IDMap.get(node.getID());
long tmpid = IDMap.get(tmpNode.getID());
- for ( int i=0; i < queuedNodes.size(); i++ ) {
- int id = IDMap.get(queuedNodes.get(i).getID());
+ for ( N qnode : queuedNodes ) {
+ int id = IDMap.get(qnode.getID());
if ((id != nodeid && nodeMarked[id]) && (id != tmpid && tmpMarked[id]) )
- //if (nodeMarked[id] && tmpMarked[id])
return true;
}
@@ -2441,8 +2025,9 @@ public class Dag<N extends Lop>
* @param jobNodes
* @throws DMLRuntimeException
*/
- void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
- throws DMLRuntimeException {
+ private void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
+ throws DMLRuntimeException
+ {
if (LOG.isTraceEnabled()){
for ( JobType jt : JobType.values() ) {
int i = jt.getId();
@@ -2458,8 +2043,6 @@ public class Dag<N extends Lop>
}
}
-
-
}
/**
@@ -2469,7 +2052,7 @@ public class Dag<N extends Lop>
* @param loc
* @return
*/
- boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
+ private boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
for (int i = 0; i < nodes.size(); i++) {
if (nodes.get(i).getExecLocation() == ExecLocation.RecordReader)
return true;
@@ -2477,8 +2060,8 @@ public class Dag<N extends Lop>
return false;
}
- ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) {
-
+ private ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes)
+ {
// obtain the list of record reader nodes
ArrayList<N> rrnodes = new ArrayList<N>();
for (int i = 0; i < gmrnodes.size(); i++) {
@@ -2542,8 +2125,7 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
-
- public void generateMRJobs(ArrayList<N> execNodes,
+ private void generateMRJobs(ArrayList<N> execNodes,
ArrayList<Instruction> inst,
ArrayList<Instruction> writeinst,
ArrayList<Instruction> deleteinst, ArrayList<ArrayList<N>> jobNodes)
@@ -2551,17 +2133,6 @@ public class Dag<N extends Lop>
DMLRuntimeException
{
-
- /*// copy unassigned lops in execnodes to gmrnodes
- for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.elementAt(i);
- if (jobType(node, jobNodes) == -1) {
- jobNodes.get(JobType.GMR.getId()).add(node);
- addChildren(node, jobNodes.get(JobType.GMR.getId()),
- execNodes);
- }
- }*/
-
printJobNodes(jobNodes);
ArrayList<Instruction> rmvarinst = new ArrayList<Instruction>();
@@ -2632,61 +2203,22 @@ public class Dag<N extends Lop>
}
/**
- * Method to get the input format for a lop
- *
- * @param elementAt
- * @return
- * @throws LopsException
- */
-
- // This code is replicated in ReBlock.java
- @SuppressWarnings("unused")
- private Format getChildFormat(N node)
- throws LopsException
- {
- if (node.getOutputParameters().getFile_name() != null
- || node.getOutputParameters().getLabel() != null) {
- return node.getOutputParameters().getFormat();
- } else {
- if (node.getInputs().size() > 1)
- throw new LopsException(node.printErrorLocation() + "Should only have one child! \n");
- /*
- * Return the format of the child node (i.e., input lop) No need of
- * recursion here.. because 1) Reblock lop's input can either be
- * DataLop or some intermediate computation If it is Data then we
- * just take its format (TEXT or BINARY) If it is intermediate lop
- * then it is always BINARY since we assume that all intermediate
- * computations will be in Binary format 2) Note that Reblock job
- * will never have any instructions in the mapper => the input lop
- * (if it is other than Data) is always executed in a different job
- */
- return node.getInputs().get(0).getOutputParameters().getFormat();
- // return getChildFormat((N) node.getInputs().get(0));
- }
-
- }
-
- /**
* Method to add all parents of "node" in exec_n to node_v.
*
* @param node
* @param node_v
* @param exec_n
*/
-
private void addParents(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
- for (int i = 0; i < exec_n.size(); i++) {
- if (isChild(node, exec_n.get(i), IDMap)) {
- if (!node_v.contains(exec_n.get(i))) {
+ for (N enode : exec_n ) {
+ if (isChild(node, enode, IDMap)) {
+ if (!node_v.contains(enode)) {
if( LOG.isTraceEnabled() )
- LOG.trace("Adding parent - "
- + exec_n.get(i).toString());
- node_v.add(exec_n.get(i));
+ LOG.trace("Adding parent - " + enode.toString());
+ node_v.add(enode);
}
}
-
}
-
}
/**
@@ -2696,11 +2228,10 @@ public class Dag<N extends Lop>
* @param node_v
* @param exec_n
*/
-
@SuppressWarnings("unchecked")
private void addChildren(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
- /** add child in exec nodes that is not of type scalar **/
+ // add child in exec nodes that is not of type scalar
if (exec_n.contains(node)
&& node.getExecLocation() != ExecLocation.ControlProgram) {
if (!node_v.contains(node)) {
@@ -2714,9 +2245,7 @@ public class Dag<N extends Lop>
if (!exec_n.contains(node))
return;
- /**
- * recurse
- */
+ // recurse
for (int i = 0; i < node.getInputs().size(); i++) {
N n = (N) node.getInputs().get(i);
addChildren(n, node_v, exec_n);
@@ -2730,7 +2259,7 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
- OutputInfo getOutputInfo(N node, boolean cellModeOverride)
+ private OutputInfo getOutputInfo(N node, boolean cellModeOverride)
throws LopsException
{
if ( (node.getDataType() == DataType.SCALAR && node.getExecType() == ExecType.CP)
@@ -2797,22 +2326,8 @@ public class Dag<N extends Lop>
return oinfo;
}
-
- /**
- * Method that determines the output format for a given node,
- * and returns a string representation of OutputInfo. This
- * method is primarily used while generating instructions that
- * execute in the control program.
- *
- * @param node
- * @return
- */
-/* String getOutputFormat(N node) {
- return OutputInfo.outputInfoToString(getOutputInfo(node, false));
- }
-*/
- public String prepareAssignVarInstruction(Lop input, Lop node) {
+ private String prepareAssignVarInstruction(Lop input, Lop node) {
StringBuilder sb = new StringBuilder();
sb.append(ExecType.CP);
@@ -3295,7 +2810,7 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
*/
@SuppressWarnings("unchecked")
- public void generateMapReduceInstructions(ArrayList<N> execNodes,
+ private void generateMapReduceInstructions(ArrayList<N> execNodes,
ArrayList<Instruction> inst, ArrayList<Instruction> writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst,
JobType jt) throws LopsException,
DMLUnsupportedOperationException, DMLRuntimeException
@@ -3339,19 +2854,18 @@ public class Dag<N extends Lop>
if (jt == JobType.GMR || jt == JobType.GMRCELL) {
ArrayList<N> markedNodes = new ArrayList<N>();
// only keep data nodes that are results of some computation.
- for (int i = 0; i < rootNodes.size(); i++) {
- N node = rootNodes.get(i);
- if (node.getExecLocation() == ExecLocation.Data
- && ((Data) node).isTransient()
- && ((Data) node).getOperationType() == OperationTypes.WRITE
- && ((Data) node).getDataType() == DataType.MATRIX) {
+ for ( N rnode : rootNodes ) {
+ if (rnode.getExecLocation() == ExecLocation.Data
+ && ((Data) rnode).isTransient()
+ && ((Data) rnode).getOperationType() == OperationTypes.WRITE
+ && ((Data) rnode).getDataType() == DataType.MATRIX) {
// no computation, just a copy
- if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data
- && ((Data) node.getInputs().get(0)).isTransient()
- && node.getOutputParameters().getLabel().compareTo(
- node.getInputs().get(0)
- .getOutputParameters().getLabel()) == 0) {
- markedNodes.add(node);
+ if (rnode.getInputs().get(0).getExecLocation() == ExecLocation.Data
+ && ((Data) rnode.getInputs().get(0)).isTransient()
+ && rnode.getOutputParameters().getLabel().compareTo(
+ rnode.getInputs().get(0).getOutputParameters().getLabel()) == 0)
+ {
+ markedNodes.add(rnode);
}
}
}
@@ -3367,10 +2881,9 @@ public class Dag<N extends Lop>
/* Determine all input data files */
- for (int i = 0; i < rootNodes.size(); i++) {
- getInputPathsAndParameters(rootNodes.get(i), execNodes,
- inputs, inputInfos, numRows, numCols, numRowsPerBlock,
- numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
+ for ( N rnode : rootNodes ) {
+ getInputPathsAndParameters(rnode, execNodes, inputs, inputInfos, numRows, numCols,
+ numRowsPerBlock, numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
}
// In case of RAND job, instructions are defined in the input file
@@ -3384,10 +2897,9 @@ public class Dag<N extends Lop>
// currently, recordreader instructions are allowed only in GMR jobs
if (jt == JobType.GMR || jt == JobType.GMRCELL) {
- for (int i = 0; i < rootNodes.size(); i++) {
- getRecordReaderInstructions(rootNodes.get(i), execNodes,
- inputs, recordReaderInstructions, nodeIndexMapping,
- start_index, inputLabels, inputLops, MRJobLineNumbers);
+ for ( N rnode : rootNodes ) {
+ getRecordReaderInstructions(rnode, execNodes, inputs, recordReaderInstructions,
+ nodeIndexMapping, start_index, inputLabels, inputLops, MRJobLineNumbers);
if ( recordReaderInstructions.size() > 1 )
throw new LopsException("MapReduce job can only have a single recordreader instruction: " + recordReaderInstructions.toString());
}
@@ -3544,23 +3056,6 @@ public class Dag<N extends Lop>
processConsumers((N)l, rmvarinst, deleteinst, null);
}
}
-
- }
-
- /**
- * Convert a byte array into string
- *
- * @param arr
- * @return none
- */
- public String getString(byte[] arr) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < arr.length; i++) {
- sb.append(",");
- sb.append(Byte.toString(arr[i]));
- }
-
- return sb.toString();
}
/**
@@ -3571,16 +3066,14 @@ public class Dag<N extends Lop>
*/
private String getCSVString(ArrayList<String> inputStrings) {
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < inputStrings.size(); i++) {
- String tmp = inputStrings.get(i);
- if( tmp != null ) {
+ for ( String str : inputStrings ) {
+ if( str != null ) {
if( sb.length()>0 )
sb.append(Lop.INSTRUCTION_DELIMITOR);
- sb.append( tmp );
+ sb.append( str );
}
}
return sb.toString();
-
}
/**
@@ -3589,7 +3082,6 @@ public class Dag<N extends Lop>
* @param list
* @return
*/
-
private String[] getStringArray(ArrayList<String> list) {
String[] arr = new String[list.size()];
@@ -3615,7 +3107,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
@SuppressWarnings("unchecked")
private int getAggAndOtherInstructions(N node, ArrayList<N> execNodes,
ArrayList<String> shuffleInstructions,
@@ -3624,9 +3115,7 @@ public class Dag<N extends Lop>
HashMap<N, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
{
-
int ret_val = -1;
if (nodeIndexMapping.containsKey(node))
@@ -3813,9 +3302,6 @@ public class Dag<N extends Lop>
start_index[0]++;
if (node.getType() == Type.Ternary ) {
- //Tertiary.OperationTypes op = ((Tertiary<?, ?, ?, ?>) node).getOperationType();
- //if ( op == Tertiary.OperationTypes.CTABLE_TRANSFORM ) {
-
// in case of CTABLE_TRANSFORM_SCALAR_WEIGHT: inputIndices.get(2) would be -1
otherInstructionsReducer.add(node.getInstructions(
inputIndices.get(0), inputIndices.get(1),
@@ -3824,7 +3310,6 @@ public class Dag<N extends Lop>
MRJobLineNumbers.add(node._beginLine);
}
nodeIndexMapping.put(node, output_index);
- //}
}
else if( node.getType() == Type.ParameterizedBuiltin ){
otherInstructionsReducer.add(node.getInstructions(
@@ -3848,7 +3333,6 @@ public class Dag<N extends Lop>
}
return output_index;
-
}
else if (inputIndices.size() == 4) {
int output_index = start_index[0];
@@ -3868,7 +3352,6 @@ public class Dag<N extends Lop>
}
return -1;
-
}
/**
@@ -3885,7 +3368,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
@SuppressWarnings("unchecked")
private int getRecordReaderInstructions(N node, ArrayList<N> execNodes,
ArrayList<String> inputStrings,
@@ -3893,9 +3375,7 @@ public class Dag<N extends Lop>
HashMap<N, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
{
-
// if input source, return index
if (nodeIndexMapping.containsKey(node))
return nodeIndexMapping.get(node);
@@ -3910,7 +3390,6 @@ public class Dag<N extends Lop>
// get mapper instructions
for (int i = 0; i < node.getInputs().size(); i++) {
-
// recurse
N childNode = (N) node.getInputs().get(i);
int ret_val = getRecordReaderInstructions(childNode, execNodes,
@@ -3932,17 +3411,14 @@ public class Dag<N extends Lop>
// cannot reuse index if this is true
// need to add better indexing schemes
- // if (child_for_max_input_index.getOutputs().size() > 1) {
output_index = start_index[0];
start_index[0]++;
- // }
nodeIndexMapping.put(node, output_index);
// populate list of input labels.
// only RANGEPICK lop can contribute to labels
if (node.getType() == Type.PickValues) {
-
PickByCount pbc = (PickByCount) node;
if (pbc.getOperationType() == PickByCount.OperationTypes.RANGEPICK) {
int scalarIndex = 1; // always the second input is a scalar
@@ -3975,11 +3451,9 @@ public class Dag<N extends Lop>
"Unexpected number of inputs while generating a RecordReader Instruction");
return output_index;
-
}
return -1;
-
}
/**
@@ -3996,7 +3470,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
@SuppressWarnings("unchecked")
private int getMapperInstructions(N node, ArrayList<N> execNodes,
ArrayList<String> inputStrings,
@@ -4004,9 +3477,7 @@ public class Dag<N extends Lop>
HashMap<N, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
{
-
// if input source, return index
if (nodeIndexMapping.containsKey(node))
return nodeIndexMapping.get(node);
@@ -4134,11 +3605,9 @@ public class Dag<N extends Lop>
MRJobLineNumbers.add(node._beginLine);
}
return output_index;
-
}
return -1;
-
}
// Method to populate inputs and also populates node index mapping.
@@ -4155,12 +3624,9 @@ public class Dag<N extends Lop>
&& !nodeIndexMapping.containsKey(node)) {
numRows.add(node.getOutputParameters().getNumRows());
numCols.add(node.getOutputParameters().getNumCols());
- numRowsPerBlock.add(node.getOutputParameters()
- .getRowsInBlock());
- numColsPerBlock.add(node.getOutputParameters()
- .getColsInBlock());
- inputStrings.add(node.getInstructions(inputStrings.size(),
- inputStrings.size()));
+ numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+ numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
+ inputStrings.add(node.getInstructions(inputStrings.size(), inputStrings.size()));
if(DMLScript.ENABLE_DEBUG_MODE) {
MRJobLineNumbers.add(node._beginLine);
}
@@ -4170,10 +3636,6 @@ public class Dag<N extends Lop>
return;
}
- // && ( !(node.getExecLocation() == ExecLocation.ControlProgram)
- // || (node.getExecLocation() == ExecLocation.ControlProgram &&
- // node.getDataType() != DataType.SCALAR )
- // )
// get input file names
if (!execNodes.contains(node)
&& !nodeIndexMapping.containsKey(node)
@@ -4192,19 +3654,14 @@ public class Dag<N extends Lop>
inputStrings.add(Lop.VARIABLE_NAME_PLACEHOLDER + node.getOutputParameters().getLabel()
+ Lop.VARIABLE_NAME_PLACEHOLDER);
}
- //if ( node.getType() == Lops.Type.Data && ((Data)node).isTransient())
- // inputStrings.add("##" + node.getOutputParameters().getLabel() + "##");
- //else
- // inputStrings.add(node.getOutputParameters().getLabel());
+
inputLabels.add(node.getOutputParameters().getLabel());
inputLops.add(node);
numRows.add(node.getOutputParameters().getNumRows());
numCols.add(node.getOutputParameters().getNumCols());
- numRowsPerBlock.add(node.getOutputParameters()
- .getRowsInBlock());
- numColsPerBlock.add(node.getOutputParameters()
- .getColsInBlock());
+ numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+ numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
InputInfo nodeInputInfo = null;
// Check if file format type is binary or text and update infos
@@ -4262,15 +3719,12 @@ public class Dag<N extends Lop>
}
// if exec nodes does not contain node at this point, return.
-
if (!execNodes.contains(node))
return;
// process children recursively
-
- for (int i = 0; i < node.getInputs().size(); i++) {
- N childNode = (N) node.getInputs().get(i);
- getInputPathsAndParameters(childNode, execNodes, inputStrings,
+ for ( Lop lop : node.getInputs() ) {
+ getInputPathsAndParameters((N)lop, execNodes, inputStrings,
inputInfos, numRows, numCols, numRowsPerBlock,
numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
}
@@ -4283,39 +3737,32 @@ public class Dag<N extends Lop>
* @param execNodes
* @param rootNodes
*/
-
private void getOutputNodes(ArrayList<N> execNodes, ArrayList<N> rootNodes, JobType jt) {
- for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.get(i);
-
+ for ( N node : execNodes ) {
// terminal node
if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) {
rootNodes.add(node);
- } else {
+ }
+ else {
// check for nodes with at least one child outside execnodes
int cnt = 0;
- for (int j = 0; j < node.getOutputs().size(); j++) {
- if (!execNodes.contains(node.getOutputs().get(j))) {
- cnt++;
- }
+ for (Lop lop : node.getOutputs() ) {
+ cnt += (!execNodes.contains(lop)) ? 1 : 0;
}
- if (cnt > 0 //!= 0
- //&& cnt <= node.getOutputs().size()
- && !rootNodes.contains(node) // not already a rootnode
- && !(node.getExecLocation() == ExecLocation.Data
- && ((Data) node).getOperationType() == OperationTypes.READ && ((Data) node)
- .getDataType() == DataType.MATRIX) // Not a matrix Data READ
- ) {
-
-
+ if (cnt > 0 && !rootNodes.contains(node) // not already a rootnode
+ && !(node.getExecLocation() == ExecLocation.Data
+ && ((Data) node).getOperationType() == OperationTypes.READ
+ && ((Data) node).getDataType() == DataType.MATRIX) ) // Not a matrix Data READ
+ {
if ( jt.allowsSingleShuffleInstruction() && node.getExecLocation() != ExecLocation.MapAndReduce)
continue;
if (cnt < node.getOutputs().size()) {
if(!node.getProducesIntermediateOutput())
rootNodes.add(node);
- } else
+ }
+ else
rootNodes.add(node);
}
}
@@ -4328,9 +3775,7 @@ public class Dag<N extends Lop>
* @param a
* @param b
*/
-
- public static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
- //int aID = IDMap.get(a.getID());
+ private static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
int bID = IDMap.get(b.getID());
return a.get_reachable()[bID];
}
@@ -4381,17 +3826,15 @@ public class Dag<N extends Lop>
// print the nodes in sorted order
if (LOG.isTraceEnabled()) {
- for (int i = 0; i < v.size(); i++) {
- // System.out.print(sortedNodes.get(i).getID() + "("
- // + levelmap.get(sortedNodes.get(i).getID()) + "), ");
+ for ( N vnode : v ) {
StringBuilder sb = new StringBuilder();
- sb.append(v.get(i).getID());
+ sb.append(vnode.getID());
sb.append("(");
- sb.append(v.get(i).getLevel());
+ sb.append(vnode.getLevel());
sb.append(") ");
- sb.append(v.get(i).getType());
+ sb.append(vnode.getType());
sb.append("(");
- for(Lop vin : v.get(i).getInputs()) {
+ for(Lop vin : vnode.getInputs()) {
sb.append(vin.getID());
sb.append(",");
}
@@ -4413,7 +3856,7 @@ public class Dag<N extends Lop>
* @param marked
*/
@SuppressWarnings("unchecked")
- void dagDFS(N root, boolean[] marked) {
+ private void dagDFS(N root, boolean[] marked) {
//contains check currently required for globalopt, will be removed when cleaned up
if( !IDMap.containsKey(root.getID()) )
return;
@@ -4422,33 +3865,32 @@ public class Dag<N extends Lop>
if ( marked[mapID] )
return;
marked[mapID] = true;
- for(int i=0; i < root.getOutputs().size(); i++) {
- //System.out.println("CALLING DFS "+root);
- dagDFS((N)root.getOutputs().get(i), marked);
+ for( Lop lop : root.getOutputs() ) {
+ dagDFS((N)lop, marked);
}
}
private boolean hasDirectChildNode(N node, ArrayList<N> childNodes) {
if ( childNodes.isEmpty() )
return false;
-
- for(int i=0; i < childNodes.size(); i++) {
- if ( childNodes.get(i).getOutputs().contains(node))
+ for( N cnode : childNodes ) {
+ if ( cnode.getOutputs().contains(node))
return true;
}
return false;
}
+ private boolean hasChildNode(N node, ArrayList<N> nodes) {
+ return hasChildNode(node, nodes, ExecLocation.INVALID);
+ }
+
private boolean hasChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
if ( childNodes.isEmpty() )
return false;
-
int index = IDMap.get(node.getID());
- for(int i=0; i < childNodes.size(); i++) {
- N cn = childNodes.get(i);
- if ( (type == ExecLocation.INVALID || cn.getExecLocation() == type) && cn.get_reachable()[index]) {
+ for( N cnode : childNodes ) {
+ if ( (type == ExecLocation.INVALID || cnode.getExecLocation() == type) && cnode.get_reachable()[index])
return true;
- }
}
return false;
}
@@ -4456,13 +3898,10 @@ public class Dag<N extends Lop>
private N getChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
if ( childNodes.isEmpty() )
return null;
-
int index = IDMap.get(node.getID());
- for(int i=0; i < childNodes.size(); i++) {
- N cn = childNodes.get(i);
- if ( cn.getExecLocation() == type && cn.get_reachable()[index]) {
- return cn;
- }
+ for( N cnode : childNodes ) {
+ if ( cnode.getExecLocation() == type && cnode.get_reachable()[index])
+ return cnode;
}
return null;
}
@@ -4479,8 +3918,7 @@ public class Dag<N extends Lop>
private N getParentNode(N node, ArrayList<N> parentNodes, ExecLocation type) {
if ( parentNodes.isEmpty() )
return null;
- for(int i=0; i < parentNodes.size(); i++ ) {
- N pn = parentNodes.get(i);
+ for( N pn : parentNodes ) {
int index = IDMap.get( pn.getID() );
if ( pn.getExecLocation() == type && node.get_reachable()[index])
return pn;
@@ -4495,9 +3933,7 @@ public class Dag<N extends Lop>
return false;
int index = IDMap.get(node.getID());
-
- for (int i = 0; i < nodesVec.size(); i++) {
- N n = nodesVec.get(i);
+ for( N n : nodesVec ) {
if ( n.definesMRJob() && n.get_reachable()[index])
return true;
}
@@ -4510,8 +3946,7 @@ public class Dag<N extends Lop>
int index = IDMap.get(node.getID());
boolean onlyDatagen = true;
- for (int i = 0; i < nodesVec.size(); i++) {
- N n = nodesVec.get(i);
+ for( N n : nodesVec ) {
if ( n.definesMRJob() && n.get_reachable()[index] && JobType.findJobTypeFromLop(n) != JobType.DATAGEN )
onlyDatagen = false;
}
@@ -4520,11 +3955,10 @@ public class Dag<N extends Lop>
}
@SuppressWarnings("unchecked")
- private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type) {
-
- for (int i = 0; i < node.getInputs().size(); i++) {
- N n = (N) node.getInputs().get(i);
-
+ private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type)
+ {
+ for (Lop lop : node.getInputs() ) {
+ N n = (N) lop;
if (!execNodes.contains(n))
continue;
@@ -4533,44 +3967,36 @@ public class Dag<N extends Lop>
return MR_CHILD_FOUND_BREAKS_ALIGNMENT;
else
return MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT;
- } else {
+ }
+ else {
int ret = getChildAlignment(n, execNodes, type);
if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT
- || ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
+ || ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
if (n.getBreaksAlignment())
return CHILD_BREAKS_ALIGNMENT;
else
return CHILD_DOES_NOT_BREAK_ALIGNMENT;
}
-
else if (ret == MRCHILD_NOT_FOUND
|| ret == CHILD_BREAKS_ALIGNMENT
|| ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT)
return ret;
else
- throw new RuntimeException(
- "Something wrong in getChildAlignment().");
+ throw new RuntimeException("Something wrong in getChildAlignment().");
}
}
return MRCHILD_NOT_FOUND;
-
- }
-
- private boolean hasChildNode(N node, ArrayList<N> nodes) {
- return hasChildNode(node, nodes, ExecLocation.INVALID);
}
private boolean hasParentNode(N node, ArrayList<N> parentNodes) {
if ( parentNodes.isEmpty() )
- return false;
-
- for( int i=0; i < parentNodes.size(); i++ ) {
- int index = IDMap.get( parentNodes.get(i).getID() );
+ return false;
+ for( N pnode : parentNodes ) {
+ int index = IDMap.get( pnode.getID() );
if ( node.get_reachable()[index])
return true;
}
return false;
}
-
}