You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/03/25 21:40:08 UTC
[4/5] incubator-systemml git commit: Cleanup runtime instruction
generation (simplifications, logging)
Cleanup runtime instruction generation (simplifications, logging)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b2aec7a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b2aec7a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b2aec7a7
Branch: refs/heads/master
Commit: b2aec7a7b990ebace6ba1bddd39fa963e818a73c
Parents: b095362
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Mar 25 00:34:39 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Mar 25 13:39:20 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/sysml/lops/compile/Dag.java | 436 ++++++++-----------
1 file changed, 191 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2aec7a7/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index eb52053..29c1250 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -85,10 +85,13 @@ import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat;
/**
*
- * Class to maintain a DAG and compile it into jobs
- * @param <N>
+ * Class to maintain a DAG of lops and compile it into
+ * runtime instructions, incl piggybacking into jobs.
+ *
+ * @param <N> the class parameter has no effect and is
+ * only kept for documentation purposes.
*/
-public class Dag<N extends Lop>
+public class Dag<N extends Lop>
{
private static final Log LOG = LogFactory.getLog(Dag.class.getName());
@@ -113,7 +116,7 @@ public class Dag<N extends Lop>
}
// hash set for all nodes in dag
- private ArrayList<N> nodes = null;
+ private ArrayList<Lop> nodes = null;
/*
* Hashmap to translates the nodes in the DAG to a sequence of numbers
@@ -197,7 +200,7 @@ public class Dag<N extends Lop>
public Dag()
{
//allocate internal data structures
- nodes = new ArrayList<N>();
+ nodes = new ArrayList<Lop>();
IDMap = new HashMap<Long, Integer>();
// get number of reducers from dml config
@@ -211,7 +214,7 @@ public class Dag<N extends Lop>
* @return true if node was not already present, false if not.
*/
- public boolean addNode(N node) {
+ public boolean addNode(Lop node) {
if (nodes.contains(node))
return false;
nodes.add(node);
@@ -253,7 +256,7 @@ public class Dag<N extends Lop>
}
// hold all nodes in a vector (needed for ordering)
- ArrayList<N> node_v = new ArrayList<N>();
+ ArrayList<Lop> node_v = new ArrayList<Lop>();
node_v.addAll(nodes);
/*
@@ -272,25 +275,26 @@ public class Dag<N extends Lop>
}
- private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
+ private static void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<Lop> nodeV,
ArrayList<Instruction> inst) throws DMLRuntimeException,
DMLUnsupportedOperationException {
if ( sb == null )
return;
- LOG.trace("In delete updated variables");
+ if( LOG.isTraceEnabled() )
+ LOG.trace("In delete updated variables");
// CANDIDATE list of variables which could have been updated in this statement block
- HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
+ HashMap<String, Lop> labelNodeMapping = new HashMap<String, Lop>();
// ACTUAL list of variables whose value is updated, AND the old value of the variable
// is no longer accessible/used.
HashSet<String> updatedLabels = new HashSet<String>();
- HashMap<String, N> updatedLabelsLineNum = new HashMap<String, N>();
+ HashMap<String, Lop> updatedLabelsLineNum = new HashMap<String, Lop>();
// first capture all transient read variables
- for ( N node : nodeV ) {
+ for ( Lop node : nodeV ) {
if (node.getExecLocation() == ExecLocation.Data
&& ((Data) node).isTransient()
@@ -317,7 +321,7 @@ public class Dag<N extends Lop>
}
// capture updated transient write variables
- for ( N node : nodeV ) {
+ for ( Lop node : nodeV ) {
if (node.getExecLocation() == ExecLocation.Data
&& ((Data) node).isTransient()
@@ -346,14 +350,15 @@ public class Dag<N extends Lop>
}
- private void generateRemoveInstructions(StatementBlock sb,
+ private static void generateRemoveInstructions(StatementBlock sb,
ArrayList<Instruction> deleteInst)
throws DMLUnsupportedOperationException, DMLRuntimeException {
if ( sb == null )
return;
-
- LOG.trace("In generateRemoveInstructions()");
+
+ if( LOG.isTraceEnabled() )
+ LOG.trace("In generateRemoveInstructions()");
Instruction inst = null;
@@ -396,30 +401,28 @@ public class Dag<N extends Lop>
}*/
}
- private ArrayList<ArrayList<N>> createNodeVectors(int size) {
- ArrayList<ArrayList<N>> arr = new ArrayList<ArrayList<N>>();
+ private static ArrayList<ArrayList<Lop>> createNodeVectors(int size) {
+ ArrayList<ArrayList<Lop>> arr = new ArrayList<ArrayList<Lop>>();
// for each job type, we need to create a vector.
// additionally, create another vector for execNodes
for (int i = 0; i < size; i++) {
- arr.add(new ArrayList<N>());
+ arr.add(new ArrayList<Lop>());
}
return arr;
}
- private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
- for (ArrayList<N> tmp : arr) {
+ private static void clearNodeVectors(ArrayList<ArrayList<Lop>> arr) {
+ for (ArrayList<Lop> tmp : arr) {
tmp.clear();
}
}
- private boolean isCompatible(ArrayList<N> nodes, JobType jt, int from,
- int to) throws LopsException {
-
+ private static boolean isCompatible(ArrayList<Lop> nodes, JobType jt, int from, int to)
+ throws LopsException
+ {
int base = jt.getBase();
-
- for (int i = from; i < to; i++) {
- N node = nodes.get(i);
+ for ( Lop node : nodes ) {
if ((node.getCompatibleJobs() & base) == 0) {
if( LOG.isTraceEnabled() )
LOG.trace("Not compatible "+ node.toString());
@@ -437,7 +440,7 @@ public class Dag<N extends Lop>
* @param node2
* @return
*/
- private boolean isCompatible(N node1, N node2) {
+ private static boolean isCompatible(Lop node1, Lop node2) {
return( (node1.getCompatibleJobs() & node2.getCompatibleJobs()) > 0);
}
@@ -448,7 +451,7 @@ public class Dag<N extends Lop>
* @param jt
* @return
*/
- private boolean isCompatible(N node, JobType jt) {
+ private static boolean isCompatible(Lop node, JobType jt) {
if ( jt == JobType.GMRCELL )
jt = JobType.GMR;
return ((node.getCompatibleJobs() & jt.getBase()) > 0);
@@ -457,8 +460,8 @@ public class Dag<N extends Lop>
/*
* Add node, and its relevant children to job-specific node vectors.
*/
- private void addNodeByJobType(N node, ArrayList<ArrayList<N>> arr,
- ArrayList<N> execNodes, boolean eliminate) throws LopsException {
+ private void addNodeByJobType(Lop node, ArrayList<ArrayList<Lop>> arr,
+ ArrayList<Lop> execNodes, boolean eliminate) throws LopsException {
if (!eliminate) {
// Check if this lop defines a MR job.
@@ -552,7 +555,7 @@ public class Dag<N extends Lop>
* Remove the node from all job-specific node vectors. This method is
* invoked from removeNodesForNextIteration().
*/
- private void removeNodeByJobType(N node, ArrayList<ArrayList<N>> arr) {
+ private static void removeNodeByJobType(Lop node, ArrayList<ArrayList<Lop>> arr) {
for ( JobType jt : JobType.values())
if ( jt.getId() > 0 )
arr.get(jt.getId()).remove(node);
@@ -566,14 +569,14 @@ public class Dag<N extends Lop>
* @param jobNodes
* @throws LopsException
*/
- private void handleSingleOutputJobs(ArrayList<N> execNodes,
- ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
+ private void handleSingleOutputJobs(ArrayList<Lop> execNodes,
+ ArrayList<ArrayList<Lop>> jobNodes, ArrayList<Lop> finishedNodes)
throws LopsException {
/*
* If the input of a MMCJ/MMRJ job (must have executed in a Mapper) is used
* by multiple lops then we should mark it as not-finished.
*/
- ArrayList<N> nodesWithUnfinishedOutputs = new ArrayList<N>();
+ ArrayList<Lop> nodesWithUnfinishedOutputs = new ArrayList<Lop>();
int[] jobIndices = {JobType.MMCJ.getId()};
Lop.Type[] lopTypes = { Lop.Type.MMCJ};
@@ -583,14 +586,14 @@ public class Dag<N extends Lop>
for ( int jobi=0; jobi < jobIndices.length; jobi++ ) {
int jindex = jobIndices[jobi];
if (!jobNodes.get(jindex).isEmpty()) {
- ArrayList<N> vec = jobNodes.get(jindex);
+ ArrayList<Lop> vec = jobNodes.get(jindex);
// first find all nodes with more than one parent that is not finished.
for (int i = 0; i < vec.size(); i++) {
- N node = vec.get(i);
+ Lop node = vec.get(i);
if (node.getExecLocation() == ExecLocation.MapOrReduce
|| node.getExecLocation() == ExecLocation.Map) {
- N MRparent = getParentNode(node, execNodes, ExecLocation.MapAndReduce);
+ Lop MRparent = getParentNode(node, execNodes, ExecLocation.MapAndReduce);
if ( MRparent != null && MRparent.getType() == lopTypes[jobi]) {
int numParents = node.getOutputs().size();
if (numParents > 1) {
@@ -606,7 +609,7 @@ public class Dag<N extends Lop>
}
// need to redo all nodes in nodesWithOutput as well as their children
- for ( N node : vec ) {
+ for ( Lop node : vec ) {
if (node.getExecLocation() == ExecLocation.MapOrReduce
|| node.getExecLocation() == ExecLocation.Map) {
if (nodesWithUnfinishedOutputs.contains(node))
@@ -623,7 +626,7 @@ public class Dag<N extends Lop>
}
/** Method to check if a lop can be eliminated from checking **/
- private boolean canEliminateLop(N node, ArrayList<N> execNodes) {
+ private static boolean canEliminateLop(Lop node, ArrayList<Lop> execNodes) {
// this function can only eliminate "aligner" lops such a group
if (!node.isAligner())
return false;
@@ -660,8 +663,8 @@ public class Dag<N extends Lop>
* @param nodes
* @throws LopsException
*/
- private void generateInstructionsForInputVariables(ArrayList<N> nodes_v, ArrayList<Instruction> inst) throws LopsException, IOException {
- for(N n : nodes_v) {
+ private static void generateInstructionsForInputVariables(ArrayList<Lop> nodes_v, ArrayList<Instruction> inst) throws LopsException, IOException {
+ for(Lop n : nodes_v) {
if (n.getExecLocation() == ExecLocation.Data && !((Data) n).isTransient()
&& ((Data) n).getOperationType() == OperationTypes.READ
&& (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME) ) {
@@ -694,12 +697,11 @@ public class Dag<N extends Lop>
* @param node
* @return
*/
- @SuppressWarnings("unchecked")
- private boolean sendWriteLopToMR(N node)
+ private static boolean sendWriteLopToMR(Lop node)
{
if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE )
return false;
- N in = (N) node.getInputs().get(0);
+ Lop in = node.getInputs().get(0);
Format nodeFormat = node.getOutputParameters().getFormat();
// Case of a transient read feeding into only one output persistent binaryblock write
@@ -726,7 +728,7 @@ public class Dag<N extends Lop>
* It is used only for those nodes that use inputs from distributed cache. The returned
* value is utilized in limiting the number of instructions piggybacked onto a single GMR mapper.
*/
- private double computeFootprintInMapper(N node) {
+ private static double computeFootprintInMapper(Lop node) {
// Memory limits must be checked only for nodes that use distributed cache
if ( ! node.usesDistributedCache() )
// default behavior
@@ -775,7 +777,7 @@ public class Dag<N extends Lop>
* the mappers then <code>node</code> can be executed in current round, and <code>true</code> is returned. Otherwise,
* <code>node</code> must be queued and <code>false</code> is returned.
*/
- private boolean checkMemoryLimits(N node, double footprintInMapper) {
+ private static boolean checkMemoryLimits(Lop node, double footprintInMapper) {
boolean addNode = true;
// Memory limits must be checked only for nodes that use distributed cache
@@ -798,22 +800,21 @@ public class Dag<N extends Lop>
* @throws DMLUnsupportedOperationException
* @throws DMLRuntimeException
*/
- @SuppressWarnings("unchecked")
- private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
+ private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<Lop> node_v)
throws LopsException, IOException, DMLRuntimeException,
DMLUnsupportedOperationException
-
{
- LOG.trace("Grouping DAG ============");
+ if( LOG.isTraceEnabled() )
+ LOG.trace("Grouping DAG ============");
// nodes to be executed in current iteration
- ArrayList<N> execNodes = new ArrayList<N>();
+ ArrayList<Lop> execNodes = new ArrayList<Lop>();
// nodes that have already been processed
- ArrayList<N> finishedNodes = new ArrayList<N>();
+ ArrayList<Lop> finishedNodes = new ArrayList<Lop>();
// nodes that are queued for the following iteration
- ArrayList<N> queuedNodes = new ArrayList<N>();
+ ArrayList<Lop> queuedNodes = new ArrayList<Lop>();
- ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
+ ArrayList<ArrayList<Lop>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
// list of instructions
ArrayList<Instruction> inst = new ArrayList<Instruction>();
@@ -835,18 +836,16 @@ public class Dag<N extends Lop>
String indent = " ";
while (!done) {
- LOG.trace("Grouping nodes in DAG");
+ if( LOG.isTraceEnabled() )
+ LOG.trace("Grouping nodes in DAG");
execNodes.clear();
queuedNodes.clear();
clearNodeVectors(jobNodes);
gmrMapperFootprint=0;
- for (int i = 0; i < node_v.size(); i++) {
- N node = node_v.get(i);
-
+ for ( Lop node : node_v ) {
// finished nodes don't need to be processed
-
if (finishedNodes.contains(node))
continue;
@@ -951,13 +950,12 @@ public class Dag<N extends Lop>
if (node.getInputs().size() > 1
&& hasChildNode(node, execNodes, ExecLocation.RecordReader)) {
// get the actual RecordReader lop
- N rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader);
+ Lop rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader);
// all inputs of "node" must be ancestors of rr_node
boolean queue_it = false;
- for (int in = 0; in < node.getInputs().size(); in++) {
+ for (Lop n : node.getInputs()) {
// each input should be ancestor of RecordReader lop
- N n = (N) node.getInputs().get(in);
if (!n.equals(rr_node) && !isChild(rr_node, n, IDMap)) {
queue_it = true; // i.e., "node" must be queued
break;
@@ -1007,7 +1005,7 @@ public class Dag<N extends Lop>
// Hence, <code>node</code> can be avoided.
// TODO: this case should ideally be handled in the language layer
// prior to the construction of Hops Dag
- N input = (N) dnode.getInputs().get(0);
+ Lop input = dnode.getInputs().get(0);
if ( dnode.isTransient()
&& input.getExecLocation() == ExecLocation.Data
&& ((Data)input).isTransient()
@@ -1081,7 +1079,7 @@ public class Dag<N extends Lop>
// then that input must get executed in one of the previous jobs.
int[] dcInputIndexes = node.distributedCacheInputIndex();
for( int dcInputIndex : dcInputIndexes ){
- N dcInput = (N) node.getInputs().get(dcInputIndex-1);
+ Lop dcInput = node.getInputs().get(dcInputIndex-1);
if ( (dcInput.getType() != Lop.Type.Data && dcInput.getExecType()==ExecType.MR)
&& execNodes.contains(dcInput) )
{
@@ -1207,7 +1205,7 @@ public class Dag<N extends Lop>
// copy unassigned lops in execnodes to gmrnodes
for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.get(i);
+ Lop node = execNodes.get(i);
if (jobType(node, jobNodes) == -1) {
if ( isCompatible(node, JobType.GMR) ) {
if ( node.hasNonBlockedInputs() ) {
@@ -1249,8 +1247,8 @@ public class Dag<N extends Lop>
}
- private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
- for( N tmpNode : execNodes ) {
+ private boolean compatibleWithChildrenInExecNodes(ArrayList<Lop> execNodes, Lop node) {
+ for( Lop tmpNode : execNodes ) {
// for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID
// we should not consider such lops in this check
if (isChild(tmpNode, node, IDMap)
@@ -1268,7 +1266,7 @@ public class Dag<N extends Lop>
* @param varName
* @param deleteInst
*/
- private void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
+ private static void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
//for(Instruction inst : deleteInst) {
for(int i=0; i < deleteInst.size(); i++) {
Instruction inst = deleteInst.get(i);
@@ -1288,21 +1286,20 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- @SuppressWarnings("unchecked")
- private void processConsumersForInputs(N node, ArrayList<Instruction> inst, ArrayList<Instruction> delteInst) throws DMLRuntimeException, DMLUnsupportedOperationException {
+ private void processConsumersForInputs(Lop node, ArrayList<Instruction> inst, ArrayList<Instruction> delteInst) throws DMLRuntimeException, DMLUnsupportedOperationException {
// reduce the consumer count for all input lops
// if the count becomes zero, then the variable associated w/ input can be removed
for(Lop in : node.getInputs() ) {
if(DMLScript.ENABLE_DEBUG_MODE) {
- processConsumers((N)in, inst, delteInst, node);
+ processConsumers(in, inst, delteInst, node);
}
else {
- processConsumers((N)in, inst, delteInst, null);
+ processConsumers(in, inst, delteInst, null);
}
}
}
- private void processConsumers(N node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst, N locationInfo) throws DMLRuntimeException, DMLUnsupportedOperationException {
+ private static void processConsumers(Lop node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst, Lop locationInfo) throws DMLRuntimeException, DMLUnsupportedOperationException {
// reduce the consumer count for all input lops
// if the count becomes zero, then the variable associated w/ input can be removed
if ( node.removeConsumer() == 0 ) {
@@ -1334,22 +1331,21 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- @SuppressWarnings("unchecked")
- private void generateControlProgramJobs(ArrayList<N> execNodes,
+ private void generateControlProgramJobs(ArrayList<Lop> execNodes,
ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
DMLUnsupportedOperationException, DMLRuntimeException {
// nodes to be deleted from execnodes
- ArrayList<N> markedNodes = new ArrayList<N>();
+ ArrayList<Lop> markedNodes = new ArrayList<Lop>();
// variable names to be deleted
ArrayList<String> var_deletions = new ArrayList<String>();
- HashMap<String, N> var_deletionsLineNum = new HashMap<String, N>();
+ HashMap<String, Lop> var_deletionsLineNum = new HashMap<String, Lop>();
boolean doRmVar = false;
for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.get(i);
+ Lop node = execNodes.get(i);
doRmVar = false;
// mark input scalar read nodes for deletion
@@ -1379,8 +1375,7 @@ public class Dag<N extends Lop>
inst.addAll(out.getPreInstructions());
boolean hasTransientWriteParent = false;
- for ( int pid=0; pid < node.getOutputs().size(); pid++ ) {
- N parent = (N)node.getOutputs().get(pid);
+ for ( Lop parent : node.getOutputs() ) {
if ( parent.getExecLocation() == ExecLocation.Data
&& ((Data)parent).getOperationType() == Data.OperationTypes.WRITE
&& ((Data)parent).isTransient() ) {
@@ -1593,7 +1588,7 @@ public class Dag<N extends Lop>
}
// delete all marked nodes
- for ( N node : markedNodes ) {
+ for ( Lop node : markedNodes ) {
execNodes.remove(node);
}
}
@@ -1608,9 +1603,9 @@ public class Dag<N extends Lop>
* @param queuedNodes
* @throws LopsException
*/
- private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
- ArrayList<N> execNodes, ArrayList<N> queuedNodes,
- ArrayList<ArrayList<N>> jobvec) throws LopsException {
+ private void removeNodesForNextIteration(Lop node, ArrayList<Lop> finishedNodes,
+ ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes,
+ ArrayList<ArrayList<Lop>> jobvec) throws LopsException {
// only queued nodes with multiple inputs need to be handled.
if (node.getInputs().size() == 1)
@@ -1672,10 +1667,8 @@ public class Dag<N extends Lop>
// Evaluate each lop in <code>execNodes</code> for removal.
// Add lops to be removed to <code>markedNodes</code>.
- ArrayList<N> markedNodes = new ArrayList<N>();
- for (int i = 0; i < execNodes.size(); i++) {
-
- N tmpNode = execNodes.get(i);
+ ArrayList<Lop> markedNodes = new ArrayList<Lop>();
+ for (Lop tmpNode : execNodes ) {
if (LOG.isTraceEnabled()) {
LOG.trace(" Checking for removal (" + tmpNode.getID() + ") " + tmpNode.toString());
@@ -1761,7 +1754,7 @@ public class Dag<N extends Lop>
} // for i
// we also need to delete all parent nodes of marked nodes
- for ( N enode : execNodes ) {
+ for ( Lop enode : execNodes ) {
if( LOG.isTraceEnabled() ) {
LOG.trace(" Checking for removal - ("
+ enode.getID() + ") " + enode.toString());
@@ -1777,7 +1770,7 @@ public class Dag<N extends Lop>
if ( execNodes.size() != markedNodes.size() ) {
// delete marked nodes from finishedNodes and execNodes
// add to queued nodes
- for(N n : markedNodes) {
+ for(Lop n : markedNodes) {
if ( n.usesDistributedCache() )
gmrMapperFootprint -= computeFootprintInMapper(n);
finishedNodes.remove(n);
@@ -1788,7 +1781,7 @@ public class Dag<N extends Lop>
}
}
- private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
+ private boolean branchCanBePiggyBackedReduce(Lop tmpNode, Lop node, ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
if(node.getExecLocation() != ExecLocation.Reduce)
return false;
@@ -1798,7 +1791,7 @@ public class Dag<N extends Lop>
return false;
}
- for( N n : execNodes ) {
+ for( Lop n : execNodes ) {
if(n.equals(node))
continue;
@@ -1815,8 +1808,7 @@ public class Dag<N extends Lop>
return true;
}
- @SuppressWarnings("unchecked")
- private boolean branchCanBePiggyBackedMap(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes, ArrayList<N> markedNodes) {
+ private boolean branchCanBePiggyBackedMap(Lop tmpNode, Lop node, ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes, ArrayList<Lop> markedNodes) {
if(node.getExecLocation() != ExecLocation.Map)
return false;
@@ -1836,7 +1828,7 @@ public class Dag<N extends Lop>
// "dcInput" must be executed prior to "node", and removal of tmpNode does not make that happen.
if(node.usesDistributedCache() ) {
for(int dcInputIndex : node.distributedCacheInputIndex()) {
- N dcInput = (N) node.getInputs().get(dcInputIndex-1);
+ Lop dcInput = node.getInputs().get(dcInputIndex-1);
if(isChild(tmpNode, dcInput, IDMap))
return false;
}
@@ -1849,7 +1841,7 @@ public class Dag<N extends Lop>
if (node.usesDistributedCache() )
memsize += computeFootprintInMapper(node);
if ( markedNodes != null ) {
- for(N n : markedNodes) {
+ for(Lop n : markedNodes) {
if ( n.usesDistributedCache() )
memsize += computeFootprintInMapper(n);
}
@@ -1881,14 +1873,14 @@ public class Dag<N extends Lop>
* @param finishedNodes
* @return
*/
- private boolean branchCanBePiggyBackedMapAndReduce(N tmpNode, N node,
- ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
+ private boolean branchCanBePiggyBackedMapAndReduce(Lop tmpNode, Lop node,
+ ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
if (node.getExecLocation() != ExecLocation.MapAndReduce)
return false;
JobType jt = JobType.findJobTypeFromLop(node);
- for ( N n : execNodes ) {
+ for ( Lop n : execNodes ) {
if (n.equals(node))
continue;
@@ -1906,32 +1898,31 @@ public class Dag<N extends Lop>
return true;
}
- private boolean branchHasNoOtherUnExecutedParents(N tmpNode, N node,
- ArrayList<N> execNodes, ArrayList<N> finishedNodes) {
+ private boolean branchHasNoOtherUnExecutedParents(Lop tmpNode, Lop node,
+ ArrayList<Lop> execNodes, ArrayList<Lop> finishedNodes) {
//if tmpNode has more than one unfinished output, return false
if(tmpNode.getOutputs().size() > 1)
{
int cnt = 0;
- for (int j = 0; j < tmpNode.getOutputs().size(); j++) {
- if (!finishedNodes.contains(tmpNode.getOutputs().get(j)))
- cnt++;
- }
+ for (Lop output : tmpNode.getOutputs() )
+ if (!finishedNodes.contains(output))
+ cnt++;
if(cnt != 1)
return false;
}
//check to see if any node between node and tmpNode has more than one unfinished output
- for( N n : execNodes ) {
+ for( Lop n : execNodes ) {
if(n.equals(node) || n.equals(tmpNode))
continue;
if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap))
{
int cnt = 0;
- for (int j = 0; j < n.getOutputs().size(); j++) {
- if (!finishedNodes.contains(n.getOutputs().get(j)))
+ for (Lop output : n.getOutputs() ) {
+ if (!finishedNodes.contains(output))
cnt++;
}
@@ -1951,7 +1942,7 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
- private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
+ private static int jobType(Lop lops, ArrayList<ArrayList<Lop>> jobvec) throws LopsException {
for ( JobType jt : JobType.values()) {
int i = jt.getId();
if (i > 0 && jobvec.get(i) != null && jobvec.get(i).contains(lops)) {
@@ -1970,16 +1961,13 @@ public class Dag<N extends Lop>
* @param node
* @return
*/
- @SuppressWarnings("unchecked")
- private boolean hasOtherMapAndReduceParentNode(N tmpNode,
- ArrayList<N> nodeList, N node) {
+ private boolean hasOtherMapAndReduceParentNode(Lop tmpNode,
+ ArrayList<Lop> nodeList, Lop node) {
if ( tmpNode.getExecLocation() == ExecLocation.MapAndReduce)
return true;
- for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
- N n = (N) tmpNode.getOutputs().get(i);
-
+ for ( Lop n : tmpNode.getOutputs() ) {
if ( nodeList.contains(n) && isChild(n,node,IDMap)) {
if(!n.equals(node) && n.getExecLocation() == ExecLocation.MapAndReduce)
return true;
@@ -1999,7 +1987,7 @@ public class Dag<N extends Lop>
* @param node
* @return
*/
- private boolean hasOtherQueuedParentNode(N tmpNode, ArrayList<N> queuedNodes, N node) {
+ private boolean hasOtherQueuedParentNode(Lop tmpNode, ArrayList<Lop> queuedNodes, Lop node) {
if ( queuedNodes.isEmpty() )
return false;
@@ -2008,7 +1996,7 @@ public class Dag<N extends Lop>
long nodeid = IDMap.get(node.getID());
long tmpid = IDMap.get(tmpNode.getID());
- for ( N qnode : queuedNodes ) {
+ for ( Lop qnode : queuedNodes ) {
int id = IDMap.get(qnode.getID());
if ((id != nodeid && nodeMarked[id]) && (id != tmpid && tmpMarked[id]) )
return true;
@@ -2023,7 +2011,7 @@ public class Dag<N extends Lop>
* @param jobNodes
* @throws DMLRuntimeException
*/
- private void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
+ private static void printJobNodes(ArrayList<ArrayList<Lop>> jobNodes)
throws DMLRuntimeException
{
if (LOG.isTraceEnabled()){
@@ -2050,36 +2038,31 @@ public class Dag<N extends Lop>
* @param loc
* @return
*/
- private boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
- for (int i = 0; i < nodes.size(); i++) {
- if (nodes.get(i).getExecLocation() == ExecLocation.RecordReader)
+ private static boolean hasANode(ArrayList<Lop> nodes, ExecLocation loc) {
+ for ( Lop n : nodes ) {
+ if (n.getExecLocation() == ExecLocation.RecordReader)
return true;
}
return false;
}
- private ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes)
+ private ArrayList<ArrayList<Lop>> splitGMRNodesByRecordReader(ArrayList<Lop> gmrnodes)
{
// obtain the list of record reader nodes
- ArrayList<N> rrnodes = new ArrayList<N>();
- for (int i = 0; i < gmrnodes.size(); i++) {
- if (gmrnodes.get(i).getExecLocation() == ExecLocation.RecordReader)
- rrnodes.add(gmrnodes.get(i));
+ ArrayList<Lop> rrnodes = new ArrayList<Lop>();
+ for (Lop gmrnode : gmrnodes ) {
+ if (gmrnode.getExecLocation() == ExecLocation.RecordReader)
+ rrnodes.add(gmrnode);
}
- /*
- * We allocate one extra vector to hold lops that do not depend on any
- * recordreader lops
- */
- ArrayList<ArrayList<N>> splitGMR = createNodeVectors(rrnodes.size() + 1);
+ // We allocate one extra vector to hold lops that do not depend on any
+ // recordreader lops
+ ArrayList<ArrayList<Lop>> splitGMR = createNodeVectors(rrnodes.size() + 1);
- // flags to indicate whether a lop has been added to one of the node
- // vectors
+ // flags to indicate whether a lop has been added to one of the node vectors
boolean[] flags = new boolean[gmrnodes.size()];
- for (int i = 0; i < gmrnodes.size(); i++) {
- flags[i] = false;
- }
-
+ Arrays.fill(flags, false);
+
// first, obtain all ancestors of recordreader lops
for (int rrid = 0; rrid < rrnodes.size(); rrid++) {
// prepare node list for i^th record reader lop
@@ -2123,10 +2106,10 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- private void generateMRJobs(ArrayList<N> execNodes,
+ private void generateMRJobs(ArrayList<Lop> execNodes,
ArrayList<Instruction> inst,
ArrayList<Instruction> writeinst,
- ArrayList<Instruction> deleteinst, ArrayList<ArrayList<N>> jobNodes)
+ ArrayList<Instruction> deleteinst, ArrayList<ArrayList<Lop>> jobNodes)
throws LopsException, DMLUnsupportedOperationException,
DMLRuntimeException
@@ -2141,7 +2124,7 @@ public class Dag<N extends Lop>
continue;
int index = jt.getId(); // job id is used as an index into jobNodes
- ArrayList<N> currNodes = jobNodes.get(index);
+ ArrayList<Lop> currNodes = jobNodes.get(index);
// generate MR job
if (currNodes != null && !currNodes.isEmpty() ) {
@@ -2151,7 +2134,7 @@ public class Dag<N extends Lop>
if (jt.allowsRecordReaderInstructions() && hasANode(jobNodes.get(index), ExecLocation.RecordReader)) {
// split the nodes by recordReader lops
- ArrayList<ArrayList<N>> rrlist = splitGMRNodesByRecordReader(jobNodes.get(index));
+ ArrayList<ArrayList<Lop>> rrlist = splitGMRNodesByRecordReader(jobNodes.get(index));
for (int i = 0; i < rrlist.size(); i++) {
generateMapReduceInstructions(rrlist.get(i), inst, writeinst, deleteinst, rmvarinst, jt);
}
@@ -2161,7 +2144,7 @@ public class Dag<N extends Lop>
// We should split the nodes so that a separate job is produced for each shuffle instruction.
Lop.Type splittingLopType = jt.getShuffleLopType();
- ArrayList<N> nodesForASingleJob = new ArrayList<N>();
+ ArrayList<Lop> nodesForASingleJob = new ArrayList<Lop>();
for (int i = 0; i < jobNodes.get(index).size(); i++) {
if (jobNodes.get(index).get(i).getType() == splittingLopType) {
nodesForASingleJob.clear();
@@ -2207,8 +2190,8 @@ public class Dag<N extends Lop>
* @param node_v
* @param exec_n
*/
- private void addParents(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
- for (N enode : exec_n ) {
+ private void addParents(Lop node, ArrayList<Lop> node_v, ArrayList<Lop> exec_n) {
+ for (Lop enode : exec_n ) {
if (isChild(node, enode, IDMap)) {
if (!node_v.contains(enode)) {
if( LOG.isTraceEnabled() )
@@ -2226,8 +2209,7 @@ public class Dag<N extends Lop>
* @param node_v
* @param exec_n
*/
- @SuppressWarnings("unchecked")
- private void addChildren(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
+ private static void addChildren(Lop node, ArrayList<Lop> node_v, ArrayList<Lop> exec_n) {
// add child in exec nodes that is not of type scalar
if (exec_n.contains(node)
@@ -2244,8 +2226,7 @@ public class Dag<N extends Lop>
return;
// recurse
- for (int i = 0; i < node.getInputs().size(); i++) {
- N n = (N) node.getInputs().get(i);
+ for (Lop n : node.getInputs() ) {
addChildren(n, node_v, exec_n);
}
}
@@ -2257,7 +2238,7 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
- private OutputInfo getOutputInfo(N node, boolean cellModeOverride)
+ private static OutputInfo getOutputInfo(Lop node, boolean cellModeOverride)
throws LopsException
{
if ( (node.getDataType() == DataType.SCALAR && node.getExecType() == ExecType.CP)
@@ -2348,8 +2329,7 @@ public class Dag<N extends Lop>
* @throws DMLUnsupportedOperationException
* @throws LopsException
*/
- @SuppressWarnings("unchecked")
- private NodeOutput setupNodeOutputs(N node, ExecType et, boolean cellModeOverride, boolean copyTWrite)
+ private NodeOutput setupNodeOutputs(Lop node, ExecType et, boolean cellModeOverride, boolean copyTWrite)
throws DMLUnsupportedOperationException, DMLRuntimeException, LopsException {
OutputParameters oparams = node.getOutputParameters();
@@ -2469,7 +2449,7 @@ public class Dag<N extends Lop>
fnOutParams.getLabel(),
getFilePath() + fnOutParams.getLabel(),
true,
- OutputInfo.outputInfoToString(getOutputInfo((N)fnOut, false)),
+ OutputInfo.outputInfoToString(getOutputInfo(fnOut, false)),
new MatrixCharacteristics(fnOutParams.getNumRows(), fnOutParams.getNumCols(), (int)fnOutParams.getRowsInBlock(), (int)fnOutParams.getColsInBlock(), fnOutParams.getNnz()),
oparams.getUpdateInPlace()
);
@@ -2813,8 +2793,7 @@ public class Dag<N extends Lop>
* @throws DMLUnsupportedOperationException
* @throws DMLRuntimeException
*/
- @SuppressWarnings("unchecked")
- private void generateMapReduceInstructions(ArrayList<N> execNodes,
+ private void generateMapReduceInstructions(ArrayList<Lop> execNodes,
ArrayList<Instruction> inst, ArrayList<Instruction> writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst,
JobType jt) throws LopsException,
DMLUnsupportedOperationException, DMLRuntimeException
@@ -2848,7 +2827,7 @@ public class Dag<N extends Lop>
boolean cellModeOverride = false;
/* Find the nodes that produce an output */
- ArrayList<N> rootNodes = new ArrayList<N>();
+ ArrayList<Lop> rootNodes = new ArrayList<Lop>();
getOutputNodes(execNodes, rootNodes, jt);
if( LOG.isTraceEnabled() )
LOG.trace("# of root nodes = " + rootNodes.size());
@@ -2856,9 +2835,9 @@ public class Dag<N extends Lop>
/* Remove transient writes that are simple copy of transient reads */
if (jt == JobType.GMR || jt == JobType.GMRCELL) {
- ArrayList<N> markedNodes = new ArrayList<N>();
+ ArrayList<Lop> markedNodes = new ArrayList<Lop>();
// only keep data nodes that are results of some computation.
- for ( N rnode : rootNodes ) {
+ for ( Lop rnode : rootNodes ) {
if (rnode.getExecLocation() == ExecLocation.Data
&& ((Data) rnode).isTransient()
&& ((Data) rnode).getOperationType() == OperationTypes.WRITE
@@ -2881,11 +2860,11 @@ public class Dag<N extends Lop>
}
// structure that maps node to their indices that will be used in the instructions
- HashMap<N, Integer> nodeIndexMapping = new HashMap<N, Integer>();
+ HashMap<Lop, Integer> nodeIndexMapping = new HashMap<Lop, Integer>();
/* Determine all input data files */
- for ( N rnode : rootNodes ) {
+ for ( Lop rnode : rootNodes ) {
getInputPathsAndParameters(rnode, execNodes, inputs, inputInfos, numRows, numCols,
numRowsPerBlock, numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
}
@@ -2901,7 +2880,7 @@ public class Dag<N extends Lop>
// currently, recordreader instructions are allowed only in GMR jobs
if (jt == JobType.GMR || jt == JobType.GMRCELL) {
- for ( N rnode : rootNodes ) {
+ for ( Lop rnode : rootNodes ) {
getRecordReaderInstructions(rnode, execNodes, inputs, recordReaderInstructions,
nodeIndexMapping, start_index, inputLabels, inputLops, MRJobLineNumbers);
if ( recordReaderInstructions.size() > 1 )
@@ -2953,8 +2932,7 @@ public class Dag<N extends Lop>
ArrayList<String> aggInstructionsReducer = new ArrayList<String>();
ArrayList<String> otherInstructionsReducer = new ArrayList<String>();
- for (int i = 0; i < rootNodes.size(); i++) {
- N rn = rootNodes.get(i);
+ for( Lop rn : rootNodes ) {
int resultIndex = getAggAndOtherInstructions(
rn, execNodes, shuffleInstructions, aggInstructionsReducer,
otherInstructionsReducer, nodeIndexMapping, start_index,
@@ -2968,14 +2946,14 @@ public class Dag<N extends Lop>
) {
// Both rn (a transient write) and its input are root nodes.
// Instead of creating two copies of the data, simply generate a cpvar instruction
- NodeOutput out = setupNodeOutputs(rootNodes.get(i), ExecType.MR, cellModeOverride, true);
+ NodeOutput out = setupNodeOutputs(rn, ExecType.MR, cellModeOverride, true);
writeinst.addAll(out.getLastInstructions());
}
else {
resultIndices.add(Byte.valueOf((byte)resultIndex));
// setup output filenames and outputInfos and generate related instructions
- NodeOutput out = setupNodeOutputs(rootNodes.get(i), ExecType.MR, cellModeOverride, false);
+ NodeOutput out = setupNodeOutputs(rn, ExecType.MR, cellModeOverride, false);
outputLabels.add(out.getVarName());
outputs.add(out.getFileName());
outputInfos.add(out.getOutInfo());
@@ -2985,8 +2963,7 @@ public class Dag<N extends Lop>
renameInstructions.addAll(out.getLastInstructions());
variableInstructions.addAll(out.getPreInstructions());
postInstructions.addAll(out.getPostInstructions());
- }
-
+ }
}
/* Determine if the output dimensions are known */
@@ -3013,7 +2990,7 @@ public class Dag<N extends Lop>
numReducers = total_reducers;
// set inputs, outputs, and other other properties for the job
- mr.setInputOutputLabels(getStringArray(inputLabels), getStringArray(outputLabels));
+ mr.setInputOutputLabels(inputLabels.toArray(new String[0]), outputLabels.toArray(new String[0]));
mr.setOutputs(resultIndicesByte);
mr.setDimsUnknownFilePrefix(getFilePath());
@@ -3027,7 +3004,7 @@ public class Dag<N extends Lop>
//compute and set mapper memory requirements (for consistency of runtime piggybacking)
if( jt == JobType.GMR ) {
double mem = 0;
- for( N n : execNodes )
+ for( Lop n : execNodes )
mem += computeFootprintInMapper(n);
mr.setMemoryRequirements(mem);
}
@@ -3054,10 +3031,10 @@ public class Dag<N extends Lop>
for (Lop l : inputLops) {
if(DMLScript.ENABLE_DEBUG_MODE) {
- processConsumers((N)l, rmvarinst, deleteinst, (N)l);
+ processConsumers(l, rmvarinst, deleteinst, l);
}
else {
- processConsumers((N)l, rmvarinst, deleteinst, null);
+ processConsumers(l, rmvarinst, deleteinst, null);
}
}
}
@@ -3068,7 +3045,7 @@ public class Dag<N extends Lop>
* @param inputStrings
* @return
*/
- private String getCSVString(ArrayList<String> inputStrings) {
+ private static String getCSVString(ArrayList<String> inputStrings) {
StringBuilder sb = new StringBuilder();
for ( String str : inputStrings ) {
if( str != null ) {
@@ -3081,23 +3058,6 @@ public class Dag<N extends Lop>
}
/**
- * converts an array list of strings into an array of string
- *
- * @param list
- * @return
- */
- private String[] getStringArray(ArrayList<String> list) {
- String[] arr = new String[list.size()];
-
- for (int i = 0; i < arr.length; i++) {
- arr[i] = list.get(i);
- }
-
- return arr;
- }
-
-
- /**
* Method to populate aggregate and other instructions in reducer.
*
* @param node
@@ -3111,12 +3071,11 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
- @SuppressWarnings("unchecked")
- private int getAggAndOtherInstructions(N node, ArrayList<N> execNodes,
+ private int getAggAndOtherInstructions(Lop node, ArrayList<Lop> execNodes,
ArrayList<String> shuffleInstructions,
ArrayList<String> aggInstructionsReducer,
ArrayList<String> otherInstructionsReducer,
- HashMap<N, Integer> nodeIndexMapping, int[] start_index,
+ HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
{
@@ -3137,15 +3096,15 @@ public class Dag<N extends Lop>
// are parameters for the WRITE operation), so we only need to take care of the
// first element.
if (node.getType() == Lop.Type.Data && ((Data)node).getOperationType() == Data.OperationTypes.WRITE) {
- ret_val = getAggAndOtherInstructions((N) node.getInputs().get(0),
+ ret_val = getAggAndOtherInstructions(node.getInputs().get(0),
execNodes, shuffleInstructions, aggInstructionsReducer,
otherInstructionsReducer, nodeIndexMapping, start_index,
inputLabels, inputLops, MRJobLineNumbers);
inputIndices.add(ret_val);
}
else {
- for (int i = 0; i < node.getInputs().size(); i++) {
- ret_val = getAggAndOtherInstructions((N) node.getInputs().get(i),
+ for ( Lop cnode : node.getInputs() ) {
+ ret_val = getAggAndOtherInstructions(cnode,
execNodes, shuffleInstructions, aggInstructionsReducer,
otherInstructionsReducer, nodeIndexMapping, start_index,
inputLabels, inputLops, MRJobLineNumbers);
@@ -3372,11 +3331,10 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
- @SuppressWarnings("unchecked")
- private int getRecordReaderInstructions(N node, ArrayList<N> execNodes,
+ private static int getRecordReaderInstructions(Lop node, ArrayList<Lop> execNodes,
ArrayList<String> inputStrings,
ArrayList<String> recordReaderInstructions,
- HashMap<N, Integer> nodeIndexMapping, int[] start_index,
+ HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
{
@@ -3395,7 +3353,7 @@ public class Dag<N extends Lop>
// get mapper instructions
for (int i = 0; i < node.getInputs().size(); i++) {
// recurse
- N childNode = (N) node.getInputs().get(i);
+ Lop childNode = node.getInputs().get(i);
int ret_val = getRecordReaderInstructions(childNode, execNodes,
inputStrings, recordReaderInstructions, nodeIndexMapping,
start_index, inputLabels, inputLops, MRJobLineNumbers);
@@ -3474,11 +3432,10 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
- @SuppressWarnings("unchecked")
- private int getMapperInstructions(N node, ArrayList<N> execNodes,
+ private int getMapperInstructions(Lop node, ArrayList<Lop> execNodes,
ArrayList<String> inputStrings,
ArrayList<String> instructionsInMapper,
- HashMap<N, Integer> nodeIndexMapping, int[] start_index,
+ HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
{
@@ -3493,10 +3450,7 @@ public class Dag<N extends Lop>
ArrayList<Integer> inputIndices = new ArrayList<Integer>();
int max_input_index = -1;
// get mapper instructions
- for (int i = 0; i < node.getInputs().size(); i++) {
-
- // recurse
- N childNode = (N) node.getInputs().get(i);
+ for( Lop childNode : node.getInputs()) {
int ret_val = getMapperInstructions(childNode, execNodes,
inputStrings, instructionsInMapper, nodeIndexMapping,
start_index, inputLabels, inputLops, MRJobLineNumbers);
@@ -3615,12 +3569,11 @@ public class Dag<N extends Lop>
}
// Method to populate inputs and also populates node index mapping.
- @SuppressWarnings("unchecked")
- private void getInputPathsAndParameters(N node, ArrayList<N> execNodes,
+ private static void getInputPathsAndParameters(Lop node, ArrayList<Lop> execNodes,
ArrayList<String> inputStrings, ArrayList<InputInfo> inputInfos,
ArrayList<Long> numRows, ArrayList<Long> numCols,
ArrayList<Long> numRowsPerBlock, ArrayList<Long> numColsPerBlock,
- HashMap<N, Integer> nodeIndexMapping, ArrayList<String> inputLabels,
+ HashMap<Lop, Integer> nodeIndexMapping, ArrayList<String> inputLabels,
ArrayList<Lop> inputLops, ArrayList<Integer> MRJobLineNumbers)
throws LopsException {
// treat rand as an input.
@@ -3674,15 +3627,12 @@ public class Dag<N extends Lop>
nodeInputInfo = InputInfo.BinaryBlockInputInfo;
else
throw new LopsException("Invalid format (" + node.getOutputParameters().getFormat() + ") encountered for a node/lop (ID=" + node.getID() + ") with blocked output.");
- // inputInfos.add(InputInfo.BinaryBlockInputInfo);
- } else {
+ }
+ else {
if (node.getOutputParameters().getFormat() == Format.TEXT)
nodeInputInfo = InputInfo.TextCellInputInfo;
- // inputInfos.add(InputInfo.TextCellInputInfo);
- else {
+ else
nodeInputInfo = InputInfo.BinaryCellInputInfo;
- // inputInfos.add(InputInfo.BinaryCellInputInfo);
- }
}
/*
@@ -3716,7 +3666,6 @@ public class Dag<N extends Lop>
}
inputInfos.add(nodeInputInfo);
-
nodeIndexMapping.put(node, inputStrings.size() - 1);
return;
@@ -3728,7 +3677,7 @@ public class Dag<N extends Lop>
// process children recursively
for ( Lop lop : node.getInputs() ) {
- getInputPathsAndParameters((N)lop, execNodes, inputStrings,
+ getInputPathsAndParameters(lop, execNodes, inputStrings,
inputInfos, numRows, numCols, numRowsPerBlock,
numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
}
@@ -3741,8 +3690,8 @@ public class Dag<N extends Lop>
* @param execNodes
* @param rootNodes
*/
- private void getOutputNodes(ArrayList<N> execNodes, ArrayList<N> rootNodes, JobType jt) {
- for ( N node : execNodes ) {
+ private static void getOutputNodes(ArrayList<Lop> execNodes, ArrayList<Lop> rootNodes, JobType jt) {
+ for ( Lop node : execNodes ) {
// terminal node
if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) {
rootNodes.add(node);
@@ -3790,7 +3739,7 @@ public class Dag<N extends Lop>
* @param v
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
- private void doTopologicalSort_strict_order(ArrayList<N> v) {
+ private void doTopologicalSort_strict_order(ArrayList<Lop> v) {
//int numNodes = v.size();
/*
@@ -3802,7 +3751,7 @@ public class Dag<N extends Lop>
// Step1: Performed at the time of creating Lops
// Step2: sort nodes by level, and then by node ID
- Object[] nodearray = v.toArray();
+ Lop[] nodearray = v.toArray(new Lop[0]);
Arrays.sort(nodearray, new LopComparator());
// Copy sorted nodes into "v" and construct a mapping between Lop IDs and sequence of numbers
@@ -3810,7 +3759,7 @@ public class Dag<N extends Lop>
IDMap.clear();
for (int i = 0; i < nodearray.length; i++) {
- v.add((N) nodearray[i]);
+ v.add(nodearray[i]);
IDMap.put(v.get(i).getID(), i);
}
@@ -3830,7 +3779,7 @@ public class Dag<N extends Lop>
// print the nodes in sorted order
if (LOG.isTraceEnabled()) {
- for ( N vnode : v ) {
+ for ( Lop vnode : v ) {
StringBuilder sb = new StringBuilder();
sb.append(vnode.getID());
sb.append("(");
@@ -3859,8 +3808,7 @@ public class Dag<N extends Lop>
* @param root
* @param marked
*/
- @SuppressWarnings("unchecked")
- private void dagDFS(N root, boolean[] marked) {
+ private void dagDFS(Lop root, boolean[] marked) {
//contains check currently required for globalopt, will be removed when cleaned up
if( !IDMap.containsKey(root.getID()) )
return;
@@ -3870,40 +3818,40 @@ public class Dag<N extends Lop>
return;
marked[mapID] = true;
for( Lop lop : root.getOutputs() ) {
- dagDFS((N)lop, marked);
+ dagDFS(lop, marked);
}
}
- private boolean hasDirectChildNode(N node, ArrayList<N> childNodes) {
+ private static boolean hasDirectChildNode(Lop node, ArrayList<Lop> childNodes) {
if ( childNodes.isEmpty() )
return false;
- for( N cnode : childNodes ) {
+ for( Lop cnode : childNodes ) {
if ( cnode.getOutputs().contains(node))
return true;
}
return false;
}
- private boolean hasChildNode(N node, ArrayList<N> nodes) {
+ private boolean hasChildNode(Lop node, ArrayList<Lop> nodes) {
return hasChildNode(node, nodes, ExecLocation.INVALID);
}
- private boolean hasChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
+ private boolean hasChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) {
if ( childNodes.isEmpty() )
return false;
int index = IDMap.get(node.getID());
- for( N cnode : childNodes ) {
+ for( Lop cnode : childNodes ) {
if ( (type == ExecLocation.INVALID || cnode.getExecLocation() == type) && cnode.get_reachable()[index])
return true;
}
return false;
}
- private N getChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
+ private Lop getChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) {
if ( childNodes.isEmpty() )
return null;
int index = IDMap.get(node.getID());
- for( N cnode : childNodes ) {
+ for( Lop cnode : childNodes ) {
if ( cnode.getExecLocation() == type && cnode.get_reachable()[index])
return cnode;
}
@@ -3919,10 +3867,10 @@ public class Dag<N extends Lop>
* Returns null if no such "n" exists
*
*/
- private N getParentNode(N node, ArrayList<N> parentNodes, ExecLocation type) {
+ private Lop getParentNode(Lop node, ArrayList<Lop> parentNodes, ExecLocation type) {
if ( parentNodes.isEmpty() )
return null;
- for( N pn : parentNodes ) {
+ for( Lop pn : parentNodes ) {
int index = IDMap.get( pn.getID() );
if ( pn.getExecLocation() == type && node.get_reachable()[index])
return pn;
@@ -3932,25 +3880,25 @@ public class Dag<N extends Lop>
// Checks if "node" has any descendants in nodesVec with definedMRJob flag
// set to true
- private boolean hasMRJobChildNode(N node, ArrayList<N> nodesVec) {
+ private boolean hasMRJobChildNode(Lop node, ArrayList<Lop> nodesVec) {
if ( nodesVec.isEmpty() )
return false;
int index = IDMap.get(node.getID());
- for( N n : nodesVec ) {
+ for( Lop n : nodesVec ) {
if ( n.definesMRJob() && n.get_reachable()[index])
return true;
}
return false;
}
- private boolean checkDataGenAsChildNode(N node, ArrayList<N> nodesVec) {
+ private boolean checkDataGenAsChildNode(Lop node, ArrayList<Lop> nodesVec) {
if( nodesVec.isEmpty() )
return true;
int index = IDMap.get(node.getID());
boolean onlyDatagen = true;
- for( N n : nodesVec ) {
+ for( Lop n : nodesVec ) {
if ( n.definesMRJob() && n.get_reachable()[index] && JobType.findJobTypeFromLop(n) != JobType.DATAGEN )
onlyDatagen = false;
}
@@ -3958,11 +3906,9 @@ public class Dag<N extends Lop>
return onlyDatagen;
}
- @SuppressWarnings("unchecked")
- private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type)
+ private static int getChildAlignment(Lop node, ArrayList<Lop> execNodes, ExecLocation type)
{
- for (Lop lop : node.getInputs() ) {
- N n = (N) lop;
+ for (Lop n : node.getInputs() ) {
if (!execNodes.contains(n))
continue;
@@ -3993,10 +3939,10 @@ public class Dag<N extends Lop>
return MRCHILD_NOT_FOUND;
}
- private boolean hasParentNode(N node, ArrayList<N> parentNodes) {
+ private boolean hasParentNode(Lop node, ArrayList<Lop> parentNodes) {
if ( parentNodes.isEmpty() )
return false;
- for( N pnode : parentNodes ) {
+ for( Lop pnode : parentNodes ) {
int index = IDMap.get( pnode.getID() );
if ( node.get_reachable()[index])
return true;