You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/03/25 21:40:08 UTC

[4/5] incubator-systemml git commit: Cleanup runtime instruction generation (simplifications, logging)

Cleanup runtime instruction generation (simplifications, logging) 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b2aec7a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b2aec7a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b2aec7a7

Branch: refs/heads/master
Commit: b2aec7a7b990ebace6ba1bddd39fa963e818a73c
Parents: b095362
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Mar 25 00:34:39 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Mar 25 13:39:20 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/lops/compile/Dag.java | 436 ++++++++-----------
 1 file changed, 191 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2aec7a7/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index eb52053..29c1250 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -85,10 +85,13 @@ import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat;
 
 /**
  * 
- *  Class to maintain a DAG and compile it into jobs
- * @param <N>
+ * Class to maintain a DAG of lops and compile it into 
+ * runtime instructions, incl piggybacking into jobs.
+ * 
+ * @param <N> the class parameter has no effect and is
+ * only kept for documentation purposes.
  */
-public class Dag<N extends Lop> 
+public class Dag<N extends Lop>
 {
 	private static final Log LOG = LogFactory.getLog(Dag.class.getName());
 
@@ -113,7 +116,7 @@ public class Dag<N extends Lop>
 	}
 	
 	// hash set for all nodes in dag
-	private ArrayList<N> nodes = null;
+	private ArrayList<Lop> nodes = null;
 	
 	/* 
 	 * Hashmap to translates the nodes in the DAG to a sequence of numbers
@@ -197,7 +200,7 @@ public class Dag<N extends Lop>
 	public Dag() 
 	{
 		//allocate internal data structures
-		nodes = new ArrayList<N>();
+		nodes = new ArrayList<Lop>();
 		IDMap = new HashMap<Long, Integer>();
 
 		// get number of reducers from dml config
@@ -211,7 +214,7 @@ public class Dag<N extends Lop>
 	 * @return true if node was not already present, false if not.
 	 */
 
-	public boolean addNode(N node) {
+	public boolean addNode(Lop node) {
 		if (nodes.contains(node))
 			return false;		
 		nodes.add(node);
@@ -253,7 +256,7 @@ public class Dag<N extends Lop>
 		}
 		
 		// hold all nodes in a vector (needed for ordering)
-		ArrayList<N> node_v = new ArrayList<N>();
+		ArrayList<Lop> node_v = new ArrayList<Lop>();
 		node_v.addAll(nodes);
 		
 		/*
@@ -272,25 +275,26 @@ public class Dag<N extends Lop>
 
 	}
 
-	private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
+	private static void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<Lop> nodeV,
 			ArrayList<Instruction> inst) throws DMLRuntimeException,
 			DMLUnsupportedOperationException {
 
 		if ( sb == null ) 
 			return;
 		
-		LOG.trace("In delete updated variables");
+		if( LOG.isTraceEnabled() )
+			LOG.trace("In delete updated variables");
 
 		// CANDIDATE list of variables which could have been updated in this statement block 
-		HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
+		HashMap<String, Lop> labelNodeMapping = new HashMap<String, Lop>();
 		
 		// ACTUAL list of variables whose value is updated, AND the old value of the variable 
 		// is no longer accessible/used.
 		HashSet<String> updatedLabels = new HashSet<String>();
-		HashMap<String, N> updatedLabelsLineNum =  new HashMap<String, N>();
+		HashMap<String, Lop> updatedLabelsLineNum =  new HashMap<String, Lop>();
 		
 		// first capture all transient read variables
-		for ( N node : nodeV ) {
+		for ( Lop node : nodeV ) {
 
 			if (node.getExecLocation() == ExecLocation.Data
 					&& ((Data) node).isTransient()
@@ -317,7 +321,7 @@ public class Dag<N extends Lop>
 		}
 
 		// capture updated transient write variables
-		for ( N node : nodeV ) {
+		for ( Lop node : nodeV ) {
 
 			if (node.getExecLocation() == ExecLocation.Data
 					&& ((Data) node).isTransient()
@@ -346,14 +350,15 @@ public class Dag<N extends Lop>
 
 	}
 	  
-	private void generateRemoveInstructions(StatementBlock sb,
+	private static void generateRemoveInstructions(StatementBlock sb,
 			ArrayList<Instruction> deleteInst)
 			throws DMLUnsupportedOperationException, DMLRuntimeException {
 		
 		if ( sb == null ) 
 			return;
-		
-		LOG.trace("In generateRemoveInstructions()");
+
+		if( LOG.isTraceEnabled() )
+			LOG.trace("In generateRemoveInstructions()");
 		
 
 		Instruction inst = null;
@@ -396,30 +401,28 @@ public class Dag<N extends Lop>
 		}*/
 	}
 
