Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/04 01:01:05 UTC

[3/4] incubator-systemml git commit: Cleanup lop piggybacking (unused code, visibility, logging, iterators)

Cleanup lop piggybacking (unused code, visibility, logging, iterators)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3ea3cdbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3ea3cdbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3ea3cdbd

Branch: refs/heads/master
Commit: 3ea3cdbd4cd83b0dc144f232a69b51cc25bc82a9
Parents: ba08b4c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 16:54:33 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 16:54:33 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/lops/compile/Dag.java | 872 ++++---------------
 1 file changed, 149 insertions(+), 723 deletions(-)
----------------------------------------------------------------------
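The bulk of the diff below applies two recurring mechanical changes across Dag.java: trace-logging calls are wrapped in LOG.isTraceEnabled() guards, and index-based loops over node lists are rewritten as enhanced-for loops. The following is a rough, self-contained illustration of those two patterns only; the ExampleNode class and all names in it are stand-ins for this sketch, not types from SystemML.

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Illustrative stand-in for the lop nodes handled in Dag.java.
class ExampleNode {
    private final long id;
    ExampleNode(long id) { this.id = id; }
    long getID() { return id; }
    @Override public String toString() { return "node-" + id; }
}

public class PiggybackCleanupSketch {
    private static final Log LOG = LogFactory.getLog(PiggybackCleanupSketch.class.getName());

    public static void main(String[] args) {
        List<ExampleNode> execNodes = new ArrayList<>();
        execNodes.add(new ExampleNode(1));
        execNodes.add(new ExampleNode(2));

        // Before: index-based iteration with unguarded trace logging, e.g.
        //   for (int i = 0; i < execNodes.size(); i++)
        //       LOG.trace("Checking node " + execNodes.get(i).toString());

        // After: enhanced-for iteration, and the message string is only
        // built when trace logging is actually enabled.
        for (ExampleNode node : execNodes) {
            if (LOG.isTraceEnabled())
                LOG.trace("Checking node (" + node.getID() + ") " + node.toString());
        }
    }
}

The guard matters because an unguarded LOG.trace(...) pays for the string concatenation on every call even when trace output is disabled, which adds up in hot compilation loops.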


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ea3cdbd/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index 3b4cfdb..f5693ef 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,7 +91,6 @@ import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat;
  */
 public class Dag<N extends Lop> 
 {
-	
 	private static final Log LOG = LogFactory.getLog(Dag.class.getName());
 
 	private static final int CHILD_BREAKS_ALIGNMENT = 2;
@@ -106,7 +104,7 @@ public class Dag<N extends Lop>
 	
 	private int total_reducers = -1;
 	private String scratch = "";
-	private String scratchFilePath = "";
+	private String scratchFilePath = null;
 	
 	private double gmrMapperFootprint = 0;
 	
@@ -185,7 +183,7 @@ public class Dag<N extends Lop>
 	}
 	
 	private String getFilePath() {
-		if ( scratchFilePath.equalsIgnoreCase("") ) {
+		if ( scratchFilePath == null ) {
 			scratchFilePath = scratch + Lop.FILE_SEPARATOR
 								+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
 								+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
@@ -209,6 +207,20 @@ public class Dag<N extends Lop>
 	}
 
 	/**
+	 * Method to add a node to the DAG.
+	 * 
+	 * @param node
+	 * @return true if node was not already present, false if not.
+	 */
+
+	public boolean addNode(N node) {
+		if (nodes.contains(node))
+			return false;		
+		nodes.add(node);
+		return true;
+	}
+	
+	/**
 	 * 
 	 * @param config
 	 * @return
@@ -262,83 +274,6 @@ public class Dag<N extends Lop>
 
 	}
 
-	/**
-	 * Method to remove transient reads that do not have a transient write
-	 * 
-	 * @param nodeV
-	 * @param inst
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("unused")
-	private void deleteUnwantedTransientReadVariables(ArrayList<N> nodeV,
-			ArrayList<Instruction> inst) throws DMLRuntimeException,
-			DMLUnsupportedOperationException {
-		HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
-		
-		LOG.trace("In delete unwanted variables");
-
-		// first capture all transient read variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.READ) {
-				labelNodeMapping.put(node.getOutputParameters().getLabel(),
-						node);
-			}
-		}
-
-		// generate delete instructions for all transient read variables without
-		// a transient write
-		// first capture all transient write variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.WRITE ) {
-				if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data) {
-					// this transient write is NOT a result of actual computation, but is simple copy.
-					// in such a case, preserve the input variable so that appropriate copy instruction (cpvar or GMR) is generated
-					// therefore, remove the input label from labelNodeMapping
-					labelNodeMapping.remove(node.getInputs().get(0).getOutputParameters().getLabel());
-				}
-				else {
-					if(labelNodeMapping.containsKey(node.getOutputParameters().getLabel())) 
-						// corresponding transient read exists (i.e., the variable is updated)
-						// in such a case, generate rmvar instruction so that OLD data gets deleted
-						labelNodeMapping.remove(node.getOutputParameters().getLabel());
-				}
-			}
-		}
-
-		// generate RM instructions
-		Instruction rm_inst = null;
-		for( Entry<String, N> e : labelNodeMapping.entrySet() ) {
-			String label = e.getKey();
-			N node = e.getValue();
-
-			if (((Data) node).getDataType() == DataType.SCALAR) {
-				// if(DEBUG)
-				// System.out.println("rmvar" + Lops.OPERAND_DELIMITOR + label);
-				// inst.add(new VariableSimpleInstructions("rmvar" +
-				// Lops.OPERAND_DELIMITOR + label));
-
-			} else {
-				rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
-				rm_inst.setLocation(node);
-				
-				if( LOG.isTraceEnabled() )
-					LOG.trace(rm_inst.toString());
-				inst.add(rm_inst);
-				
-			}
-		}
-
-	}
-
 	private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
 			ArrayList<Instruction> inst) throws DMLRuntimeException,
 			DMLUnsupportedOperationException {
@@ -348,20 +283,6 @@ public class Dag<N extends Lop>
 		
 		LOG.trace("In delete updated variables");
 
-		/*
-		Set<String> in = sb.liveIn().getVariables().keySet();
-		Set<String> out = sb.liveOut().getVariables().keySet();
-		Set<String> updated = sb.variablesUpdated().getVariables().keySet();
-		
-		Set<String> intersection = in;
-		intersection.retainAll(out);
-		intersection.retainAll(updated);
-		
-		for (String var : intersection) {
-			inst.add(VariableCPInstruction.prepareRemoveInstruction(var));
-		}
-		*/
-
 		// CANDIDATE list of variables which could have been updated in this statement block 
 		HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
 		
@@ -371,8 +292,7 @@ public class Dag<N extends Lop>
 		HashMap<String, N> updatedLabelsLineNum =  new HashMap<String, N>();
 		
 		// first capture all transient read variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
+		for ( N node : nodeV ) {
 
 			if (node.getExecLocation() == ExecLocation.Data
 					&& ((Data) node).isTransient()
@@ -399,8 +319,7 @@ public class Dag<N extends Lop>
 		}
 
 		// capture updated transient write variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
+		for ( N node : nodeV ) {
 
 			if (node.getExecLocation() == ExecLocation.Data
 					&& ((Data) node).isTransient()
@@ -422,8 +341,8 @@ public class Dag<N extends Lop>
 			rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
 			rm_inst.setLocation(updatedLabelsLineNum.get(label));
 			
-			
-			LOG.trace(rm_inst.toString());
+			if( LOG.isTraceEnabled() )
+				LOG.trace(rm_inst.toString());
 			inst.add(rm_inst);
 		}
 
@@ -479,23 +398,6 @@ public class Dag<N extends Lop>
 		}*/
 	}
 
-	/**
-	 * Method to add a node to the DAG.
-	 * 
-	 * @param node
-	 * @return true if node was not already present, false if not.
-	 */
-
-	public boolean addNode(N node) {
-		if (nodes.contains(node))
-			return false;
-		else {
-			nodes.add(node);
-			return true;
-		}
-		
-	}
-
 	private ArrayList<ArrayList<N>> createNodeVectors(int size) {
 		ArrayList<ArrayList<N>> arr = new ArrayList<ArrayList<N>>();
 
@@ -508,8 +410,8 @@ public class Dag<N extends Lop>
 	}
 
 	private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
-		for (int i = 0; i < arr.size(); i++) {
-			arr.get(i).clear();
+		for (ArrayList<N> tmp : arr) {
+			tmp.clear();
 		}
 	}
 
@@ -519,9 +421,10 @@ public class Dag<N extends Lop>
 		int base = jt.getBase();
 
 		for (int i = from; i < to; i++) {
-			if ((nodes.get(i).getCompatibleJobs() & base) == 0) {
+			N node = nodes.get(i);
+			if ((node.getCompatibleJobs() & base) == 0) {
 				if( LOG.isTraceEnabled() )
-					LOG.trace("Not compatible "+ nodes.get(i).toString());
+					LOG.trace("Not compatible "+ node.toString());
 				return false;
 			}
 		}
@@ -665,7 +568,6 @@ public class Dag<N extends Lop>
 	 * @param jobNodes
 	 * @throws LopsException
 	 */
-
 	private void handleSingleOutputJobs(ArrayList<N> execNodes,
 			ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
 			throws LopsException {
@@ -685,9 +587,7 @@ public class Dag<N extends Lop>
 			if (!jobNodes.get(jindex).isEmpty()) {
 				ArrayList<N> vec = jobNodes.get(jindex);
 
-				// first find all nodes with more than one parent that is not
-				// finished.
-
+				// first find all nodes with more than one parent that is not finished.
 				for (int i = 0; i < vec.size(); i++) {
 					N node = vec.get(i);
 					if (node.getExecLocation() == ExecLocation.MapOrReduce
@@ -705,24 +605,10 @@ public class Dag<N extends Lop>
 							}
 						}
 					} 
-					/*
-					// Following condition will not work for MMRJ because execNodes may contain non-MapAndReduce 
-					// lops that are compatible with MMRJ (e.g., Data-WRITE)
-					else if (!(node.getExecLocation() == ExecLocation.MapAndReduce 
-							     && node.getType() == lopTypes[jobi])) {
-						throw new LopsException(
-								"Error in handleSingleOutputJobs(): encountered incompatible execLocation='"
-										+ node.getExecLocation() + "' for lop (ID="
-										+ node.getID() + ").");
-					}
-					*/
 				}
 
-				// need to redo all nodes in nodesWithOutput as well as their
-				// children
-
-				for (int i = 0; i < vec.size(); i++) {
-					N node = vec.get(i);
+				// need to redo all nodes in nodesWithOutput as well as their children
+				for ( N node : vec ) {
 					if (node.getExecLocation() == ExecLocation.MapOrReduce
 							|| node.getExecLocation() == ExecLocation.Map) {
 						if (nodesWithUnfinishedOutputs.contains(node))
@@ -798,14 +684,6 @@ public class Dag<N extends Lop>
 		}
 	}
 	
-	/*private String getOutputFormat(Data node) { 
-		if(node.getFileFormatType() == FileFormatTypes.TEXT)
-			return "textcell";
-		else if ( node.getOutputParameters().isBlocked_representation() )
-			return "binaryblock";
-		else
-			return "binarycell";
-	}*/
 	
 	/**
 	 * Determine whether to send <code>node</code> to MR or to process it in the control program.
@@ -922,7 +800,6 @@ public class Dag<N extends Lop>
 	 * @throws DMLUnsupportedOperationException
 	 * @throws DMLRuntimeException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
 			throws LopsException, IOException, DMLRuntimeException,
@@ -940,7 +817,6 @@ public class Dag<N extends Lop>
 
 		ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
 		
-
 		// list of instructions
 		ArrayList<Instruction> inst = new ArrayList<Instruction>();
 
@@ -949,9 +825,6 @@ public class Dag<N extends Lop>
 		ArrayList<Instruction> deleteInst = new ArrayList<Instruction>();
 		ArrayList<Instruction> endOfBlockInst = new ArrayList<Instruction>();
 
-		// delete transient variables that are no longer needed
-		//deleteUnwantedTransientReadVariables(node_v, deleteInst);
-
 		// remove files for transient reads that are updated.
 		deleteUpdatedTransientReadVariables(sb, node_v, writeInst);
 		
@@ -1004,9 +877,6 @@ public class Dag<N extends Lop>
 					if( LOG.isTraceEnabled() )
 						LOG.trace(indent + "Queueing node "
 								+ node.toString() + " (code 2)");
-					//for(N q: queuedNodes) {
-					//	LOG.trace(indent + "  " + q.getType() + "," + q.getID());
-					//}
 					queuedNodes.add(node);
 
 					// if node has more than two inputs,
@@ -1043,22 +913,6 @@ public class Dag<N extends Lop>
 					}
 				}
 
-				/*if (node.getInputs().size() == 2) {
-					int j1 = jobType(node.getInputs().get(0), jobNodes);
-					int j2 = jobType(node.getInputs().get(1), jobNodes);
-					if (j1 != -1 && j2 != -1 && j1 != j2) {
-						LOG.trace(indent + "Queueing node "
-									+ node.toString() + " (code 3)");
-
-						queuedNodes.add(node);
-
-						removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-
-						continue;
-					}
-				}*/
-
 				// See if this lop can be eliminated
 				// This check is for "aligner" lops (e.g., group)
 				boolean eliminate = false;
@@ -1267,13 +1121,6 @@ public class Dag<N extends Lop>
 
 				// reduce node, make sure no parent needs reduce, else queue
 				if (node.getExecLocation() == ExecLocation.MapAndReduce) {
-
-					// boolean eliminate = false;
-					// eliminate = canEliminateLop(node, execNodes);
-					// if (eliminate || (!hasChildNode(node, execNodes,
-					// ExecLocation.MapAndReduce)) &&
-					// !hasMRJobChildNode(node,execNodes)) {
-
 					// TODO: statiko -- keep the middle condition
 					// discuss about having a lop that is MapAndReduce but does
 					// not define a job
@@ -1282,14 +1129,7 @@ public class Dag<N extends Lop>
 					execNodes.add(node);
 					finishedNodes.add(node);
 					addNodeByJobType(node, jobNodes, execNodes, eliminate);
-
-					// } else {
-					// if (DEBUG)
-					// System.out.println("Queueing -" + node.toString());
-					// queuedNodes.add(node);
-					// removeNodesForNextIteration(node, finishedNodes,
-					// execNodes, queuedNodes, jobNodes);
-					// }
+					
 					continue;
 				}
 
@@ -1319,10 +1159,10 @@ public class Dag<N extends Lop>
 				// add Scalar to execNodes if it has no child in exec nodes
 				// that will be executed in a MR job.
 				if (node.getExecLocation() == ExecLocation.ControlProgram) {
-					for (int j = 0; j < node.getInputs().size(); j++) {
-						if (execNodes.contains(node.getInputs().get(j))
-								&& !(node.getInputs().get(j).getExecLocation() == ExecLocation.Data)
-								&& !(node.getInputs().get(j).getExecLocation() == ExecLocation.ControlProgram)) {
+					for ( Lop lop : node.getInputs() ) {
+						if (execNodes.contains(lop)
+								&& !(lop.getExecLocation() == ExecLocation.Data)
+								&& !(lop.getExecLocation() == ExecLocation.ControlProgram)) {
 							if( LOG.isTraceEnabled() )
 								LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
 
@@ -1396,11 +1236,8 @@ public class Dag<N extends Lop>
 				// next generate MR instructions
 				if (!execNodes.isEmpty())
 					generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes);
-
 				handleSingleOutputJobs(execNodes, jobNodes, finishedNodes);
-
 			}
-
 		}
 
 		// add write and delete inst at the very end.
@@ -1415,9 +1252,7 @@ public class Dag<N extends Lop>
 	}
 
 	private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
-	  for(int i=0; i < execNodes.size(); i++)
-	  {
-	    N tmpNode = execNodes.get(i);
+	  for( N tmpNode : execNodes ) {
 	    // for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID
 	    // we should not consider such lops in this check
 	    if (isChild(tmpNode, node, IDMap) 
@@ -1466,11 +1301,6 @@ public class Dag<N extends Lop>
 			else {
 				processConsumers((N)in, inst, delteInst, null);
 			}
-			
-			/*if ( in.removeConsumer() == 0 ) {
-				String label = in.getOutputParameters().getLabel();
-				inst.add(VariableCPInstruction.prepareRemoveInstruction(label));
-			}*/
 		}
 	}
 	
@@ -1506,7 +1336,6 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private void generateControlProgramJobs(ArrayList<N> execNodes,
 			ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
@@ -1568,8 +1397,6 @@ public class Dag<N extends Lop>
 					else {
 						var_deletions.add(node.getOutputParameters().getLabel());
 						var_deletionsLineNum.put(node.getOutputParameters().getLabel(), node);
-						
-						//System.out.println("    --> skipping " + out.getLastInstructions() + " while processing node " + node.getID());
 					}
 				}
 
@@ -1768,10 +1595,9 @@ public class Dag<N extends Lop>
 		}
 
 		// delete all marked nodes
-		for (int i = 0; i < markedNodes.size(); i++) {
-			execNodes.remove(markedNodes.get(i));
+		for ( N node : markedNodes ) {
+			execNodes.remove(node);
 		}
-
 	}
 
 	/**
@@ -1784,7 +1610,6 @@ public class Dag<N extends Lop>
 	 * @param queuedNodes
 	 * @throws LopsException
 	 */
-
 	private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
 			ArrayList<N> execNodes, ArrayList<N> queuedNodes,
 			ArrayList<ArrayList<N>> jobvec) throws LopsException {
@@ -1794,10 +1619,9 @@ public class Dag<N extends Lop>
 			return;
 		
 		//if all children are queued, then there is nothing to do.
-		int numInputs = node.getInputs().size();
 		boolean allQueued = true;
-		for(int i=0; i < numInputs; i++) {
-			if( !queuedNodes.contains(node.getInputs().get(i)) ) {
+		for( Lop input : node.getInputs() ) {
+			if( !queuedNodes.contains(input) ) {
 				allQueued = false;
 				break;
 			}
@@ -1811,8 +1635,8 @@ public class Dag<N extends Lop>
 		// Determine if <code>node</code> has inputs from the same job or multiple jobs
 	    int jobid = Integer.MIN_VALUE;
 		boolean inputs_in_same_job = true;
-		for(int idx=0; idx < node.getInputs().size(); idx++) {
-			int input_jobid = jobType(node.getInputs().get(idx), jobvec);
+		for( Lop input : node.getInputs() ) {
+			int input_jobid = jobType(input, jobvec);
 			if ( jobid == Integer.MIN_VALUE )
 				jobid = input_jobid;
 			else if ( jobid != input_jobid ) { 
@@ -1824,8 +1648,7 @@ public class Dag<N extends Lop>
 		// Determine if there exist any unassigned inputs to <code>node</code>
 		// Evaluate only those lops that execute in MR.
 		boolean unassigned_inputs = false;
-		for(int i=0; i < numInputs; i++) {
-			Lop input = node.getInputs().get(i);
+		for( Lop input : node.getInputs() ) {
 			//if ( input.getExecLocation() != ExecLocation.ControlProgram && jobType(input, jobvec) == -1 ) {
 			if ( input.getExecType() == ExecType.MR && !execNodes.contains(input)) { //jobType(input, jobvec) == -1 ) {
 				unassigned_inputs = true;
@@ -1835,8 +1658,8 @@ public class Dag<N extends Lop>
 
 		// Determine if any node's children are queued
 		boolean child_queued = false;
-		for(int i=0; i < numInputs; i++) {
-			if (queuedNodes.contains(node.getInputs().get(i)) ) {
+		for( Lop input : node.getInputs() ) {
+			if (queuedNodes.contains(input) ) {
 				child_queued = true;
 				break;
 			}
@@ -1940,14 +1763,16 @@ public class Dag<N extends Lop>
 		} // for i
 
 		// we also need to delete all parent nodes of marked nodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			LOG.trace("  Checking for removal - ("
-						+ execNodes.get(i).getID() + ") "
-						+ execNodes.get(i).toString());
-
-			if (hasChildNode(execNodes.get(i), markedNodes) && !markedNodes.contains(execNodes.get(i))) {
-				markedNodes.add(execNodes.get(i));
-				LOG.trace("    Removing for next iteration (code 6) (" + execNodes.get(i).getID() + ") " + execNodes.get(i).toString());
+		for ( N enode : execNodes ) {
+			if( LOG.isTraceEnabled() ) {
+				LOG.trace("  Checking for removal - ("
+							+ enode.getID() + ") " + enode.toString());
+			}
+			
+			if (hasChildNode(enode, markedNodes) && !markedNodes.contains(enode)) {
+				markedNodes.add(enode);
+				if( LOG.isTraceEnabled() )
+					LOG.trace("    Removing for next iteration (code 6) (" + enode.getID() + ") " + enode.toString());
 			}
 		}
 
@@ -1963,193 +1788,6 @@ public class Dag<N extends Lop>
 				queuedNodes.add(n);
 			}
 		}
-		/*for (int i = 0; i < markedNodes.size(); i++) {
-			finishedNodes.remove(markedNodes.elementAt(i));
-			execNodes.remove(markedNodes.elementAt(i));
-			removeNodeByJobType(markedNodes.elementAt(i), jobvec);
-			queuedNodes.add(markedNodes.elementAt(i));
-		}*/
-	}
-
-	@SuppressWarnings("unused")
-	private void removeNodesForNextIterationOLD(N node, ArrayList<N> finishedNodes,
-			ArrayList<N> execNodes, ArrayList<N> queuedNodes,
-			ArrayList<ArrayList<N>> jobvec) throws LopsException {
-		// only queued nodes with two inputs need to be handled.
-
-		// TODO: statiko -- this should be made == 1
-		if (node.getInputs().size() != 2)
-			return;
-		
-		
-		//if both children are queued, nothing to do. 
-	    if (queuedNodes.contains(node.getInputs().get(0))
-	        && queuedNodes.contains(node.getInputs().get(1)))
-	      return;
-	    
-	    if( LOG.isTraceEnabled() )
-	    	LOG.trace("Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
- 
-	    
-
-		boolean inputs_in_same_job = false;
-		// TODO: handle tertiary
-		if (jobType(node.getInputs().get(0), jobvec) != -1
-				&& jobType(node.getInputs().get(0), jobvec) == jobType(node
-						.getInputs().get(1), jobvec)) {
-			inputs_in_same_job = true;
-		}
-
-		boolean unassigned_inputs = false;
-		// here.. scalars shd be ignored
-		if ((node.getInputs().get(0).getExecLocation() != ExecLocation.ControlProgram && jobType(
-				node.getInputs().get(0), jobvec) == -1)
-				|| (node.getInputs().get(1).getExecLocation() != ExecLocation.ControlProgram && jobType(
-						node.getInputs().get(1), jobvec) == -1)) {
-			unassigned_inputs = true;
-		}
-
-		boolean child_queued = false;
-
-		// check if atleast one child was queued.
-		if (queuedNodes.contains(node.getInputs().get(0))
-				|| queuedNodes.contains(node.getInputs().get(1)))
-			child_queued = true;
-
-		// nodes to be dropped
-		ArrayList<N> markedNodes = new ArrayList<N>();
-
-		for (int i = 0; i < execNodes.size(); i++) {
-
-			N tmpNode = execNodes.get(i);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("Checking for removal (" + tmpNode.getID()
-						+ ") " + tmpNode.toString());
-				LOG.trace("Inputs in same job " + inputs_in_same_job);
-				LOG.trace("Unassigned inputs " + unassigned_inputs);
-				LOG.trace("Child queued " + child_queued);
-
-			}
-			
-			//if(node.definesMRJob() && isChild(tmpNode,node) && (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
-			//  continue;
-
-			// TODO: statiko -- check if this is too conservative?
-			if (child_queued) {
-  	     // if one of the children are queued, 
-	       // remove some child nodes on other leg that may be needed later on. 
-	       // For e.g. Group lop. 
- 
-			  if((tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) && 
-			      tmpNode.isAligner())
-			  {
-			    markedNodes.add(tmpNode);
-			    if( LOG.isTraceEnabled() )
-			    	LOG.trace("Removing for next iteration: ("
-			    			+ tmpNode.getID() + ") " + tmpNode.toString());
-			  }
-			  else {
-				if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node) 
-						&& isChild(tmpNode, node, IDMap)  && branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-
-	
-				    if( 
-				        //e.g. MMCJ
-				        (node.getExecLocation() == ExecLocation.MapAndReduce &&
-						branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes) && !tmpNode.definesMRJob() )
-						||
-						//e.g. Binary
-						(node.getExecLocation() == ExecLocation.Reduce && branchCanBePiggyBackedReduce(tmpNode, node, execNodes, finishedNodes))  )
-					{
-					
-				    	if( LOG.isTraceEnabled() )
-							LOG.trace("Removing for next iteration: ("
-								    + tmpNode.getID() + ") " + tmpNode.toString());
-					
-					
-	        			markedNodes.add(tmpNode);	
-					}
-				 }
-			  }
-			} else {
-				/*
-				 * "node" has no other queued children.
-				 * 
-				 * If inputs are in the same job and "node" is of type
-				 * MapAndReduce, then remove nodes of all types other than
-				 * Reduce, MapAndReduce, and the ones that define a MR job as
-				 * they can be piggybacked later.
-				 * 
-				 * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
-				 * come from Rand job, and they should not be removed.
-				 * 
-				 * Other examples: -- MMCJ whose children are of type
-				 * MapAndReduce (say GMR) -- Inputs coming from two different
-				 * jobs .. GMR & REBLOCK
-				 */
-				if ((inputs_in_same_job || unassigned_inputs)
-						&& node.getExecLocation() == ExecLocation.MapAndReduce
-						&& !hasOtherMapAndReduceParentNode(tmpNode, execNodes,
-								node)
-						&& isChild(tmpNode, node, IDMap) &&
-						branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes)
-						&& !tmpNode.definesMRJob()) {
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Removing for next iteration:: ("
-								+ tmpNode.getID() + ") " + tmpNode.toString());
-
-					markedNodes.add(tmpNode);
-				}
-
-				// as this node has inputs coming from different jobs, need to
-				// free up everything
-				// below and include the closest MapAndReduce lop if this is of
-				// type Reduce.
-				// if it is of type MapAndReduce, don't need to free any nodes
-
-				if (!inputs_in_same_job && !unassigned_inputs
-						&& isChild(tmpNode, node, IDMap) && 
-						(tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) && 
-		            tmpNode.isAligner()) 
-				{
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Removing for next iteration ("
-										+ tmpNode.getID()
-										+ ") "
-										+ tmpNode.toString());
-
-					markedNodes.add(tmpNode);
-
-				}
-
-			}
-		}
-
-		// we also need to delete all parent nodes of marked nodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			if( LOG.isTraceEnabled() )
-				LOG.trace("Checking for removal - ("
-						+ execNodes.get(i).getID() + ") "
-						+ execNodes.get(i).toString());
-
-			if (hasChildNode(execNodes.get(i), markedNodes)) {
-				markedNodes.add(execNodes.get(i));
-				if( LOG.isTraceEnabled() )
-					LOG.trace("Removing for next iteration - ("
-							+ execNodes.get(i).getID() + ") "
-							+ execNodes.get(i).toString());
-			}
-		}
-
-		// delete marked nodes from finishedNodes and execNodes
-		// add to queued nodes
-		for (int i = 0; i < markedNodes.size(); i++) {
-			finishedNodes.remove(markedNodes.get(i));
-			execNodes.remove(markedNodes.get(i));
-			removeNodeByJobType(markedNodes.get(i), jobvec);
-			queuedNodes.add(markedNodes.get(i));
-		}
 	}
 
 	private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
@@ -2162,9 +1800,7 @@ public class Dag<N extends Lop>
 				return false;
 		}
 		
-		for(int i=0; i < execNodes.size(); i++) {
-		   N n = execNodes.get(i);
-       
+		for( N n : execNodes ) {
 		   if(n.equals(node))
 			   continue;
        
@@ -2177,10 +1813,6 @@ public class Dag<N extends Lop>
 				   && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
 				   return false;
 		   }
-		   /*if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap) 
-				   && !node.getInputs().contains(tmpNode) 
-				   && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
-			   return false;*/
 	   }
 	   return true;
 	}
@@ -2258,9 +1890,7 @@ public class Dag<N extends Lop>
 			return false;
 		JobType jt = JobType.findJobTypeFromLop(node);
 
-		for (int i = 0; i < execNodes.size(); i++) {
-			N n = execNodes.get(i);
-
+		for ( N n : execNodes ) {
 			if (n.equals(node))
 				continue;
 
@@ -2274,16 +1904,6 @@ public class Dag<N extends Lop>
 				else if (!isCompatible(n, jt))
 					return false;
 			}
-
-			/*
-			 * if(n.equals(tmpNode) && n.getExecLocation() != ExecLocation.Map
-			 * && n.getExecLocation() != ExecLocation.MapOrReduce) return false;
-			 * 
-			 * if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap) &&
-			 * n.getExecLocation() != ExecLocation.Map && n.getExecLocation() !=
-			 * ExecLocation.MapOrReduce) return false;
-			 */
-
 		}
 		return true;
 	}
@@ -2305,10 +1925,7 @@ public class Dag<N extends Lop>
 	  }
 	  
 	  //check to see if any node between node and tmpNode has more than one unfinished output 
-	  for(int i=0; i < execNodes.size(); i++)
-	  {
-	    N n = execNodes.get(i);
-	    
+	  for( N n : execNodes ) {
 	    if(n.equals(node) || n.equals(tmpNode))
 	      continue;
 	    
@@ -2328,36 +1945,6 @@ public class Dag<N extends Lop>
 	  return true;
   }
 
-  /**
-	 * Method to check of there is a node of type MapAndReduce between a child
-	 * (tmpNode) and its parent (node)
-	 * 
-	 * @param tmpNode
-	 * @param execNodes
-	 * @param node
-	 * @return
-	 */
-
-	@SuppressWarnings({ "unchecked", "unused" })
-	private boolean hasMapAndReduceParentNode(N tmpNode, ArrayList<N> execNodes, N node) 
-	{
-		for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
-			N n = (N) tmpNode.getOutputs().get(i);
-
-			if (execNodes.contains(n)
-					&& n.getExecLocation() == ExecLocation.MapAndReduce
-					&& isChild(n, node, IDMap)) {
-				return true;
-			} else {
-				if (hasMapAndReduceParentNode(n, execNodes, node))
-					return true;
-			}
-
-		}
-
-		return false;
-	}
-
 	/**
 	 * Method to return the job index for a lop.
 	 * 
@@ -2366,7 +1953,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
 		for ( JobType jt : JobType.values()) {
 			int i = jt.getId();
@@ -2386,7 +1972,6 @@ public class Dag<N extends Lop>
 	 * @param node
 	 * @return
 	 */
-
 	@SuppressWarnings("unchecked")
 	private boolean hasOtherMapAndReduceParentNode(N tmpNode,
 			ArrayList<N> nodeList, N node) {
@@ -2425,10 +2010,9 @@ public class Dag<N extends Lop>
 		long nodeid = IDMap.get(node.getID());
 		long tmpid = IDMap.get(tmpNode.getID());
 		
-		for ( int i=0; i < queuedNodes.size(); i++ ) {
-			int id = IDMap.get(queuedNodes.get(i).getID());
+		for ( N qnode : queuedNodes ) {
+			int id = IDMap.get(qnode.getID());
 			if ((id != nodeid && nodeMarked[id]) && (id != tmpid && tmpMarked[id]) )
-			//if (nodeMarked[id] && tmpMarked[id])
 				return true;
 		}
 		
@@ -2441,8 +2025,9 @@ public class Dag<N extends Lop>
 	 * @param jobNodes
 	 * @throws DMLRuntimeException
 	 */
-	void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
-			throws DMLRuntimeException {
+	private void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
+		throws DMLRuntimeException 
+	{
 		if (LOG.isTraceEnabled()){
 			for ( JobType jt : JobType.values() ) {
 				int i = jt.getId();
@@ -2458,8 +2043,6 @@ public class Dag<N extends Lop>
 			}
 			
 		}
-
-		
 	}
 
 	/**
@@ -2469,7 +2052,7 @@ public class Dag<N extends Lop>
 	 * @param loc
 	 * @return
 	 */
-	boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
+	private boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
 		for (int i = 0; i < nodes.size(); i++) {
 			if (nodes.get(i).getExecLocation() == ExecLocation.RecordReader)
 				return true;
@@ -2477,8 +2060,8 @@ public class Dag<N extends Lop>
 		return false;
 	}
 
-	ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) {
-
+	private ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) 
+	{
 		// obtain the list of record reader nodes
 		ArrayList<N> rrnodes = new ArrayList<N>();
 		for (int i = 0; i < gmrnodes.size(); i++) {
@@ -2542,8 +2125,7 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-
-	public void generateMRJobs(ArrayList<N> execNodes,
+	private void generateMRJobs(ArrayList<N> execNodes,
 			ArrayList<Instruction> inst,
 			ArrayList<Instruction> writeinst,
 			ArrayList<Instruction> deleteinst, ArrayList<ArrayList<N>> jobNodes)
@@ -2551,17 +2133,6 @@ public class Dag<N extends Lop>
 			DMLRuntimeException
 
 	{
-
-		/*// copy unassigned lops in execnodes to gmrnodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			N node = execNodes.elementAt(i);
-			if (jobType(node, jobNodes) == -1) {
-				jobNodes.get(JobType.GMR.getId()).add(node);
-				addChildren(node, jobNodes.get(JobType.GMR.getId()),
-						execNodes);
-			}
-		}*/
-
 		printJobNodes(jobNodes);
 		
 		ArrayList<Instruction> rmvarinst = new ArrayList<Instruction>();
@@ -2632,61 +2203,22 @@ public class Dag<N extends Lop>
 	}
 
 	/**
-	 * Method to get the input format for a lop
-	 * 
-	 * @param elementAt
-	 * @return
-	 * @throws LopsException
-	 */
-
-	// This code is replicated in ReBlock.java
-	@SuppressWarnings("unused")
-	private Format getChildFormat(N node) 
-		throws LopsException 
-	{
-		if (node.getOutputParameters().getFile_name() != null
-				|| node.getOutputParameters().getLabel() != null) {
-			return node.getOutputParameters().getFormat();
-		} else {
-			if (node.getInputs().size() > 1)
-				throw new LopsException(node.printErrorLocation() + "Should only have one child! \n");
-			/*
-			 * Return the format of the child node (i.e., input lop) No need of
-			 * recursion here.. because 1) Reblock lop's input can either be
-			 * DataLop or some intermediate computation If it is Data then we
-			 * just take its format (TEXT or BINARY) If it is intermediate lop
-			 * then it is always BINARY since we assume that all intermediate
-			 * computations will be in Binary format 2) Note that Reblock job
-			 * will never have any instructions in the mapper => the input lop
-			 * (if it is other than Data) is always executed in a different job
-			 */
-			return node.getInputs().get(0).getOutputParameters().getFormat();
-			// return getChildFormat((N) node.getInputs().get(0));
-		}
-
-	}
-
-	/**
 	 * Method to add all parents of "node" in exec_n to node_v.
 	 * 
 	 * @param node
 	 * @param node_v
 	 * @param exec_n
 	 */
-
 	private void addParents(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
-		for (int i = 0; i < exec_n.size(); i++) {
-			if (isChild(node, exec_n.get(i), IDMap)) {
-				if (!node_v.contains(exec_n.get(i))) {
+		for (N enode : exec_n ) {
+			if (isChild(node, enode, IDMap)) {
+				if (!node_v.contains(enode)) {
 					if( LOG.isTraceEnabled() )
-						LOG.trace("Adding parent - "
-								+ exec_n.get(i).toString());
-					node_v.add(exec_n.get(i));
+						LOG.trace("Adding parent - " + enode.toString());
+					node_v.add(enode);
 				}
 			}
-
 		}
-
 	}
 
 	/**
@@ -2696,11 +2228,10 @@ public class Dag<N extends Lop>
 	 * @param node_v
 	 * @param exec_n
 	 */
-
 	@SuppressWarnings("unchecked")
 	private void addChildren(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
 
-		/** add child in exec nodes that is not of type scalar **/
+		// add child in exec nodes that is not of type scalar
 		if (exec_n.contains(node)
 				&& node.getExecLocation() != ExecLocation.ControlProgram) {
 			if (!node_v.contains(node)) {
@@ -2714,9 +2245,7 @@ public class Dag<N extends Lop>
 		if (!exec_n.contains(node))
 			return;
 
-		/**
-		 * recurse
-		 */
+		// recurse
 		for (int i = 0; i < node.getInputs().size(); i++) {
 			N n = (N) node.getInputs().get(i);
 			addChildren(n, node_v, exec_n);
@@ -2730,7 +2259,7 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException 
 	 */
-	OutputInfo getOutputInfo(N node, boolean cellModeOverride) 
+	private OutputInfo getOutputInfo(N node, boolean cellModeOverride) 
 		throws LopsException 
 	{
 		if ( (node.getDataType() == DataType.SCALAR && node.getExecType() == ExecType.CP) 
@@ -2797,22 +2326,8 @@ public class Dag<N extends Lop>
 
 		return oinfo;
 	}
-
-	/** 
-	 * Method that determines the output format for a given node, 
-	 * and returns a string representation of OutputInfo. This 
-	 * method is primarily used while generating instructions that
-	 * execute in the control program.
-	 * 
-	 * @param node
-	 * @return
-	 */
-/*  	String getOutputFormat(N node) {
-  		return OutputInfo.outputInfoToString(getOutputInfo(node, false));
-  	}
-*/  	
 	
-	public String prepareAssignVarInstruction(Lop input, Lop node) {
+	private String prepareAssignVarInstruction(Lop input, Lop node) {
 		StringBuilder sb = new StringBuilder();
 		
 		sb.append(ExecType.CP);
@@ -3295,7 +2810,7 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 */
 	@SuppressWarnings("unchecked")
-	public void generateMapReduceInstructions(ArrayList<N> execNodes,
+	private void generateMapReduceInstructions(ArrayList<N> execNodes,
 			ArrayList<Instruction> inst, ArrayList<Instruction> writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst, 
 			JobType jt) throws LopsException,
 			DMLUnsupportedOperationException, DMLRuntimeException
@@ -3339,19 +2854,18 @@ public class Dag<N extends Lop>
 		if (jt == JobType.GMR || jt == JobType.GMRCELL) {
 			ArrayList<N> markedNodes = new ArrayList<N>();
 			// only keep data nodes that are results of some computation.
-			for (int i = 0; i < rootNodes.size(); i++) {
-				N node = rootNodes.get(i);
-				if (node.getExecLocation() == ExecLocation.Data
-						&& ((Data) node).isTransient()
-						&& ((Data) node).getOperationType() == OperationTypes.WRITE
-						&& ((Data) node).getDataType() == DataType.MATRIX) {
+			for ( N rnode : rootNodes ) {
+				if (rnode.getExecLocation() == ExecLocation.Data
+						&& ((Data) rnode).isTransient()
+						&& ((Data) rnode).getOperationType() == OperationTypes.WRITE
+						&& ((Data) rnode).getDataType() == DataType.MATRIX) {
 					// no computation, just a copy
-					if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data
-							&& ((Data) node.getInputs().get(0)).isTransient()
-							&& node.getOutputParameters().getLabel().compareTo(
-									node.getInputs().get(0)
-											.getOutputParameters().getLabel()) == 0) {
-						markedNodes.add(node);
+					if (rnode.getInputs().get(0).getExecLocation() == ExecLocation.Data
+							&& ((Data) rnode.getInputs().get(0)).isTransient()
+							&& rnode.getOutputParameters().getLabel().compareTo(
+								rnode.getInputs().get(0).getOutputParameters().getLabel()) == 0) 
+					{
+						markedNodes.add(rnode);
 					}
 				}
 			}
@@ -3367,10 +2881,9 @@ public class Dag<N extends Lop>
 		
 		/* Determine all input data files */
 		
-		for (int i = 0; i < rootNodes.size(); i++) {
-			getInputPathsAndParameters(rootNodes.get(i), execNodes,
-					inputs, inputInfos, numRows, numCols, numRowsPerBlock,
-					numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
+		for ( N rnode : rootNodes ) {
+			getInputPathsAndParameters(rnode, execNodes, inputs, inputInfos, numRows, numCols, 
+				numRowsPerBlock, numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
 		}
 		
 		// In case of RAND job, instructions are defined in the input file
@@ -3384,10 +2897,9 @@ public class Dag<N extends Lop>
 		
 		// currently, recordreader instructions are allowed only in GMR jobs
 		if (jt == JobType.GMR || jt == JobType.GMRCELL) {
-			for (int i = 0; i < rootNodes.size(); i++) {
-				getRecordReaderInstructions(rootNodes.get(i), execNodes,
-						inputs, recordReaderInstructions, nodeIndexMapping,
-						start_index, inputLabels, inputLops, MRJobLineNumbers);
+			for ( N rnode : rootNodes ) {
+				getRecordReaderInstructions(rnode, execNodes, inputs, recordReaderInstructions, 
+					nodeIndexMapping, start_index, inputLabels, inputLops, MRJobLineNumbers);
 				if ( recordReaderInstructions.size() > 1 )
 					throw new LopsException("MapReduce job can only have a single recordreader instruction: " + recordReaderInstructions.toString());
 			}
@@ -3544,23 +3056,6 @@ public class Dag<N extends Lop>
 				processConsumers((N)l, rmvarinst, deleteinst, null);
 			}
 		}
-
-	}
-
-	/**
-	 * Convert a byte array into string
-	 * 
-	 * @param arr
-	 * @return none
-	 */
-	public String getString(byte[] arr) {
-		StringBuilder sb = new StringBuilder();
-		for (int i = 0; i < arr.length; i++) {
-			sb.append(",");
-			sb.append(Byte.toString(arr[i]));
-		}
-
-		return sb.toString();
 	}
 
 	/**
@@ -3571,16 +3066,14 @@ public class Dag<N extends Lop>
 	 */
 	private String getCSVString(ArrayList<String> inputStrings) {
 		StringBuilder sb = new StringBuilder();
-		for (int i = 0; i < inputStrings.size(); i++) {
-			String tmp = inputStrings.get(i);
-			if( tmp != null ) {
+		for ( String str : inputStrings ) {
+			if( str != null ) {
 				if( sb.length()>0 )
 					sb.append(Lop.INSTRUCTION_DELIMITOR);
-				sb.append( tmp ); 
+				sb.append( str ); 
 			}
 		}
 		return sb.toString();
-
 	}
 
 	/**
@@ -3589,7 +3082,6 @@ public class Dag<N extends Lop>
 	 * @param list
 	 * @return
 	 */
-
 	private String[] getStringArray(ArrayList<String> list) {
 		String[] arr = new String[list.size()];
 
@@ -3615,7 +3107,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private int getAggAndOtherInstructions(N node, ArrayList<N> execNodes,
 			ArrayList<String> shuffleInstructions,
@@ -3624,9 +3115,7 @@ public class Dag<N extends Lop>
 			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops, 
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
 	{
-
 		int ret_val = -1;
 
 		if (nodeIndexMapping.containsKey(node))
@@ -3813,9 +3302,6 @@ public class Dag<N extends Lop>
 				start_index[0]++;
 
 				if (node.getType() == Type.Ternary ) {
-					//Tertiary.OperationTypes op = ((Tertiary<?, ?, ?, ?>) node).getOperationType();
-					//if ( op == Tertiary.OperationTypes.CTABLE_TRANSFORM ) {
-					
 					// in case of CTABLE_TRANSFORM_SCALAR_WEIGHT: inputIndices.get(2) would be -1
 					otherInstructionsReducer.add(node.getInstructions(
 							inputIndices.get(0), inputIndices.get(1),
@@ -3824,7 +3310,6 @@ public class Dag<N extends Lop>
 						MRJobLineNumbers.add(node._beginLine);
 					}
 					nodeIndexMapping.put(node, output_index);
-					//}
 				}
 				else if( node.getType() == Type.ParameterizedBuiltin ){
 					otherInstructionsReducer.add(node.getInstructions(
@@ -3848,7 +3333,6 @@ public class Dag<N extends Lop>
 				}
 
 				return output_index;
-
 			}
 			else if (inputIndices.size() == 4) {
 				int output_index = start_index[0];
@@ -3868,7 +3352,6 @@ public class Dag<N extends Lop>
 		}
 
 		return -1;
-
 	}
 
 	/**
@@ -3885,7 +3368,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private int getRecordReaderInstructions(N node, ArrayList<N> execNodes,
 			ArrayList<String> inputStrings,
@@ -3893,9 +3375,7 @@ public class Dag<N extends Lop>
 			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
 	{
-
 		// if input source, return index
 		if (nodeIndexMapping.containsKey(node))
 			return nodeIndexMapping.get(node);
@@ -3910,7 +3390,6 @@ public class Dag<N extends Lop>
 
 		// get mapper instructions
 		for (int i = 0; i < node.getInputs().size(); i++) {
-
 			// recurse
 			N childNode = (N) node.getInputs().get(i);
 			int ret_val = getRecordReaderInstructions(childNode, execNodes,
@@ -3932,17 +3411,14 @@ public class Dag<N extends Lop>
 
 			// cannot reuse index if this is true
 			// need to add better indexing schemes
-			// if (child_for_max_input_index.getOutputs().size() > 1) {
 			output_index = start_index[0];
 			start_index[0]++;
-			// }
 
 			nodeIndexMapping.put(node, output_index);
 
 			// populate list of input labels.
 			// only Ranagepick lop can contribute to labels
 			if (node.getType() == Type.PickValues) {
-
 				PickByCount pbc = (PickByCount) node;
 				if (pbc.getOperationType() == PickByCount.OperationTypes.RANGEPICK) {
 					int scalarIndex = 1; // always the second input is a scalar
@@ -3975,11 +3451,9 @@ public class Dag<N extends Lop>
 						"Unexpected number of inputs while generating a RecordReader Instruction");
 
 			return output_index;
-
 		}
 
 		return -1;
-
 	}
 
 	/**
@@ -3996,7 +3470,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private int getMapperInstructions(N node, ArrayList<N> execNodes,
 			ArrayList<String> inputStrings,
@@ -4004,9 +3477,7 @@ public class Dag<N extends Lop>
 			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops, 
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
 	{
-
 		// if input source, return index
 		if (nodeIndexMapping.containsKey(node))
 			return nodeIndexMapping.get(node);
@@ -4134,11 +3605,9 @@ public class Dag<N extends Lop>
 				MRJobLineNumbers.add(node._beginLine);
 			}
 			return output_index;
-
 		}
 
 		return -1;
-
 	}
 
 	// Method to populate inputs and also populates node index mapping.
@@ -4155,12 +3624,9 @@ public class Dag<N extends Lop>
 				&& !nodeIndexMapping.containsKey(node)) {
 			numRows.add(node.getOutputParameters().getNumRows());
 			numCols.add(node.getOutputParameters().getNumCols());
-			numRowsPerBlock.add(node.getOutputParameters()
-					.getRowsInBlock());
-			numColsPerBlock.add(node.getOutputParameters()
-					.getColsInBlock());
-			inputStrings.add(node.getInstructions(inputStrings.size(),
-					inputStrings.size()));
+			numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+			numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
+			inputStrings.add(node.getInstructions(inputStrings.size(), inputStrings.size()));
 			if(DMLScript.ENABLE_DEBUG_MODE) {
 				MRJobLineNumbers.add(node._beginLine);
 			}
@@ -4170,10 +3636,6 @@ public class Dag<N extends Lop>
 			return;
 		}
 
-		// && ( !(node.getExecLocation() == ExecLocation.ControlProgram)
-		// || (node.getExecLocation() == ExecLocation.ControlProgram &&
-		// node.getDataType() != DataType.SCALAR )
-		// )
 		// get input file names
 		if (!execNodes.contains(node)
 				&& !nodeIndexMapping.containsKey(node)
@@ -4192,19 +3654,14 @@ public class Dag<N extends Lop>
 				inputStrings.add(Lop.VARIABLE_NAME_PLACEHOLDER + node.getOutputParameters().getLabel()
 						               + Lop.VARIABLE_NAME_PLACEHOLDER);
 			}
-			//if ( node.getType() == Lops.Type.Data && ((Data)node).isTransient())
-			//	inputStrings.add("##" + node.getOutputParameters().getLabel() + "##");
-			//else
-			//	inputStrings.add(node.getOutputParameters().getLabel());
+			
 			inputLabels.add(node.getOutputParameters().getLabel());
 			inputLops.add(node);
 
 			numRows.add(node.getOutputParameters().getNumRows());
 			numCols.add(node.getOutputParameters().getNumCols());
-			numRowsPerBlock.add(node.getOutputParameters()
-					.getRowsInBlock());
-			numColsPerBlock.add(node.getOutputParameters()
-					.getColsInBlock());
+			numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+			numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
 
 			InputInfo nodeInputInfo = null;
 			// Check if file format type is binary or text and update infos
@@ -4262,15 +3719,12 @@ public class Dag<N extends Lop>
 		}
 
 		// if exec nodes does not contain node at this point, return.
-
 		if (!execNodes.contains(node))
 			return;
 
 		// process children recursively
-
-		for (int i = 0; i < node.getInputs().size(); i++) {
-			N childNode = (N) node.getInputs().get(i);
-			getInputPathsAndParameters(childNode, execNodes, inputStrings,
+		for ( Lop lop : node.getInputs() ) {
+			getInputPathsAndParameters((N)lop, execNodes, inputStrings,
 					inputInfos, numRows, numCols, numRowsPerBlock,
 					numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
 		}
@@ -4283,39 +3737,32 @@ public class Dag<N extends Lop>
 	 * @param execNodes
 	 * @param rootNodes
 	 */
-
 	private void getOutputNodes(ArrayList<N> execNodes, ArrayList<N> rootNodes, JobType jt) {
-		for (int i = 0; i < execNodes.size(); i++) {
-			N node = execNodes.get(i);
-
+		for ( N node : execNodes ) {
 			// terminal node
 			if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) {
 				rootNodes.add(node);
-			} else {
+			} 
+			else {
 				// check for nodes with at least one child outside execnodes
 				int cnt = 0;
-				for (int j = 0; j < node.getOutputs().size(); j++) {
-					if (!execNodes.contains(node.getOutputs().get(j))) {
-						cnt++;
-					}
+				for (Lop lop : node.getOutputs() ) {
+					cnt += (!execNodes.contains(lop)) ? 1 : 0; 
 				}
 
-				if (cnt > 0 //!= 0
-						//&& cnt <= node.getOutputs().size()
-						&& !rootNodes.contains(node) // not already a rootnode
-						&& !(node.getExecLocation() == ExecLocation.Data
-								&& ((Data) node).getOperationType() == OperationTypes.READ && ((Data) node)
-								.getDataType() == DataType.MATRIX)  // Not a matrix Data READ 
-					) {
-					
-
+				if (cnt > 0 && !rootNodes.contains(node) // not already a rootnode
+					&& !(node.getExecLocation() == ExecLocation.Data
+					&& ((Data) node).getOperationType() == OperationTypes.READ 
+					&& ((Data) node).getDataType() == DataType.MATRIX) ) // Not a matrix Data READ 
+				{
 					if ( jt.allowsSingleShuffleInstruction() && node.getExecLocation() != ExecLocation.MapAndReduce)
 						continue;
 					
 					if (cnt < node.getOutputs().size()) {
 						if(!node.getProducesIntermediateOutput())	
 							rootNodes.add(node);
-					} else
+					} 
+					else
 						rootNodes.add(node);
 				}
 			}
@@ -4328,9 +3775,7 @@ public class Dag<N extends Lop>
 	 * @param a
 	 * @param b
 	 */
-
-	public static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
-		//int aID = IDMap.get(a.getID());
+	private static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
 		int bID = IDMap.get(b.getID());
 		return a.get_reachable()[bID];
 	}
@@ -4381,17 +3826,15 @@ public class Dag<N extends Lop>
 
 		// print the nodes in sorted order
 		if (LOG.isTraceEnabled()) {
-			for (int i = 0; i < v.size(); i++) {
-				// System.out.print(sortedNodes.get(i).getID() + "("
-				// + levelmap.get(sortedNodes.get(i).getID()) + "), ");
+			for ( N vnode : v ) {
 				StringBuilder sb = new StringBuilder();
-				sb.append(v.get(i).getID());
+				sb.append(vnode.getID());
 				sb.append("(");
-				sb.append(v.get(i).getLevel());
+				sb.append(vnode.getLevel());
 				sb.append(") ");
-				sb.append(v.get(i).getType());
+				sb.append(vnode.getType());
 				sb.append("(");
-				for(Lop vin : v.get(i).getInputs()) {
+				for(Lop vin : vnode.getInputs()) {
 					sb.append(vin.getID());
 					sb.append(",");
 				}
@@ -4413,7 +3856,7 @@ public class Dag<N extends Lop>
 	 * @param marked
 	 */
 	@SuppressWarnings("unchecked")
-	void dagDFS(N root, boolean[] marked) {
+	private void dagDFS(N root, boolean[] marked) {
 		//contains check currently required for globalopt, will be removed when cleaned up
 		if( !IDMap.containsKey(root.getID()) )
 			return;
@@ -4422,33 +3865,32 @@ public class Dag<N extends Lop>
 		if ( marked[mapID] )
 			return;
 		marked[mapID] = true;
-		for(int i=0; i < root.getOutputs().size(); i++) {
-			//System.out.println("CALLING DFS "+root);
-			dagDFS((N)root.getOutputs().get(i), marked);
+		for( Lop lop : root.getOutputs() ) {
+			dagDFS((N)lop, marked);
 		}
 	}
 	
 	private boolean hasDirectChildNode(N node, ArrayList<N> childNodes) {
 		if ( childNodes.isEmpty() ) 
 			return false;
-		
-		for(int i=0; i < childNodes.size(); i++) {
-			if ( childNodes.get(i).getOutputs().contains(node))
+		for( N cnode : childNodes ) {
+			if ( cnode.getOutputs().contains(node))
 				return true;
 		}
 		return false;
 	}
 	
+	private boolean hasChildNode(N node, ArrayList<N> nodes) {
+		return hasChildNode(node, nodes, ExecLocation.INVALID);
+	}
+
 	private boolean hasChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
 		if ( childNodes.isEmpty() ) 
 			return false;
-		
 		int index = IDMap.get(node.getID());
-		for(int i=0; i < childNodes.size(); i++) {
-			N cn = childNodes.get(i);
-			if ( (type == ExecLocation.INVALID || cn.getExecLocation() == type) && cn.get_reachable()[index]) {
+		for( N cnode : childNodes ) {
+			if ( (type == ExecLocation.INVALID || cnode.getExecLocation() == type) && cnode.get_reachable()[index])
 				return true;
-			}
 		}
 		return false;
 	}
@@ -4456,13 +3898,10 @@ public class Dag<N extends Lop>
 	private N getChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
 		if ( childNodes.isEmpty() )
 			return null;
-		
 		int index = IDMap.get(node.getID());
-		for(int i=0; i < childNodes.size(); i++) {
-			N cn = childNodes.get(i);
-			if ( cn.getExecLocation() == type && cn.get_reachable()[index]) {
-				return cn;
-			}
+		for( N cnode : childNodes ) {
+			if ( cnode.getExecLocation() == type && cnode.get_reachable()[index])
+				return cnode;
 		}
 		return null;
 	}
@@ -4479,8 +3918,7 @@ public class Dag<N extends Lop>
 	private N getParentNode(N node, ArrayList<N> parentNodes, ExecLocation type) {
 		if ( parentNodes.isEmpty() )
 			return null;
-		for(int i=0; i < parentNodes.size(); i++ ) {
-			N pn = parentNodes.get(i);
+		for( N pn : parentNodes ) {
 			int index = IDMap.get( pn.getID() );
 			if ( pn.getExecLocation() == type && node.get_reachable()[index])
 				return pn;
@@ -4495,9 +3933,7 @@ public class Dag<N extends Lop>
 			return false;
 		
 		int index = IDMap.get(node.getID());
-		
-		for (int i = 0; i < nodesVec.size(); i++) {
-			N n = nodesVec.get(i);
+		for( N n : nodesVec ) {
 			if ( n.definesMRJob() && n.get_reachable()[index]) 
 				return true;
 		}
@@ -4510,8 +3946,7 @@ public class Dag<N extends Lop>
 		
 		int index = IDMap.get(node.getID());
 		boolean onlyDatagen = true;
-		for (int i = 0; i < nodesVec.size(); i++) {
-			N n = nodesVec.get(i);
+		for( N n : nodesVec ) {
 			if ( n.definesMRJob() && n.get_reachable()[index] &&  JobType.findJobTypeFromLop(n) != JobType.DATAGEN )
 				onlyDatagen = false;
 		}
@@ -4520,11 +3955,10 @@ public class Dag<N extends Lop>
 	}
 
 	@SuppressWarnings("unchecked")
-	private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type) {
-
-		for (int i = 0; i < node.getInputs().size(); i++) {
-			N n = (N) node.getInputs().get(i);
-
+	private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type) 
+	{
+		for (Lop lop : node.getInputs() ) {
+			N n = (N) lop;
 			if (!execNodes.contains(n))
 				continue;
 
@@ -4533,44 +3967,36 @@ public class Dag<N extends Lop>
 					return MR_CHILD_FOUND_BREAKS_ALIGNMENT;
 				else
 					return MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT;
-			} else {
+			} 
+			else {
 				int ret = getChildAlignment(n, execNodes, type);
 				if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT
-						|| ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
+					|| ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
 					if (n.getBreaksAlignment())
 						return CHILD_BREAKS_ALIGNMENT;
 					else
 						return CHILD_DOES_NOT_BREAK_ALIGNMENT;
 				}
-
 				else if (ret == MRCHILD_NOT_FOUND
 						|| ret == CHILD_BREAKS_ALIGNMENT
 						|| ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT)
 					return ret;
 				else
-					throw new RuntimeException(
-							"Something wrong in getChildAlignment().");
+					throw new RuntimeException("Something wrong in getChildAlignment().");
 			}
 		}
 
 		return MRCHILD_NOT_FOUND;
-
-	}
-
-	private boolean hasChildNode(N node, ArrayList<N> nodes) {
-		return hasChildNode(node, nodes, ExecLocation.INVALID);
 	}
 
 	private boolean hasParentNode(N node, ArrayList<N> parentNodes) {
 		if ( parentNodes.isEmpty() )
-			return false;
-		
-		for( int i=0; i < parentNodes.size(); i++ ) {
-			int index = IDMap.get( parentNodes.get(i).getID() );
+			return false;		
+		for( N pnode : parentNodes ) {
+			int index = IDMap.get( pnode.getID() );
 			if ( node.get_reachable()[index])
 				return true;
 		}
 		return false;
 	}
-	
 }
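
Beyond removing dead code, the tail of the diff also tightens visibility (e.g., generateMRJobs, printJobNodes, and dagDFS become private) and keeps the reachability helpers compact by letting a convenience overload delegate to the general form, as hasChildNode(node, nodes) now does with ExecLocation.INVALID acting as a wildcard. Below is a minimal sketch of that delegation pattern under the same idea; the Location enum and SketchNode type are stand-ins, not the actual Lop/ExecLocation classes.

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins; in Dag.java the check runs against Lop nodes
// and their precomputed reachability arrays.
enum Location { MAP, REDUCE, INVALID }

class SketchNode {
    final Location loc;
    final boolean[] reachable;  // reachable[i] == true if node i is reachable from this node
    SketchNode(Location loc, boolean[] reachable) { this.loc = loc; this.reachable = reachable; }
}

public class ChildCheckSketch {
    // Convenience overload: INVALID acts as a wildcard for "any exec location".
    static boolean hasChildNode(int nodeIndex, List<SketchNode> childNodes) {
        return hasChildNode(nodeIndex, childNodes, Location.INVALID);
    }

    static boolean hasChildNode(int nodeIndex, List<SketchNode> childNodes, Location type) {
        if (childNodes.isEmpty())
            return false;
        for (SketchNode cn : childNodes) {
            if ((type == Location.INVALID || cn.loc == type) && cn.reachable[nodeIndex])
                return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<SketchNode> children = new ArrayList<>();
        children.add(new SketchNode(Location.MAP, new boolean[]{true, false}));
        System.out.println(hasChildNode(0, children));                  // true: wildcard match
        System.out.println(hasChildNode(0, children, Location.REDUCE)); // false: wrong location
    }
}

Centralizing the scan in the three-argument form keeps the wildcard semantics in a single place, so the two call forms cannot drift apart.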