-	private ArrayList<ArrayList<N>> createNodeVectors(int size) {
-		ArrayList<ArrayList<N>> arr = new ArrayList<ArrayList<N>>();
+	private static ArrayList<ArrayList<Lop>> createNodeVectors(int size) {
+		ArrayList<ArrayList<Lop>> arr = new ArrayList<ArrayList<Lop>>();
 
 		// for each job type, we need to create a vector.
 		// additionally, create another vector for execNodes
 		for (int i = 0; i < size; i++) {
-			arr.add(new ArrayList<N>());
+			arr.add(new ArrayList<Lop>());
 		}
 		return arr;
 	}
 
-	private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
-		for (ArrayList<N> tmp : arr) {
+	private static void clearNodeVectors(ArrayList<ArrayList<Lop>> arr) {
+		for (ArrayList<Lop> tmp : arr) {
 			tmp.clear();
 		}
 	}
 
-	private boolean isCompatible(ArrayList<N> nodes, JobType jt, int from,
-			int to) throws LopsException {
-		
+	private static boolean isCompatible(ArrayList<Lop> nodes, JobType jt, int from, int to) 
+		throws LopsException 
+	{
 		int base = jt.getBase();
-
-		for (int i = from; i < to; i++) {
-			N node = nodes.get(i);
+		for ( Lop node : nodes ) {
 			if ((node.getCompatibleJobs() & base) == 0) {
 				if( LOG.isTraceEnabled() )
 					LOG.trace("Not compatible "+ node.toString());
@@ -437,7 +440,7 @@ public class Dag<N extends Lop>
 	 * @param node2
 	 * @return
 	 */
-	private boolean isCompatible(N node1, N node2) {
+	private static boolean isCompatible(Lop node1, Lop node2) {
 		return( (node1.getCompatibleJobs() & node2.getCompatibleJobs()) > 0);
 	}
 	
@@ -448,7 +451,7 @@ public class Dag<N extends Lop>
 	 * @param jt
 	 * @return
 	 */
-	private boolean isCompatible(N node, JobType jt) {
+	private static boolean isCompatible(Lop node, JobType jt) {
 		if ( jt == JobType.GMRCELL )
 			jt = JobType.GMR;
 		return ((node.getCompatibleJobs() & jt.getBase()) > 0);
@@ -457,8 +460,8 @@ public class Dag<N extends Lop>
 	/*
 	 * Add node, and its relevant children to job-specific node vectors.
 	 */
-	private void addNodeByJobType(N node, ArrayList<ArrayList<N>> arr,
-			ArrayList<N> execNodes, boolean eliminate) throws LopsException {
+	private void addNodeByJobType(Lop node, ArrayList<ArrayList<Lop>> arr,
+			ArrayList<Lop> execNodes, boolean eliminate) throws LopsException {
 		
 		if (!eliminate) {
 			// Check if this lop defines a MR job.
@@ -552,7 +555,7 @@ public class Dag<N extends Lop>
 	 * Remove the node from all job-specific node vectors. This method is
 	 * invoked from removeNodesForNextIteration().
 	 */
-	private void removeNodeByJobType(N node, ArrayList<ArrayList<N>> arr) {
+	private static void removeNodeByJobType(Lop node, ArrayList<ArrayList<Lop>> arr) {
 		for ( JobType jt : JobType.values())
 			if ( jt.getId() > 0 ) 
 				arr.get(jt.getId()).remove(node);
@@ -566,14 +569,14 @@ public class Dag<N extends Lop>
 	 * @param jobNodes
 	 * @throws LopsException
 	 */
-	private void handleSingleOutputJobs(ArrayList<N> execNodes,
-			ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
+	private void handleSingleOutputJobs(ArrayList<Lop> execNodes,
+			ArrayList<ArrayList<Lop>> jobNodes, ArrayList<Lop> finishedNodes)
 			throws LopsException {
 		/*
 		 * If the input of a MMCJ/MMRJ job (must have executed in a Mapper) is used
 		 * by multiple lops then we should mark it as not-finished.
 		 */
-		ArrayList<N> nodesWithUnfinishedOutputs = new ArrayList<N>();
+		ArrayList<Lop> nodesWithUnfinishedOutputs = new ArrayList<Lop>();
 		int[] jobIndices = {JobType.MMCJ.getId()};
 		Lop.Type[] lopTypes = { Lop.Type.MMCJ};
 		
@@ -583,14 +586,14 @@ public class Dag<N extends Lop>
 		for ( int jobi=0; jobi < jobIndices.length; jobi++ ) {
 			int jindex = jobIndices[jobi];
 			if (!jobNodes.get(jindex).isEmpty()) {
-				ArrayList<N> vec = jobNodes.get(jindex);
+				ArrayList<Lop> vec = jobNodes.get(jindex);
 
 				// first find all nodes with more than one parent that is not finished.
 				for (int i = 0; i < vec.size(); i++) {
-					N node = vec.get(i);
+					Lop node = vec.get(i);
 					if (node.getExecLocation() == ExecLocation.MapOrReduce
 							|| node.getExecLocation() == ExecLocation.Map) {
-						N MRparent = getParentNode(node, execNodes, ExecLocation.MapAndReduce);
+						Lop MRparent = getParentNode(node, execNodes, ExecLocation.MapAndReduce);
 						if ( MRparent != null && MRparent.getType() == lopTypes[jobi]) {
 							int numParents = node.getOutputs().size();
 							if (numParents > 1) {
@@ -606,7 +609,7 @@ public class Dag<N extends Lop>
 				}
 
 				// need to redo all nodes in nodesWithOutput as well as their children
-				for ( N node : vec ) {
+				for ( Lop node : vec ) {
 					if (node.getExecLocation() == ExecLocation.MapOrReduce
 							|| node.getExecLocation() == ExecLocation.Map) {
 						if (nodesWithUnfinishedOutputs.contains(node))
@@ -623,7 +626,7 @@ public class Dag<N extends Lop>
 	}
 
 	/** Method to check if a lop can be eliminated from checking **/
-	private boolean canEliminateLop(N node, ArrayList<N> execNodes) {
+	private static boolean canEliminateLop(Lop node, ArrayList<Lop> execNodes) {
 		// this function can only eliminate "aligner" lops such as group
 		if (!node.isAligner())
 			return false;
@@ -660,8 +663,8 @@ public class Dag<N extends Lop>
 	 * @param nodes
 	 * @throws LopsException 
 	 */
-	private void generateInstructionsForInputVariables(ArrayList<N> nodes_v, ArrayList<Instruction> inst) throws LopsException, IOException {
-		for(N n : nodes_v) {
+	private static void generateInstructionsForInputVariables(ArrayList<Lop> nodes_v, ArrayList<Instruction> inst) throws LopsException, IOException {
+		for(Lop n : nodes_v) {
 			if (n.getExecLocation() == ExecLocation.Data && !((Data) n).isTransient() 
 					&& ((Data) n).getOperationType() == OperationTypes.READ 
 					&& (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME) ) {
@@ -694,12 +697,11 @@ public class Dag<N extends Lop>
 	 * @param node
 	 * @return
 	 */
-	@SuppressWarnings("unchecked")
-	private boolean sendWriteLopToMR(N node) 
+	private static boolean sendWriteLopToMR(Lop node) 
 	{
 		if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE )
 			return false;
-		N in = (N) node.getInputs().get(0);
+		Lop in = node.getInputs().get(0);
 		Format nodeFormat = node.getOutputParameters().getFormat();
 		
 		// Case of a transient read feeding into only one output persistent binaryblock write
@@ -726,7 +728,7 @@ public class Dag<N extends Lop>
 	 * It is used only for those nodes that use inputs from distributed cache. The returned 
 	 * value is utilized in limiting the number of instructions piggybacked onto a single GMR mapper.
 	 */
-	private double computeFootprintInMapper(N node) {
+	private static double computeFootprintInMapper(Lop node) {
 		// Memory limits must be checked only for nodes that use distributed cache
 		if ( ! node.usesDistributedCache() )
 			// default behavior
@@ -775,7 +777,7 @@ public class Dag<N extends Lop>
 	 * the mappers then <code>node</code> can be executed in current round, and <code>true</code> is returned. Otherwise, 
 	 * <code>node</code> must be queued and <code>false</code> is returned. 
 	 */
-	private boolean checkMemoryLimits(N node, double footprintInMapper) {
+	private static boolean checkMemoryLimits(Lop node, double footprintInMapper) {
 		boolean addNode = true;
 		
 		// Memory limits must be checked only for nodes that use distributed cache
@@ -798,22 +800,21 @@ public class Dag<N extends Lop>
 	 * @throws DMLUnsupportedOperationException
 	 * @throws DMLRuntimeException
 	 */
-	@SuppressWarnings("unchecked")
-	private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
+	private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<Lop> node_v)
 			throws LopsException, IOException, DMLRuntimeException,
 			DMLUnsupportedOperationException
-
 	{
-		LOG.trace("Grouping DAG ============");
+		if( LOG.isTraceEnabled() )
+			LOG.trace("Grouping DAG ============");
 
 		// nodes to be executed in current iteration
-		ArrayList<N> execNodes = new ArrayList<N>();
+		ArrayList<Lop> execNodes = new ArrayList<Lop>();
 		// nodes that have already been processed
-		ArrayList<N> finishedNodes = new ArrayList<N>();
+		ArrayList<Lop> finishedNodes = new ArrayList<Lop>();
 		// nodes that are queued for the following iteration
-		ArrayList<N> queuedNodes = new ArrayList<N>();
+		ArrayList<Lop> queuedNodes = new ArrayList<Lop>();
 
-		ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
+		ArrayList<ArrayList<Lop>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
 		
 		// list of instructions
 		ArrayList<Instruction> inst = new ArrayList<Instruction>();
@@ -835,18 +836,16 @@ public class Dag<N extends Lop>
 		String indent = "    ";
 
 		while (!done) {
-			LOG.trace("Grouping nodes in DAG");
+			if( LOG.isTraceEnabled() )
+				LOG.trace("Grouping nodes in DAG");
 
 			execNodes.clear();
 			queuedNodes.clear();
 			clearNodeVectors(jobNodes);
 			gmrMapperFootprint=0;
 
-			for (int i = 0; i < node_v.size(); i++) {
-				N node = node_v.get(i);
-
+			for ( Lop  node : node_v ) {
 				// finished nodes don't need to be processed
-
 				if (finishedNodes.contains(node))
 					continue;
 
@@ -951,13 +950,12 @@ public class Dag<N extends Lop>
 				if (node.getInputs().size() > 1
 						&& hasChildNode(node, execNodes, ExecLocation.RecordReader)) {
 					// get the actual RecordReader lop
-					N rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader);
+					Lop rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader);
 
 					// all inputs of "node" must be ancestors of rr_node
 					boolean queue_it = false;
-					for (int in = 0; in < node.getInputs().size(); in++) {
+					for (Lop n : node.getInputs()) {
 						// each input should be ancestor of RecordReader lop
-						N n = (N) node.getInputs().get(in);
 						if (!n.equals(rr_node) && !isChild(rr_node, n, IDMap)) {
 							queue_it = true; // i.e., "node" must be queued
 							break;
@@ -1007,7 +1005,7 @@ public class Dag<N extends Lop>
 						// Hence, <code>node</code> can be avoided.
 						// TODO: this case should ideally be handled in the language layer 
 						//       prior to the construction of Hops Dag 
-						N input = (N) dnode.getInputs().get(0);
+						Lop input = dnode.getInputs().get(0);
 						if ( dnode.isTransient() 
 								&& input.getExecLocation() == ExecLocation.Data 
 								&& ((Data)input).isTransient() 
@@ -1081,7 +1079,7 @@ public class Dag<N extends Lop>
 						// then that input must get executed in one of the previous jobs.
 						int[] dcInputIndexes = node.distributedCacheInputIndex();
 						for( int dcInputIndex : dcInputIndexes ){
-							N dcInput = (N) node.getInputs().get(dcInputIndex-1);
+							Lop dcInput = node.getInputs().get(dcInputIndex-1);
 							if ( (dcInput.getType() != Lop.Type.Data && dcInput.getExecType()==ExecType.MR)
 								  &&  execNodes.contains(dcInput) )
 							{
@@ -1207,7 +1205,7 @@ public class Dag<N extends Lop>
 
 				// copy unassigned lops in execnodes to gmrnodes
 				for (int i = 0; i < execNodes.size(); i++) {
-					N node = execNodes.get(i);
+					Lop node = execNodes.get(i);
 					if (jobType(node, jobNodes) == -1) {
 						if ( isCompatible(node,  JobType.GMR) ) {
 							if ( node.hasNonBlockedInputs() ) {
@@ -1249,8 +1247,8 @@ public class Dag<N extends Lop>
 
 	}
 
-	private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
-	  for( N tmpNode : execNodes ) {
+	private boolean compatibleWithChildrenInExecNodes(ArrayList<Lop> execNodes, Lop node) {
+	  for( Lop tmpNode : execNodes ) {
 	    // for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID
 	    // we should not consider such lops in this check
 	    if (isChild(tmpNode, node, IDMap) 
@@ -1268,7 +1266,7 @@ public class Dag<N extends Lop>
 	 * @param varName
 	 * @param deleteInst
 	 */
-	private void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
+	private static void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
 		//for(Instruction inst : deleteInst) {
 		for(int i=0; i < deleteInst.size(); i++) {
 			Instruction inst = deleteInst.get(i);
@@ -1288,21 +1286,20 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	@SuppressWarnings("unchecked")
-	private void processConsumersForInputs(N node, ArrayList<Instruction> inst, ArrayList<Instruction> delteInst) throws DMLRuntimeException, DMLUnsupportedOperationException {
+	private void processConsumersForInputs(Lop node, ArrayList<Instruction> inst, ArrayList<Instruction> delteInst) throws DMLRuntimeException, DMLUnsupportedOperationException {
 		// reduce the consumer count for all input lops
 		// if the count becomes zero, then the variable associated w/ input can be removed
 		for(Lop in : node.getInputs() ) {
 			if(DMLScript.ENABLE_DEBUG_MODE) {
-				processConsumers((N)in, inst, delteInst, node);
+				processConsumers(in, inst, delteInst, node);
 			}
 			else {
-				processConsumers((N)in, inst, delteInst, null);
+				processConsumers(in, inst, delteInst, null);
 			}
 		}
 	}
 	
-	private void processConsumers(N node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst, N locationInfo) throws DMLRuntimeException, DMLUnsupportedOperationException {
+	private static void processConsumers(Lop node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst, Lop locationInfo) throws DMLRuntimeException, DMLUnsupportedOperationException {
 		// reduce the consumer count for all input lops
 		// if the count becomes zero, then the variable associated w/ input can be removed
 		if ( node.removeConsumer() == 0 ) {
@@ -1334,22 +1331,21 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	@SuppressWarnings("unchecked")
-	private void generateControlProgramJobs(ArrayList<N> execNodes,
+	private void generateControlProgramJobs(ArrayList<Lop> execNodes,
 			ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
 			DMLUnsupportedOperationException, DMLRuntimeException {
 
 		// nodes to be deleted from execnodes
-		ArrayList<N> markedNodes = new ArrayList<N>();
+		ArrayList<Lop> markedNodes = new ArrayList<Lop>();
 
 		// variable names to be deleted
 		ArrayList<String> var_deletions = new ArrayList<String>();
-		HashMap<String, N> var_deletionsLineNum =  new HashMap<String, N>();
+		HashMap<String, Lop> var_deletionsLineNum =  new HashMap<String, Lop>();
 		
 		boolean doRmVar = false;
 
 		for (int i = 0; i < execNodes.size(); i++) {
-			N node = execNodes.get(i);
+			Lop node = execNodes.get(i);
 			doRmVar = false;
 
 			// mark input scalar read nodes for deletion
@@ -1379,8 +1375,7 @@ public class Dag<N extends Lop>
 					inst.addAll(out.getPreInstructions());
 					
 					boolean hasTransientWriteParent = false;
-					for ( int pid=0; pid < node.getOutputs().size(); pid++ ) {
-						N parent = (N)node.getOutputs().get(pid); 
+					for ( Lop parent : node.getOutputs() ) {
 						if ( parent.getExecLocation() == ExecLocation.Data 
 								&& ((Data)parent).getOperationType() == Data.OperationTypes.WRITE 
 								&& ((Data)parent).isTransient() ) {
@@ -1593,7 +1588,7 @@ public class Dag<N extends Lop>
 		}
 
 		// delete all marked nodes
-		for ( N node : markedNodes ) {
+		for ( Lop node : markedNodes ) {
 			execNodes.remove(node);
 		}
 	}
@@ -1608,9 +1603,9 @@ public class Dag<N extends Lop>
 	 * @param queuedNodes
 	 * @throws LopsException
 	 */
-	private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
-			ArrayList<N> execNodes, ArrayList<N> queuedNodes,
-			ArrayList<ArrayList<N>> jobvec) throws LopsException {
+	private void removeNodesForNextIteration(Lop node, ArrayList<Lop> finishedNodes,
+			ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes,
+			ArrayList<ArrayList<Lop>> jobvec) throws LopsException {
 		
 		// only queued nodes with multiple inputs need to be handled.
 		if (node.getInputs().size() == 1)
@@ -1672,10 +1667,8 @@ public class Dag<N extends Lop>
 		// Evaluate each lop in <code>execNodes</code> for removal.
 		// Add lops to be removed to <code>markedNodes</code>.
 		
-		ArrayList<N> markedNodes = new ArrayList<N>();
-		for (int i = 0; i < execNodes.size(); i++) {
-
-			N tmpNode = execNodes.get(i);
+		ArrayList<Lop> markedNodes = new ArrayList<Lop>();
+		for (Lop tmpNode : execNodes ) {
 
 			if (LOG.isTraceEnabled()) {
 				LOG.trace("  Checking for removal (" + tmpNode.getID() + ") " + tmpNode.toString());
@@ -1761,7 +1754,7 @@ public class Dag<N extends Lop>
 		} // for i
 
 		// we also need to delete all parent nodes of marked nodes
-		for ( N enode : execNodes ) {
+		for ( Lop enode : execNodes ) {
 			if( LOG.isTraceEnabled() ) {
 				LOG.trace("  Checking for removal - ("
 							+ enode.getID() + ") " + enode.toString());
@@ -1777,7 +1770,7 @@ public class Dag<N extends Lop>
 		if ( execNodes.size() != markedNodes.size() ) {
 			// delete marked nodes from finishedNodes and execNodes
 			// add to queued nodes
-			for(N n : markedNodes) {
+			for(Lop n : markedNodes) {
 				if ( n.usesDistributedCache() )
 					gmrMapperFootprint -= computeFootprintInMapper(n);
 				finishedNodes.remove(n);
@@ -1788,7 +1781,7 @@ public class Dag<N extends Lop>
 		}
 	}
 
-	private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
+	private boolean branchCanBePiggyBackedReduce(Lop tmpNode, Lop node, ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
 		if(node.getExecLocation() != ExecLocation.Reduce)
 			return false;
 	    
@@ -1798,7 +1791,7 @@ public class Dag<N extends Lop>
 				return false;
 		}
 		
-		for( N n : execNodes ) {
+		for( Lop n : execNodes ) {
 		   if(n.equals(node))
 			   continue;
        
@@ -1815,8 +1808,7 @@ public class Dag<N extends Lop>
 	   return true;
 	}
 
-	@SuppressWarnings("unchecked")
-	private boolean branchCanBePiggyBackedMap(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes, ArrayList<N> markedNodes) {
+	private boolean branchCanBePiggyBackedMap(Lop tmpNode, Lop node, ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes, ArrayList<Lop> markedNodes) {
 		if(node.getExecLocation() != ExecLocation.Map)
 			return false;
 		
@@ -1836,7 +1828,7 @@ public class Dag<N extends Lop>
 		// "dcInput" must be executed prior to "node", and removal of tmpNode does not make that happen.
 		if(node.usesDistributedCache() ) {
 			for(int dcInputIndex : node.distributedCacheInputIndex()) { 
-				N dcInput = (N) node.getInputs().get(dcInputIndex-1);
+				Lop dcInput = node.getInputs().get(dcInputIndex-1);
 				if(isChild(tmpNode, dcInput, IDMap))
 					return false;
 			}
@@ -1849,7 +1841,7 @@ public class Dag<N extends Lop>
 			if (node.usesDistributedCache() )
 				memsize += computeFootprintInMapper(node);
 			if ( markedNodes != null ) {
-				for(N n : markedNodes) {
+				for(Lop n : markedNodes) {
 					if ( n.usesDistributedCache() ) 
 						memsize += computeFootprintInMapper(n);
 				}
@@ -1881,14 +1873,14 @@ public class Dag<N extends Lop>
 	 * @param finishedNodes
 	 * @return
 	 */
-	private boolean branchCanBePiggyBackedMapAndReduce(N tmpNode, N node,
-			ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
+	private boolean branchCanBePiggyBackedMapAndReduce(Lop tmpNode, Lop node,
+			ArrayList<Lop> execNodes, ArrayList<Lop> queuedNodes) {
 
 		if (node.getExecLocation() != ExecLocation.MapAndReduce)
 			return false;
 		JobType jt = JobType.findJobTypeFromLop(node);
 
-		for ( N n : execNodes ) {
+		for ( Lop n : execNodes ) {
 			if (n.equals(node))
 				continue;
 
@@ -1906,32 +1898,31 @@ public class Dag<N extends Lop>
 		return true;
 	}
 
-  private boolean branchHasNoOtherUnExecutedParents(N tmpNode, N node,
-      ArrayList<N> execNodes, ArrayList<N> finishedNodes) {
+  private boolean branchHasNoOtherUnExecutedParents(Lop tmpNode, Lop node,
+      ArrayList<Lop> execNodes, ArrayList<Lop> finishedNodes) {
 
 	  //if tmpNode has more than one unfinished output, return false 
 	  if(tmpNode.getOutputs().size() > 1)
 	  {
 	    int cnt = 0;
-	    for (int j = 0; j < tmpNode.getOutputs().size(); j++) {
-        if (!finishedNodes.contains(tmpNode.getOutputs().get(j)))
-          cnt++;
-      } 
+	    for (Lop output : tmpNode.getOutputs() )
+	    	if (!finishedNodes.contains(output))
+	    		cnt++; 
 	    
 	    if(cnt != 1)
 	      return false;
 	  }
 	  
 	  //check to see if any node between node and tmpNode has more than one unfinished output 
-	  for( N n : execNodes ) {
+	  for( Lop n : execNodes ) {
 	    if(n.equals(node) || n.equals(tmpNode))
 	      continue;
 	    
 	    if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap))
 	    {      
 	      int cnt = 0;
-	      for (int j = 0; j < n.getOutputs().size(); j++) {
-	        if (!finishedNodes.contains(n.getOutputs().get(j)))    
+	      for (Lop output : n.getOutputs() ) {
+	        if (!finishedNodes.contains(output))    
 	          cnt++;
 	      } 
 	      
@@ -1951,7 +1942,7 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-	private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
+	private static int jobType(Lop lops, ArrayList<ArrayList<Lop>> jobvec) throws LopsException {
 		for ( JobType jt : JobType.values()) {
 			int i = jt.getId();
 			if (i > 0 && jobvec.get(i) != null && jobvec.get(i).contains(lops)) {
@@ -1970,16 +1961,13 @@ public class Dag<N extends Lop>
 	 * @param node
 	 * @return
 	 */
-	@SuppressWarnings("unchecked")
-	private boolean hasOtherMapAndReduceParentNode(N tmpNode,
-			ArrayList<N> nodeList, N node) {
+	private boolean hasOtherMapAndReduceParentNode(Lop tmpNode,
+			ArrayList<Lop> nodeList, Lop node) {
 		
 		if ( tmpNode.getExecLocation() == ExecLocation.MapAndReduce)
 			return true;
 		
-		for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
-			N n = (N) tmpNode.getOutputs().get(i);
-
+		for ( Lop n : tmpNode.getOutputs() ) {
 			if ( nodeList.contains(n) && isChild(n,node,IDMap)) {
 				if(!n.equals(node) && n.getExecLocation() == ExecLocation.MapAndReduce)
 					return true;
@@ -1999,7 +1987,7 @@ public class Dag<N extends Lop>
 	 * @param node
 	 * @return
 	 */
-	private boolean hasOtherQueuedParentNode(N tmpNode, ArrayList<N> queuedNodes, N node) {
+	private boolean hasOtherQueuedParentNode(Lop tmpNode, ArrayList<Lop> queuedNodes, Lop node) {
 		if ( queuedNodes.isEmpty() )
 			return false;
 		
@@ -2008,7 +1996,7 @@ public class Dag<N extends Lop>
 		long nodeid = IDMap.get(node.getID());
 		long tmpid = IDMap.get(tmpNode.getID());
 		
-		for ( N qnode : queuedNodes ) {
+		for ( Lop qnode : queuedNodes ) {
 			int id = IDMap.get(qnode.getID());
 			if ((id != nodeid && nodeMarked[id]) && (id != tmpid && tmpMarked[id]) )
 				return true;
@@ -2023,7 +2011,7 @@ public class Dag<N extends Lop>
 	 * @param jobNodes
 	 * @throws DMLRuntimeException
 	 */
-	private void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
+	private static void printJobNodes(ArrayList<ArrayList<Lop>> jobNodes)
 		throws DMLRuntimeException 
 	{
 		if (LOG.isTraceEnabled()){
@@ -2050,36 +2038,31 @@ public class Dag<N extends Lop>
 	 * @param loc
 	 * @return
 	 */
-	private boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
-		for (int i = 0; i < nodes.size(); i++) {
-			if (nodes.get(i).getExecLocation() == ExecLocation.RecordReader)
+	private static boolean hasANode(ArrayList<Lop> nodes, ExecLocation loc) {
+		for ( Lop n : nodes ) {
+			if (n.getExecLocation() == ExecLocation.RecordReader)
 				return true;
 		}
 		return false;
 	}
 
-	private ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) 
+	private ArrayList<ArrayList<Lop>> splitGMRNodesByRecordReader(ArrayList<Lop> gmrnodes) 
 	{
 		// obtain the list of record reader nodes
-		ArrayList<N> rrnodes = new ArrayList<N>();
-		for (int i = 0; i < gmrnodes.size(); i++) {
-			if (gmrnodes.get(i).getExecLocation() == ExecLocation.RecordReader)
-				rrnodes.add(gmrnodes.get(i));
+		ArrayList<Lop> rrnodes = new ArrayList<Lop>();
+		for (Lop gmrnode : gmrnodes ) {
+			if (gmrnode.getExecLocation() == ExecLocation.RecordReader)
+				rrnodes.add(gmrnode);
 		}
 
-		/*
-		 * We allocate one extra vector to hold lops that do not depend on any
-		 * recordreader lops
-		 */
-		ArrayList<ArrayList<N>> splitGMR = createNodeVectors(rrnodes.size() + 1);
+		// We allocate one extra vector to hold lops that do not depend on any
+		// recordreader lops
+		ArrayList<ArrayList<Lop>> splitGMR = createNodeVectors(rrnodes.size() + 1);
 
-		// flags to indicate whether a lop has been added to one of the node
-		// vectors
+		// flags to indicate whether a lop has been added to one of the node vectors
 		boolean[] flags = new boolean[gmrnodes.size()];
-		for (int i = 0; i < gmrnodes.size(); i++) {
-			flags[i] = false;
-		}
-
+		Arrays.fill(flags, false);
+		
 		// first, obtain all ancestors of recordreader lops
 		for (int rrid = 0; rrid < rrnodes.size(); rrid++) {
 			// prepare node list for i^th record reader lop
@@ -2123,10 +2106,10 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	private void generateMRJobs(ArrayList<N> execNodes,
+	private void generateMRJobs(ArrayList<Lop> execNodes,
 			ArrayList<Instruction> inst,
 			ArrayList<Instruction> writeinst,
-			ArrayList<Instruction> deleteinst, ArrayList<ArrayList<N>> jobNodes)
+			ArrayList<Instruction> deleteinst, ArrayList<ArrayList<Lop>> jobNodes)
 			throws LopsException, DMLUnsupportedOperationException,
 			DMLRuntimeException
 
@@ -2141,7 +2124,7 @@ public class Dag<N extends Lop>
 				continue;
 			
 			int index = jt.getId(); // job id is used as an index into jobNodes
-			ArrayList<N> currNodes = jobNodes.get(index);
+			ArrayList<Lop> currNodes = jobNodes.get(index);
 			
 			// generate MR job
 			if (currNodes != null && !currNodes.isEmpty() ) {
@@ -2151,7 +2134,7 @@ public class Dag<N extends Lop>
 
 				if (jt.allowsRecordReaderInstructions() && hasANode(jobNodes.get(index), ExecLocation.RecordReader)) {
 					// split the nodes by recordReader lops
-					ArrayList<ArrayList<N>> rrlist = splitGMRNodesByRecordReader(jobNodes.get(index));
+					ArrayList<ArrayList<Lop>> rrlist = splitGMRNodesByRecordReader(jobNodes.get(index));
 					for (int i = 0; i < rrlist.size(); i++) {
 						generateMapReduceInstructions(rrlist.get(i), inst, writeinst, deleteinst, rmvarinst, jt);
 					}
@@ -2161,7 +2144,7 @@ public class Dag<N extends Lop>
 					// We should split the nodes so that a separate job is produced for each shuffle instruction.
 					Lop.Type splittingLopType = jt.getShuffleLopType();
 					
-					ArrayList<N> nodesForASingleJob = new ArrayList<N>();
+					ArrayList<Lop> nodesForASingleJob = new ArrayList<Lop>();
 					for (int i = 0; i < jobNodes.get(index).size(); i++) {
 						if (jobNodes.get(index).get(i).getType() == splittingLopType) {
 							nodesForASingleJob.clear();
@@ -2207,8 +2190,8 @@ public class Dag<N extends Lop>
 	 * @param node_v
 	 * @param exec_n
 	 */
-	private void addParents(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
-		for (N enode : exec_n ) {
+	private void addParents(Lop node, ArrayList<Lop> node_v, ArrayList<Lop> exec_n) {
+		for (Lop enode : exec_n ) {
 			if (isChild(node, enode, IDMap)) {
 				if (!node_v.contains(enode)) {
 					if( LOG.isTraceEnabled() )
@@ -2226,8 +2209,7 @@ public class Dag<N extends Lop>
 	 * @param node_v
 	 * @param exec_n
 	 */
-	@SuppressWarnings("unchecked")
-	private void addChildren(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
+	private static void addChildren(Lop node, ArrayList<Lop> node_v, ArrayList<Lop> exec_n) {
 
 		// add child in exec nodes that is not of type scalar
 		if (exec_n.contains(node)
@@ -2244,8 +2226,7 @@ public class Dag<N extends Lop>
 			return;
 
 		// recurse
-		for (int i = 0; i < node.getInputs().size(); i++) {
-			N n = (N) node.getInputs().get(i);
+		for (Lop n : node.getInputs() ) {
 			addChildren(n, node_v, exec_n);
 		}
 	}
@@ -2257,7 +2238,7 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException 
 	 */
-	private OutputInfo getOutputInfo(N node, boolean cellModeOverride) 
+	private static OutputInfo getOutputInfo(Lop node, boolean cellModeOverride) 
 		throws LopsException 
 	{
 		if ( (node.getDataType() == DataType.SCALAR && node.getExecType() == ExecType.CP) 
@@ -2348,8 +2329,7 @@ public class Dag<N extends Lop>
 	 * @throws DMLUnsupportedOperationException 
 	 * @throws LopsException 
 	 */
-	@SuppressWarnings("unchecked")
-	private NodeOutput setupNodeOutputs(N node, ExecType et, boolean cellModeOverride, boolean copyTWrite) 
+	private NodeOutput setupNodeOutputs(Lop node, ExecType et, boolean cellModeOverride, boolean copyTWrite) 
 	throws DMLUnsupportedOperationException, DMLRuntimeException, LopsException {
 		
 		OutputParameters oparams = node.getOutputParameters();
@@ -2469,7 +2449,7 @@ public class Dag<N extends Lop>
 								fnOutParams.getLabel(),
 								getFilePath() + fnOutParams.getLabel(), 
 								true, 
-								OutputInfo.outputInfoToString(getOutputInfo((N)fnOut, false)),
+								OutputInfo.outputInfoToString(getOutputInfo(fnOut, false)),
 								new MatrixCharacteristics(fnOutParams.getNumRows(), fnOutParams.getNumCols(), (int)fnOutParams.getRowsInBlock(), (int)fnOutParams.getColsInBlock(), fnOutParams.getNnz()),
 								oparams.getUpdateInPlace()
 							);
@@ -2813,8 +2793,7 @@ public class Dag<N extends Lop>
 	 * @throws DMLUnsupportedOperationException
 	 * @throws DMLRuntimeException
 	 */
-	@SuppressWarnings("unchecked")
-	private void generateMapReduceInstructions(ArrayList<N> execNodes,
+	private void generateMapReduceInstructions(ArrayList<Lop> execNodes,
 			ArrayList<Instruction> inst, ArrayList<Instruction> writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst, 
 			JobType jt) throws LopsException,
 			DMLUnsupportedOperationException, DMLRuntimeException
@@ -2848,7 +2827,7 @@ public class Dag<N extends Lop>
 		boolean cellModeOverride = false;
 		
 		/* Find the nodes that produce an output */
-		ArrayList<N> rootNodes = new ArrayList<N>();
+		ArrayList<Lop> rootNodes = new ArrayList<Lop>();
 		getOutputNodes(execNodes, rootNodes, jt);
 		if( LOG.isTraceEnabled() )
 			LOG.trace("# of root nodes = " + rootNodes.size());
@@ -2856,9 +2835,9 @@ public class Dag<N extends Lop>
 		
 		/* Remove transient writes that are simple copy of transient reads */
 		if (jt == JobType.GMR || jt == JobType.GMRCELL) {
-			ArrayList<N> markedNodes = new ArrayList<N>();
+			ArrayList<Lop> markedNodes = new ArrayList<Lop>();
 			// only keep data nodes that are results of some computation.
-			for ( N rnode : rootNodes ) {
+			for ( Lop rnode : rootNodes ) {
 				if (rnode.getExecLocation() == ExecLocation.Data
 						&& ((Data) rnode).isTransient()
 						&& ((Data) rnode).getOperationType() == OperationTypes.WRITE
@@ -2881,11 +2860,11 @@ public class Dag<N extends Lop>
 		}
 		
 		// structure that maps node to their indices that will be used in the instructions
-		HashMap<N, Integer> nodeIndexMapping = new HashMap<N, Integer>();
+		HashMap<Lop, Integer> nodeIndexMapping = new HashMap<Lop, Integer>();
 		
 		/* Determine all input data files */
 		
-		for ( N rnode : rootNodes ) {
+		for ( Lop rnode : rootNodes ) {
 			getInputPathsAndParameters(rnode, execNodes, inputs, inputInfos, numRows, numCols, 
 				numRowsPerBlock, numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
 		}
@@ -2901,7 +2880,7 @@ public class Dag<N extends Lop>
 		
 		// currently, recordreader instructions are allowed only in GMR jobs
 		if (jt == JobType.GMR || jt == JobType.GMRCELL) {
-			for ( N rnode : rootNodes ) {
+			for ( Lop rnode : rootNodes ) {
 				getRecordReaderInstructions(rnode, execNodes, inputs, recordReaderInstructions, 
 					nodeIndexMapping, start_index, inputLabels, inputLops, MRJobLineNumbers);
 				if ( recordReaderInstructions.size() > 1 )
@@ -2953,8 +2932,7 @@ public class Dag<N extends Lop>
 		ArrayList<String> aggInstructionsReducer = new ArrayList<String>();
 		ArrayList<String> otherInstructionsReducer = new ArrayList<String>();
 
-		for (int i = 0; i < rootNodes.size(); i++) {
-			N rn = rootNodes.get(i);
+		for( Lop rn : rootNodes ) {
 			int resultIndex = getAggAndOtherInstructions(
 					rn, execNodes, shuffleInstructions, aggInstructionsReducer,
 					otherInstructionsReducer, nodeIndexMapping, start_index,
@@ -2968,14 +2946,14 @@ public class Dag<N extends Lop>
 					) {
 				// Both rn (a transient write) and its input are root nodes.
 				// Instead of creating two copies of the data, simply generate a cpvar instruction 
-				NodeOutput out = setupNodeOutputs(rootNodes.get(i), ExecType.MR, cellModeOverride, true);
+				NodeOutput out = setupNodeOutputs(rn, ExecType.MR, cellModeOverride, true);
 				writeinst.addAll(out.getLastInstructions());
 			}
 			else {
 				resultIndices.add(Byte.valueOf((byte)resultIndex));
 			
 				// setup output filenames and outputInfos and generate related instructions
-				NodeOutput out = setupNodeOutputs(rootNodes.get(i), ExecType.MR, cellModeOverride, false);
+				NodeOutput out = setupNodeOutputs(rn, ExecType.MR, cellModeOverride, false);
 				outputLabels.add(out.getVarName());
 				outputs.add(out.getFileName());
 				outputInfos.add(out.getOutInfo());
@@ -2985,8 +2963,7 @@ public class Dag<N extends Lop>
 				renameInstructions.addAll(out.getLastInstructions());
 				variableInstructions.addAll(out.getPreInstructions());
 				postInstructions.addAll(out.getPostInstructions());
-			}
-			
+			}			
 		}
 		
 		/* Determine if the output dimensions are known */
@@ -3013,7 +2990,7 @@ public class Dag<N extends Lop>
 			numReducers = total_reducers;
 		
 		// set inputs, outputs, and other properties for the job 
-		mr.setInputOutputLabels(getStringArray(inputLabels), getStringArray(outputLabels));
+		mr.setInputOutputLabels(inputLabels.toArray(new String[0]), outputLabels.toArray(new String[0]));
 		mr.setOutputs(resultIndicesByte);
 		mr.setDimsUnknownFilePrefix(getFilePath());
 		
@@ -3027,7 +3004,7 @@ public class Dag<N extends Lop>
 		//compute and set mapper memory requirements (for consistency of runtime piggybacking)
 		if( jt == JobType.GMR ) {
 			double mem = 0;
-			for( N n : execNodes )
+			for( Lop n : execNodes )
 				mem += computeFootprintInMapper(n);
 			mr.setMemoryRequirements(mem);
 		}
@@ -3054,10 +3031,10 @@ public class Dag<N extends Lop>
 		
 		for (Lop l : inputLops) {
 			if(DMLScript.ENABLE_DEBUG_MODE) {
-				processConsumers((N)l, rmvarinst, deleteinst, (N)l);
+				processConsumers(l, rmvarinst, deleteinst, l);
 			}
 			else {
-				processConsumers((N)l, rmvarinst, deleteinst, null);
+				processConsumers(l, rmvarinst, deleteinst, null);
 			}
 		}
 	}
@@ -3068,7 +3045,7 @@ public class Dag<N extends Lop>
 	 * @param inputStrings
 	 * @return
 	 */
-	private String getCSVString(ArrayList<String> inputStrings) {
+	private static String getCSVString(ArrayList<String> inputStrings) {
 		StringBuilder sb = new StringBuilder();
 		for ( String str : inputStrings ) {
 			if( str != null ) {
@@ -3081,23 +3058,6 @@ public class Dag<N extends Lop>
 	}
 
 	/**
-	 * converts an array list of strings into an array of string
-	 * 
-	 * @param list
-	 * @return
-	 */
-	private String[] getStringArray(ArrayList<String> list) {
-		String[] arr = new String[list.size()];
-
-		for (int i = 0; i < arr.length; i++) {
-			arr[i] = list.get(i);
-		}
-
-		return arr;
-	}
-
-
-	/**
 	 * Method to populate aggregate and other instructions in reducer.
 	 * 
 	 * @param node
@@ -3111,12 +3071,11 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-	@SuppressWarnings("unchecked")
-	private int getAggAndOtherInstructions(N node, ArrayList<N> execNodes,
+	private int getAggAndOtherInstructions(Lop node, ArrayList<Lop> execNodes,
 			ArrayList<String> shuffleInstructions,
 			ArrayList<String> aggInstructionsReducer,
 			ArrayList<String> otherInstructionsReducer,
-			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
+			HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops, 
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
 	{
@@ -3137,15 +3096,15 @@ public class Dag<N extends Lop>
 		// are parameters for the WRITE operation), so we only need to take care of the
 		// first element.
 		if (node.getType() == Lop.Type.Data && ((Data)node).getOperationType() == Data.OperationTypes.WRITE) {
-			ret_val = getAggAndOtherInstructions((N) node.getInputs().get(0),
+			ret_val = getAggAndOtherInstructions(node.getInputs().get(0),
 					execNodes, shuffleInstructions, aggInstructionsReducer,
 					otherInstructionsReducer, nodeIndexMapping, start_index,
 					inputLabels, inputLops, MRJobLineNumbers);
 			inputIndices.add(ret_val);
 		}
 		else {
-			for (int i = 0; i < node.getInputs().size(); i++) {
-				ret_val = getAggAndOtherInstructions((N) node.getInputs().get(i),
+			for ( Lop cnode : node.getInputs() ) {
+				ret_val = getAggAndOtherInstructions(cnode,
 						execNodes, shuffleInstructions, aggInstructionsReducer,
 						otherInstructionsReducer, nodeIndexMapping, start_index,
 						inputLabels, inputLops, MRJobLineNumbers);
@@ -3372,11 +3331,10 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-	@SuppressWarnings("unchecked")
-	private int getRecordReaderInstructions(N node, ArrayList<N> execNodes,
+	private static int getRecordReaderInstructions(Lop node, ArrayList<Lop> execNodes,
 			ArrayList<String> inputStrings,
 			ArrayList<String> recordReaderInstructions,
-			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
+			HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
 	{
@@ -3395,7 +3353,7 @@ public class Dag<N extends Lop>
 		// get mapper instructions
 		for (int i = 0; i < node.getInputs().size(); i++) {
 			// recurse
-			N childNode = (N) node.getInputs().get(i);
+			Lop childNode = node.getInputs().get(i);
 			int ret_val = getRecordReaderInstructions(childNode, execNodes,
 					inputStrings, recordReaderInstructions, nodeIndexMapping,
 					start_index, inputLabels, inputLops, MRJobLineNumbers);
@@ -3474,11 +3432,10 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-	@SuppressWarnings("unchecked")
-	private int getMapperInstructions(N node, ArrayList<N> execNodes,
+	private int getMapperInstructions(Lop node, ArrayList<Lop> execNodes,
 			ArrayList<String> inputStrings,
 			ArrayList<String> instructionsInMapper,
-			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
+			HashMap<Lop, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops, 
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
 	{
@@ -3493,10 +3450,7 @@ public class Dag<N extends Lop>
 		ArrayList<Integer> inputIndices = new ArrayList<Integer>();
 		int max_input_index = -1;
 		// get mapper instructions
-		for (int i = 0; i < node.getInputs().size(); i++) {
-
-			// recurse
-			N childNode = (N) node.getInputs().get(i);
+		for( Lop childNode : node.getInputs()) {
 			int ret_val = getMapperInstructions(childNode, execNodes,
 					inputStrings, instructionsInMapper, nodeIndexMapping,
 					start_index, inputLabels, inputLops, MRJobLineNumbers);
@@ -3615,12 +3569,11 @@ public class Dag<N extends Lop>
 	}
 
 	// Method to populate inputs and also populates node index mapping.
-	@SuppressWarnings("unchecked")
-	private void getInputPathsAndParameters(N node, ArrayList<N> execNodes,
+	private static void getInputPathsAndParameters(Lop node, ArrayList<Lop> execNodes,
 			ArrayList<String> inputStrings, ArrayList<InputInfo> inputInfos,
 			ArrayList<Long> numRows, ArrayList<Long> numCols,
 			ArrayList<Long> numRowsPerBlock, ArrayList<Long> numColsPerBlock,
-			HashMap<N, Integer> nodeIndexMapping, ArrayList<String> inputLabels, 
+			HashMap<Lop, Integer> nodeIndexMapping, ArrayList<String> inputLabels, 
 			ArrayList<Lop> inputLops, ArrayList<Integer> MRJobLineNumbers)
 			throws LopsException {
 		// treat rand as an input.
@@ -3674,15 +3627,12 @@ public class Dag<N extends Lop>
 					nodeInputInfo = InputInfo.BinaryBlockInputInfo;
 				else 
 					throw new LopsException("Invalid format (" + node.getOutputParameters().getFormat() + ") encountered for a node/lop (ID=" + node.getID() + ") with blocked output.");
-				// inputInfos.add(InputInfo.BinaryBlockInputInfo);
-			} else {
+			} 
+			else {
 				if (node.getOutputParameters().getFormat() == Format.TEXT)
 					nodeInputInfo = InputInfo.TextCellInputInfo;
-				// inputInfos.add(InputInfo.TextCellInputInfo);
-				else {
+				else
 					nodeInputInfo = InputInfo.BinaryCellInputInfo;
-					// inputInfos.add(InputInfo.BinaryCellInputInfo);
-				}
 			}
 
 			/*
@@ -3716,7 +3666,6 @@ public class Dag<N extends Lop>
 			}
 
 			inputInfos.add(nodeInputInfo);
-
 			nodeIndexMapping.put(node, inputStrings.size() - 1);
 			return;
 
@@ -3728,7 +3677,7 @@ public class Dag<N extends Lop>
 
 		// process children recursively
 		for ( Lop lop : node.getInputs() ) {
-			getInputPathsAndParameters((N)lop, execNodes, inputStrings,
+			getInputPathsAndParameters(lop, execNodes, inputStrings,
 					inputInfos, numRows, numCols, numRowsPerBlock,
 					numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
 		}
@@ -3741,8 +3690,8 @@ public class Dag<N extends Lop>
 	 * @param execNodes
 	 * @param rootNodes
 	 */
-	private void getOutputNodes(ArrayList<N> execNodes, ArrayList<N> rootNodes, JobType jt) {
-		for ( N node : execNodes ) {
+	private static void getOutputNodes(ArrayList<Lop> execNodes, ArrayList<Lop> rootNodes, JobType jt) {
+		for ( Lop node : execNodes ) {
 			// terminal node
 			if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) {
 				rootNodes.add(node);
@@ -3790,7 +3739,7 @@ public class Dag<N extends Lop>
 	 * @param v
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private void doTopologicalSort_strict_order(ArrayList<N> v) {
+	private void doTopologicalSort_strict_order(ArrayList<Lop> v) {
 		//int numNodes = v.size();
 
 		/*
@@ -3802,7 +3751,7 @@ public class Dag<N extends Lop>
 		// Step1: Performed at the time of creating Lops
 		
 		// Step2: sort nodes by level, and then by node ID
-		Object[] nodearray = v.toArray();
+		Lop[] nodearray = v.toArray(new Lop[0]);
 		Arrays.sort(nodearray, new LopComparator());
 
 		// Copy sorted nodes into "v" and construct a mapping between Lop IDs and sequence of numbers
@@ -3810,7 +3759,7 @@ public class Dag<N extends Lop>
 		IDMap.clear();
 		
 		for (int i = 0; i < nodearray.length; i++) {
-			v.add((N) nodearray[i]);
+			v.add(nodearray[i]);
 			IDMap.put(v.get(i).getID(), i);
 		}
 		
@@ -3830,7 +3779,7 @@ public class Dag<N extends Lop>
 
 		// print the nodes in sorted order
 		if (LOG.isTraceEnabled()) {
-			for ( N vnode : v ) {
+			for ( Lop vnode : v ) {
 				StringBuilder sb = new StringBuilder();
 				sb.append(vnode.getID());
 				sb.append("(");
@@ -3859,8 +3808,7 @@ public class Dag<N extends Lop>
 	 * @param root
 	 * @param marked
 	 */
-	@SuppressWarnings("unchecked")
-	private void dagDFS(N root, boolean[] marked) {
+	private void dagDFS(Lop root, boolean[] marked) {
 		//contains check currently required for globalopt, will be removed when cleaned up
 		if( !IDMap.containsKey(root.getID()) )
 			return;
@@ -3870,40 +3818,40 @@ public class Dag<N extends Lop>
 			return;
 		marked[mapID] = true;
 		for( Lop lop : root.getOutputs() ) {
-			dagDFS((N)lop, marked);
+			dagDFS(lop, marked);
 		}
 	}
 	
-	private boolean hasDirectChildNode(N node, ArrayList<N> childNodes) {
+	private static boolean hasDirectChildNode(Lop node, ArrayList<Lop> childNodes) {
 		if ( childNodes.isEmpty() ) 
 			return false;
-		for( N cnode : childNodes ) {
+		for( Lop cnode : childNodes ) {
 			if ( cnode.getOutputs().contains(node))
 				return true;
 		}
 		return false;
 	}
 	
-	private boolean hasChildNode(N node, ArrayList<N> nodes) {
+	private boolean hasChildNode(Lop node, ArrayList<Lop> nodes) {
 		return hasChildNode(node, nodes, ExecLocation.INVALID);
 	}
 
-	private boolean hasChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
+	private boolean hasChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) {
 		if ( childNodes.isEmpty() ) 
 			return false;
 		int index = IDMap.get(node.getID());
-		for( N cnode : childNodes ) {
+		for( Lop cnode : childNodes ) {
 			if ( (type == ExecLocation.INVALID || cnode.getExecLocation() == type) && cnode.get_reachable()[index])
 				return true;
 		}
 		return false;
 	}
 	
-	private N getChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
+	private Lop getChildNode(Lop node, ArrayList<Lop> childNodes, ExecLocation type) {
 		if ( childNodes.isEmpty() )
 			return null;
 		int index = IDMap.get(node.getID());
-		for( N cnode : childNodes ) {
+		for( Lop cnode : childNodes ) {
 			if ( cnode.getExecLocation() == type && cnode.get_reachable()[index])
 				return cnode;
 		}
@@ -3919,10 +3867,10 @@ public class Dag<N extends Lop>
 	 * Returns null if no such "n" exists
 	 * 
 	 */
-	private N getParentNode(N node, ArrayList<N> parentNodes, ExecLocation type) {
+	private Lop getParentNode(Lop node, ArrayList<Lop> parentNodes, ExecLocation type) {
 		if ( parentNodes.isEmpty() )
 			return null;
-		for( N pn : parentNodes ) {
+		for( Lop pn : parentNodes ) {
 			int index = IDMap.get( pn.getID() );
 			if ( pn.getExecLocation() == type && node.get_reachable()[index])
 				return pn;
@@ -3932,25 +3880,25 @@ public class Dag<N extends Lop>
 
 	// Checks if "node" has any descendants in nodesVec with definedMRJob flag
 	// set to true
-	private boolean hasMRJobChildNode(N node, ArrayList<N> nodesVec) {
+	private boolean hasMRJobChildNode(Lop node, ArrayList<Lop> nodesVec) {
 		if ( nodesVec.isEmpty() )
 			return false;
 		
 		int index = IDMap.get(node.getID());
-		for( N n : nodesVec ) {
+		for( Lop n : nodesVec ) {
 			if ( n.definesMRJob() && n.get_reachable()[index]) 
 				return true;
 		}
 		return false;
 	}
 
-	private boolean checkDataGenAsChildNode(N node, ArrayList<N> nodesVec) {
+	private boolean checkDataGenAsChildNode(Lop node, ArrayList<Lop> nodesVec) {
 		if( nodesVec.isEmpty() )
 			return true;
 		
 		int index = IDMap.get(node.getID());
 		boolean onlyDatagen = true;
-		for( N n : nodesVec ) {
+		for( Lop n : nodesVec ) {
 			if ( n.definesMRJob() && n.get_reachable()[index] &&  JobType.findJobTypeFromLop(n) != JobType.DATAGEN )
 				onlyDatagen = false;
 		}
@@ -3958,11 +3906,9 @@ public class Dag<N extends Lop>
 		return onlyDatagen;
 	}
 
-	@SuppressWarnings("unchecked")
-	private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type) 
+	private static int getChildAlignment(Lop node, ArrayList<Lop> execNodes, ExecLocation type) 
 	{
-		for (Lop lop : node.getInputs() ) {
-			N n = (N) lop;
+		for (Lop n : node.getInputs() ) {
 			if (!execNodes.contains(n))
 				continue;
 
@@ -3993,10 +3939,10 @@ public class Dag<N extends Lop>
 		return MRCHILD_NOT_FOUND;
 	}
 
-	private boolean hasParentNode(N node, ArrayList<N> parentNodes) {
+	private boolean hasParentNode(Lop node, ArrayList<Lop> parentNodes) {
 		if ( parentNodes.isEmpty() )
 			return false;		
-		for( N pnode : parentNodes ) {
+		for( Lop pnode : parentNodes ) {
 			int index = IDMap.get( pnode.getID() );
 			if ( node.get_reachable()[index])
 				return true;