Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:55 UTC

[24/78] [abbrv] [partial] incubator-systemml git commit: Move files to new package folder structure

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java b/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java
deleted file mode 100644
index e5b63f5..0000000
--- a/src/main/java/com/ibm/bi/dml/lops/compile/Dag.java
+++ /dev/null
@@ -1,4574 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.lops.compile;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.DMLScript.RUNTIME_PLATFORM;
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.conf.DMLConfig;
-import com.ibm.bi.dml.hops.AggBinaryOp;
-import com.ibm.bi.dml.hops.BinaryOp;
-import com.ibm.bi.dml.hops.Hop.FileFormatTypes;
-import com.ibm.bi.dml.hops.HopsException;
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.lops.AppendM;
-import com.ibm.bi.dml.lops.BinaryM;
-import com.ibm.bi.dml.lops.CombineBinary;
-import com.ibm.bi.dml.lops.Data;
-import com.ibm.bi.dml.lops.PMMJ;
-import com.ibm.bi.dml.lops.ParameterizedBuiltin;
-import com.ibm.bi.dml.lops.SortKeys;
-import com.ibm.bi.dml.lops.Data.OperationTypes;
-import com.ibm.bi.dml.lops.FunctionCallCP;
-import com.ibm.bi.dml.lops.Lop;
-import com.ibm.bi.dml.lops.Lop.Type;
-import com.ibm.bi.dml.lops.LopProperties.ExecLocation;
-import com.ibm.bi.dml.lops.LopProperties.ExecType;
-import com.ibm.bi.dml.lops.LopsException;
-import com.ibm.bi.dml.lops.MapMult;
-import com.ibm.bi.dml.lops.OutputParameters;
-import com.ibm.bi.dml.lops.OutputParameters.Format;
-import com.ibm.bi.dml.lops.PickByCount;
-import com.ibm.bi.dml.lops.Unary;
-import com.ibm.bi.dml.parser.DataExpression;
-import com.ibm.bi.dml.parser.Expression;
-import com.ibm.bi.dml.parser.ParameterizedBuiltinFunctionExpression;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.StatementBlock;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ProgramConverter;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.instructions.CPInstructionParser;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-import com.ibm.bi.dml.runtime.instructions.Instruction.INSTRUCTION_TYPE;
-import com.ibm.bi.dml.runtime.instructions.InstructionParser;
-import com.ibm.bi.dml.runtime.instructions.SPInstructionParser;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.VariableCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction.CPINSTRUCTION_TYPE;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.sort.PickFromCompactInputFormat;
-
-
-
-/**
- * 
- *  Class to maintain a DAG of lops and compile it into runtime jobs.
- * @param <N> the concrete Lop node type maintained in this DAG
- */
-public class Dag<N extends Lop> 
-{
-	
-	private static final Log LOG = LogFactory.getLog(Dag.class.getName());
-
-	private static final int CHILD_BREAKS_ALIGNMENT = 2;
-	private static final int CHILD_DOES_NOT_BREAK_ALIGNMENT = 1;
-	private static final int MRCHILD_NOT_FOUND = 0;
-	private static final int MR_CHILD_FOUND_BREAKS_ALIGNMENT = 4;
-	private static final int MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT = 5;
-
-	private static IDSequence job_id = null;
-	private static IDSequence var_index = null;
-	
-	private int total_reducers = -1;
-	private String scratch = "";
-	private String scratchFilePath = "";
-	
-	private double gmrMapperFootprint = 0;
-	
-	static {
-		job_id = new IDSequence();
-		var_index = new IDSequence();
-	}
-	
-	// list of all nodes in the dag
-	private ArrayList<N> nodes = null;
-	
-	/* 
-	 * HashMap that translates the nodes in the DAG to a sequence of numbers
-	 *     key:   Lop ID
-	 *     value: Sequence Number (0 ... |DAG|)
-	 *     
-	 * This map is primarily used in performing DFS on the DAG, and subsequently in performing ancestor-descendant checks.
-	 */
-	private HashMap<Long, Integer> IDMap = null;
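-
-	/*
-	 * A minimal sketch (assumed shape, derived from the comment above) of how
-	 * IDMap is populated: after topological sorting, each node's Lop ID maps
-	 * to its position in the sorted order,
-	 *
-	 *   IDMap.clear();
-	 *   for (int seq = 0; seq < nodes.size(); seq++)
-	 *       IDMap.put(nodes.get(seq).getID(), seq); // Lop ID -> sequence number (0 ... |DAG|)
-	 *
-	 * so that an ancestor-descendant check such as isChild(a, b, IDMap) can
-	 * use IDMap.get(a.getID()) to index per-node reachability information.
-	 */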
-
-
-	private class NodeOutput {
-		String fileName;
-		String varName;
-		OutputInfo outInfo;
-		ArrayList<Instruction> preInstructions; // instructions added before an MR instruction
-		ArrayList<Instruction> postInstructions; // instructions added after an MR instruction
-		ArrayList<Instruction> lastInstructions;
-		
-		NodeOutput() {
-			fileName = null;
-			varName = null;
-			outInfo = null;
-			preInstructions = new ArrayList<Instruction>(); 
-			postInstructions = new ArrayList<Instruction>(); 
-			lastInstructions = new ArrayList<Instruction>();
-		}
-		
-		public String getFileName() {
-			return fileName;
-		}
-		public void setFileName(String fileName) {
-			this.fileName = fileName;
-		}
-		public String getVarName() {
-			return varName;
-		}
-		public void setVarName(String varName) {
-			this.varName = varName;
-		}
-		public OutputInfo getOutInfo() {
-			return outInfo;
-		}
-		public void setOutInfo(OutputInfo outInfo) {
-			this.outInfo = outInfo;
-		}
-		public ArrayList<Instruction> getPreInstructions() {
-			return preInstructions;
-		}
-		public void addPreInstruction(Instruction inst) {
-			preInstructions.add(inst);
-		}
-		public ArrayList<Instruction> getPostInstructions() {
-			return postInstructions;
-		}
-		public void addPostInstruction(Instruction inst) {
-			postInstructions.add(inst);
-		}
-		public ArrayList<Instruction> getLastInstructions() {
-			return lastInstructions;
-		}
-		public void addLastInstruction(Instruction inst) {
-			lastInstructions.add(inst);
-		}
-		
-	}
-	
-	private String getFilePath() {
-		if ( scratchFilePath.equalsIgnoreCase("") ) {
-			scratchFilePath = scratch + Lop.FILE_SEPARATOR
-								+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
-								+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
-								+ ProgramConverter.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
-		}
-		return scratchFilePath;
-	}
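-
-	// Illustrative result of getFilePath() (assuming Lop.FILE_SEPARATOR = "/",
-	// Lop.PROCESS_PREFIX = "_p", and ProgramConverter.CP_ROOT_THREAD_ID = "_t0";
-	// these constant values are assumptions for illustration): with
-	// scratch = "scratch_space/" and UUID "abc-123", the assembled path is
-	//   "scratch_space//_pabc-123//_t0/"
-	// (note the doubled separators produced by the concatenation above).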
-	
-	/**
-	 * Constructor
-	 */
-	public Dag() 
-	{
-		//allocate internal data structures
-		nodes = new ArrayList<N>();
-		IDMap = new HashMap<Long, Integer>();
-
-		// get number of reducers from dml config
-		DMLConfig conf = ConfigurationManager.getConfig();
-		total_reducers = conf.getIntValue(DMLConfig.NUM_REDUCERS);
-	}
-
-	/**
-	 * 
-	 * @param config
-	 * @return
-	 * @throws LopsException
-	 * @throws IOException
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	public ArrayList<Instruction> getJobs(DMLConfig config)
-			throws LopsException, IOException, DMLRuntimeException, DMLUnsupportedOperationException 
-	{
-		return getJobs(null, config);
-	}
-	
-	/**
-	 * Method to compile a dag generically
-	 * 
-	 * @param config
-	 * @throws LopsException
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 */
-
-	public ArrayList<Instruction> getJobs(StatementBlock sb, DMLConfig config)
-			throws LopsException, IOException, DMLRuntimeException,
-			DMLUnsupportedOperationException {
-		
-		if (config != null) 
-		{
-			total_reducers = config.getIntValue(DMLConfig.NUM_REDUCERS);
-			scratch = config.getTextValue(DMLConfig.SCRATCH_SPACE) + "/";
-		}
-		
-		// hold all nodes in a vector (needed for ordering)
-		ArrayList<N> node_v = new ArrayList<N>();
-		node_v.addAll(nodes);
-		
-		/*
-		 * Sort the nodes by topological order.
-		 * 
-		 * 1) All nodes with level i appear prior to the nodes in level i+1.
-		 * 2) All nodes within a level are ordered by their ID i.e., in the order
-		 * they are created
-		 */
-		doTopologicalSort_strict_order(node_v);
-		
-		// do greedy grouping of operations
-		ArrayList<Instruction> inst = doGreedyGrouping(sb, node_v);
-		
-		return inst;
-
-	}
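-
-	/*
-	 * A minimal sketch (with assumed accessor names) of the ordering contract
-	 * that doTopologicalSort_strict_order() establishes for node_v:
-	 *
-	 *   Collections.sort(node_v, new Comparator<N>() {
-	 *       @Override public int compare(N a, N b) {
-	 *           int byLevel = Integer.compare(a.getLevel(), b.getLevel());
-	 *           return (byLevel != 0) ? byLevel                 // rule 1: by level
-	 *                   : Long.compare(a.getID(), b.getID());   // rule 2: by creation ID
-	 *       }
-	 *   });
-	 */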
-
-	/**
-	 * Method to remove transient reads that do not have a transient write
-	 * 
-	 * @param nodeV
-	 * @param inst
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("unused")
-	private void deleteUnwantedTransientReadVariables(ArrayList<N> nodeV,
-			ArrayList<Instruction> inst) throws DMLRuntimeException,
-			DMLUnsupportedOperationException {
-		HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
-		
-		LOG.trace("In delete unwanted variables");
-
-		// first capture all transient read variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.READ) {
-				labelNodeMapping.put(node.getOutputParameters().getLabel(),
-						node);
-			}
-		}
-
-		// generate delete instructions for all transient read variables without
-		// a transient write
-		// first capture all transient write variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.WRITE ) {
-				if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data) {
-					// this transient write is NOT a result of actual computation, but is a simple copy.
-					// in such a case, preserve the input variable so that an appropriate copy instruction (cpvar or GMR) is generated;
-					// therefore, remove the input label from labelNodeMapping
-					labelNodeMapping.remove(node.getInputs().get(0).getOutputParameters().getLabel());
-				}
-				else {
-					if(labelNodeMapping.containsKey(node.getOutputParameters().getLabel())) 
-						// corresponding transient read exists (i.e., the variable is updated)
-						// in such a case, generate rmvar instruction so that OLD data gets deleted
-						labelNodeMapping.remove(node.getOutputParameters().getLabel());
-				}
-			}
-		}
-
-		// generate RM instructions
-		Instruction rm_inst = null;
-		for( Entry<String, N> e : labelNodeMapping.entrySet() ) {
-			String label = e.getKey();
-			N node = e.getValue();
-
-			if (((Data) node).getDataType() == DataType.SCALAR) {
-				// if(DEBUG)
-				// System.out.println("rmvar" + Lops.OPERAND_DELIMITOR + label);
-				// inst.add(new VariableSimpleInstructions("rmvar" +
-				// Lops.OPERAND_DELIMITOR + label));
-
-			} else {
-				rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
-				rm_inst.setLocation(node);
-				
-				if( LOG.isTraceEnabled() )
-					LOG.trace(rm_inst.toString());
-				inst.add(rm_inst);
-				
-			}
-		}
-
-	}
-
-	private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
-			ArrayList<Instruction> inst) throws DMLRuntimeException,
-			DMLUnsupportedOperationException {
-
-		if ( sb == null ) 
-			return;
-		
-		LOG.trace("In delete updated variables");
-
-		/*
-		Set<String> in = sb.liveIn().getVariables().keySet();
-		Set<String> out = sb.liveOut().getVariables().keySet();
-		Set<String> updated = sb.variablesUpdated().getVariables().keySet();
-		
-		Set<String> intersection = in;
-		intersection.retainAll(out);
-		intersection.retainAll(updated);
-		
-		for (String var : intersection) {
-			inst.add(VariableCPInstruction.prepareRemoveInstruction(var));
-		}
-		*/
-
-		// CANDIDATE list of variables which could have been updated in this statement block 
-		HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
-		
-		// ACTUAL list of variables whose value is updated, AND the old value of the variable 
-		// is no longer accessible/used.
-		HashSet<String> updatedLabels = new HashSet<String>();
-		HashMap<String, N> updatedLabelsLineNum =  new HashMap<String, N>();
-		
-		// first capture all transient read variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.READ
-					&& ((Data) node).getDataType() == DataType.MATRIX) {
-				
-				// "node" is considered as updated ONLY IF the old value is not used any more
-				// So, make sure that this READ node does not feed into any (transient/persistent) WRITE
-				boolean hasWriteParent=false;
-				for(Lop p : node.getOutputs()) {
-					if(p.getExecLocation() == ExecLocation.Data) {
-						// if the "p" is of type Data, then it has to be a WRITE
-						hasWriteParent = true;
-						break;
-					}
-				}
-				
-				if ( !hasWriteParent ) {
-					// node has no parent of type WRITE, so this is a CANDIDATE variable 
-					// add it to labelNodeMapping so that it is considered in further processing  
-					labelNodeMapping.put(node.getOutputParameters().getLabel(), node);
-				}
-			}
-		}
-
-		// capture updated transient write variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.WRITE
-					&& ((Data) node).getDataType() == DataType.MATRIX
-					&& labelNodeMapping.containsKey(node.getOutputParameters().getLabel()) // check to make sure corresponding (i.e., with the same label/name) transient read is present
-					&& !labelNodeMapping.containsValue(node.getInputs().get(0)) // check to avoid cases where transient read feeds into a transient write 
-				) {
-				updatedLabels.add(node.getOutputParameters().getLabel());
-				updatedLabelsLineNum.put(node.getOutputParameters().getLabel(), node);
-				
-			}
-		}
-		
-		// generate RM instructions
-		Instruction rm_inst = null;
-		for ( String label : updatedLabels ) 
-		{
-			rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
-			rm_inst.setLocation(updatedLabelsLineNum.get(label));
-			
-			
-			LOG.trace(rm_inst.toString());
-			inst.add(rm_inst);
-		}
-
-	}
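-
-	/*
-	 * Worked example (hypothetical script): for an in-place update such as
-	 *
-	 *   X = X + 1;
-	 *
-	 * the transient READ of X feeds a non-Data lop (the binary +), so X becomes
-	 * a CANDIDATE; the matching transient WRITE of X then promotes it to
-	 * updatedLabels, and an "rmvar X" instruction is generated so the stale
-	 * value of X is deleted before the new value is bound.
-	 */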
-	  
-	private void generateRemoveInstructions(StatementBlock sb,
-			ArrayList<Instruction> deleteInst)
-			throws DMLUnsupportedOperationException, DMLRuntimeException {
-		
-		if ( sb == null ) 
-			return;
-		
-		LOG.trace("In generateRemoveInstructions()");
-		
-
-		Instruction inst = null;
-		// RULE 1: if in IN and not in OUT, then there should be an rmvar or rmfilevar inst
-		// (currently required for specific cases of external functions)
-		for (String varName : sb.liveIn().getVariableNames()) {
-			if (!sb.liveOut().containsVariable(varName)) {
-				// DataType dt = in.getVariable(varName).getDataType();
-				// if( !(dt==DataType.MATRIX || dt==DataType.UNKNOWN) )
-				// continue; //skip rm instructions for non-matrix objects
-
-				inst = VariableCPInstruction.prepareRemoveInstruction(varName);
-				inst.setLocation(sb.getEndLine(), sb.getEndLine(), -1, -1);
-				
-				deleteInst.add(inst);
-
-				if( LOG.isTraceEnabled() )
-					LOG.trace("  Adding " + inst.toString());
-			}
-		}
-
-		// RULE 2: if in KILL and not in IN and not in OUT, then there should be an rmvar or rmfilevar inst
-		// (currently required for specific cases of nested loops)
-		// i.e., local variables which are created within the block, and used entirely within the block 
-		/*for (String varName : sb.getKill().getVariableNames()) {
-			if ((!sb.liveIn().containsVariable(varName))
-					&& (!sb.liveOut().containsVariable(varName))) {
-				// DataType dt =
-				// sb.getKill().getVariable(varName).getDataType();
-				// if( !(dt==DataType.MATRIX || dt==DataType.UNKNOWN) )
-				// continue; //skip rm instructions for non-matrix objects
-
-				inst = createCleanupInstruction(varName);
-				deleteInst.add(inst);
-
-				if (DMLScript.DEBUG)
-					System.out.println("Adding instruction (r2) "
-							+ inst.toString());
-			}
-		}*/
-	}
-
-	/**
-	 * Method to add a node to the DAG.
-	 * 
-	 * @param node
-	 * @return true if the node was not already present, false otherwise.
-	 */
-
-	public boolean addNode(N node) {
-		if (nodes.contains(node))
-			return false;
-		else {
-			nodes.add(node);
-			return true;
-		}
-		
-	}
-
-	private ArrayList<ArrayList<N>> createNodeVectors(int size) {
-		ArrayList<ArrayList<N>> arr = new ArrayList<ArrayList<N>>();
-
-		// for each job type, we need to create a vector.
-		// additionally, create another vector for execNodes
-		for (int i = 0; i < size; i++) {
-			arr.add(new ArrayList<N>());
-		}
-		return arr;
-	}
-
-	private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
-		for (int i = 0; i < arr.size(); i++) {
-			arr.get(i).clear();
-		}
-	}
-
-	private boolean isCompatible(ArrayList<N> nodes, JobType jt, int from,
-			int to) throws LopsException {
-		
-		int base = jt.getBase();
-
-		for (int i = from; i < to; i++) {
-			if ((nodes.get(i).getCompatibleJobs() & base) == 0) {
-				if( LOG.isTraceEnabled() )
-					LOG.trace("Not compatible "+ nodes.get(i).toString());
-				return false;
-			}
-		}
-		return true;
-	}
-
-	/**
-	 * Function that determines if the two input nodes can be executed together 
-	 * in at least one job.
-	 * 
-	 * @param node1
-	 * @param node2
-	 * @return
-	 */
-	private boolean isCompatible(N node1, N node2) {
-		return( (node1.getCompatibleJobs() & node2.getCompatibleJobs()) > 0);
-	}
-	
-	/**
-	 * Function that checks if the given node executes in the job specified by jt.
-	 * 
-	 * @param node
-	 * @param jt
-	 * @return
-	 */
-	private boolean isCompatible(N node, JobType jt) {
-		if ( jt == JobType.GMRCELL )
-			jt = JobType.GMR;
-		return ((node.getCompatibleJobs() & jt.getBase()) > 0);
-	}
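-
-	/*
-	 * A minimal sketch of the bit-vector encoding assumed by the two checks
-	 * above (bit positions are hypothetical): if GMR occupies bit 0 and MMCJ
-	 * bit 2, then a lop with getCompatibleJobs() == 0b101 can run in either
-	 * job, and two lops are compatible iff their masks share at least one bit:
-	 *
-	 *   (0b101 & 0b100) > 0   // true  -> both can run in MMCJ
-	 *   (0b101 & 0b010) > 0   // false -> no common job type
-	 */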
-
-	/*
-	 * Add node, and its relevant children to job-specific node vectors.
-	 */
-	private void addNodeByJobType(N node, ArrayList<ArrayList<N>> arr,
-			ArrayList<N> execNodes, boolean eliminate) throws LopsException {
-		
-		if (!eliminate) {
-			// Check if this lop defines a MR job.
-			if ( node.definesMRJob() ) {
-				
-				// find the corresponding JobType
-				JobType jt = JobType.findJobTypeFromLop(node);
-				
-				if ( jt == null ) {
-					throw new LopsException(node.printErrorLocation() + "No matching JobType was found for the lop type: " + node.getType() + " \n");
-				}
-				
-				// Add "node" to corresponding job vector
-				
-				if ( jt == JobType.GMR ) {
-					if ( node.hasNonBlockedInputs() ) {
-						int gmrcell_index = JobType.GMRCELL.getId();
-						arr.get(gmrcell_index).add(node);
-						int from = arr.get(gmrcell_index).size();
-						addChildren(node, arr.get(gmrcell_index), execNodes);
-						int to = arr.get(gmrcell_index).size();
-						if (!isCompatible(arr.get(gmrcell_index),JobType.GMR, from, to))  // check against GMR only, not against GMRCELL
-							throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n");
-					}
-					else {
-						// if "node" (in this case, a group lop) has any inputs from RAND 
-						// then add it to RAND job. Otherwise, create a GMR job
-						if (hasChildNode(node, arr.get(JobType.DATAGEN.getId()) )) {
-							arr.get(JobType.DATAGEN.getId()).add(node);
-							// we should NOT call 'addChildren' because appropriate
-							// child nodes would have got added to RAND job already
-						} else {
-							int gmr_index = JobType.GMR.getId();
-							arr.get(gmr_index).add(node);
-							int from = arr.get(gmr_index).size();
-							addChildren(node, arr.get(gmr_index), execNodes);
-							int to = arr.get(gmr_index).size();
-							if (!isCompatible(arr.get(gmr_index),JobType.GMR, from, to)) 
-								throw new LopsException(node.printErrorLocation() + "Error during compatibility check \n");
-						}
-					}
-				}
-				else {
-					int index = jt.getId();
-					arr.get(index).add(node);
-					int from = arr.get(index).size();
-					addChildren(node, arr.get(index), execNodes);
-					int to = arr.get(index).size();
-					// check if all added nodes are compatible with current job
-					if (!isCompatible(arr.get(index), jt, from, to)) {
-						throw new LopsException( 
-								"Unexpected error in addNodeByJobType.");
-					}
-				}
-				return;
-			}
-		}
-
-		if ( eliminate ) {
-			// Eliminated lops are directly added to GMR queue. 
-			// Note that eliminate flag is set only for 'group' lops
-			if ( node.hasNonBlockedInputs() )
-				arr.get(JobType.GMRCELL.getId()).add(node);
-			else
-				arr.get(JobType.GMR.getId()).add(node);
-			return;
-		}
-		
-		/*
-		 * If this lop does not define a job, check if it uses the output of any
-		 * specialized job. i.e., if this lop has a child node in any of the
-		 * job-specific vector, then add it to the vector. Note: This lop must
-		 * be added to ONLY ONE of the job-specific vectors.
-		 */
-
-		int numAdded = 0;
-		for ( JobType j : JobType.values() ) {
-			if ( j.getId() > 0 && hasDirectChildNode(node, arr.get(j.getId()))) {
-				if (isCompatible(node, j)) {
-					arr.get(j.getId()).add(node);
-					numAdded += 1;
-				}
-			}
-		}
-		if (numAdded > 1) {
-			throw new LopsException("Unexpected error in addNodeByJobType(): A given lop can ONLY be added to a single job vector (numAdded = " + numAdded + ")." );
-		}
-	}
-
-	/*
-	 * Remove the node from all job-specific node vectors. This method is
-	 * invoked from removeNodesForNextIteration().
-	 */
-	private void removeNodeByJobType(N node, ArrayList<ArrayList<N>> arr) {
-		for ( JobType jt : JobType.values())
-			if ( jt.getId() > 0 ) 
-				arr.get(jt.getId()).remove(node);
-	}
-
-	/**
-	 * As some jobs only write one output, all operations in the mapper need to
-	 * be redone and cannot be marked as finished.
-	 * 
-	 * @param execNodes
-	 * @param jobNodes
-	 * @throws LopsException
-	 */
-
-	private void handleSingleOutputJobs(ArrayList<N> execNodes,
-			ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
-			throws LopsException {
-		/*
-		 * If the input of an MMCJ/MMRJ job (which must have executed in a mapper)
-		 * is used by multiple lops, then we should mark it as not finished.
-		 */
-		ArrayList<N> nodesWithUnfinishedOutputs = new ArrayList<N>();
-		int[] jobIndices = {JobType.MMCJ.getId()};
-		Lop.Type[] lopTypes = { Lop.Type.MMCJ};
-		
-		// TODO: SortByValue should be treated similar to MMCJ, since it can
-		// only sort one file now
-		
-		for ( int jobi=0; jobi < jobIndices.length; jobi++ ) {
-			int jindex = jobIndices[jobi];
-			if (!jobNodes.get(jindex).isEmpty()) {
-				ArrayList<N> vec = jobNodes.get(jindex);
-
-				// first find all nodes with more than one parent that is not
-				// finished.
-
-				for (int i = 0; i < vec.size(); i++) {
-					N node = vec.get(i);
-					if (node.getExecLocation() == ExecLocation.MapOrReduce
-							|| node.getExecLocation() == ExecLocation.Map) {
-						N MRparent = getParentNode(node, execNodes, ExecLocation.MapAndReduce);
-						if ( MRparent != null && MRparent.getType() == lopTypes[jobi]) {
-							int numParents = node.getOutputs().size();
-							if (numParents > 1) {
-								for (int j = 0; j < numParents; j++) {
-									if (!finishedNodes.contains(node.getOutputs()
-											.get(j)))
-										nodesWithUnfinishedOutputs.add(node);
-								}
-	
-							}
-						}
-					} 
-					/*
-					// Following condition will not work for MMRJ because execNodes may contain non-MapAndReduce 
-					// lops that are compatible with MMRJ (e.g., Data-WRITE)
-					else if (!(node.getExecLocation() == ExecLocation.MapAndReduce 
-							     && node.getType() == lopTypes[jobi])) {
-						throw new LopsException(
-								"Error in handleSingleOutputJobs(): encountered incompatible execLocation='"
-										+ node.getExecLocation() + "' for lop (ID="
-										+ node.getID() + ").");
-					}
-					*/
-				}
-
-				// need to redo all nodes in nodesWithOutput as well as their
-				// children
-
-				for (int i = 0; i < vec.size(); i++) {
-					N node = vec.get(i);
-					if (node.getExecLocation() == ExecLocation.MapOrReduce
-							|| node.getExecLocation() == ExecLocation.Map) {
-						if (nodesWithUnfinishedOutputs.contains(node))
-							finishedNodes.remove(node);
-
-						if (hasParentNode(node, nodesWithUnfinishedOutputs))
-							finishedNodes.remove(node);
-
-					}
-				}
-			}			
-		}
-		
-	}
-
-	/** Method to check if a lop can be eliminated from further checking **/
-	private boolean canEliminateLop(N node, ArrayList<N> execNodes) {
-		// this function can only eliminate "aligner" lops such as group
-		if (!node.isAligner())
-			return false;
-
-		// find the child whose execLoc = 'MapAndReduce'
-		int ret = getChildAlignment(node, execNodes, ExecLocation.MapAndReduce);
-
-		if (ret == CHILD_BREAKS_ALIGNMENT)
-			return false;
-		else if (ret == CHILD_DOES_NOT_BREAK_ALIGNMENT)
-			return true;
-		else if (ret == MRCHILD_NOT_FOUND)
-			return false;
-		else if (ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT)
-			return false;
-		else if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT)
-			return true;
-		else
-			throw new RuntimeException("Should not happen. \n");
-	}
-	
-	
-	/**
-	 * Method to generate createvar instructions, which creates a new entry
-	 * in the symbol table. One instruction is generated for every LOP that is 
-	 * 1) type Data and 
-	 * 2) persistent and 
-	 * 3) matrix and 
-	 * 4) read
-	 * 
-	 * Transient reads needn't be considered here since the previous program 
-	 * block would already create appropriate entries in the symbol table.
-	 * 
-	 * @param nodes
-	 * @throws LopsException 
-	 */
-	private void generateInstructionsForInputVariables(ArrayList<N> nodes_v, ArrayList<Instruction> inst) throws LopsException, IOException {
-		for(N n : nodes_v) {
-			if (n.getExecLocation() == ExecLocation.Data && !((Data) n).isTransient() 
-					&& ((Data) n).getOperationType() == OperationTypes.READ 
-					&& (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME) ) {
-				
-				if ( !((Data)n).isLiteral() ) {
-					try {
-						String inst_string = n.getInstructions();						
-						CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(inst_string);
-						currInstr.setLocation(n);						
-						inst.add(currInstr);
-					} catch (DMLUnsupportedOperationException e) {
-						throw new LopsException(n.printErrorLocation() + "error generating instructions from input variables in Dag -- \n", e);
-					} catch (DMLRuntimeException e) {
-						throw new LopsException(n.printErrorLocation() + "error generating instructions from input variables in Dag -- \n", e);
-					}
-				}
-			}
-		}
-	}
-	
-	/*private String getOutputFormat(Data node) { 
-		if(node.getFileFormatType() == FileFormatTypes.TEXT)
-			return "textcell";
-		else if ( node.getOutputParameters().isBlocked_representation() )
-			return "binaryblock";
-		else
-			return "binarycell";
-	}*/
-	
-	/**
-	 * Determine whether to send <code>node</code> to MR or to process it in the control program.
-	 * It is sent to MR in the following cases:
-	 * 
-	 * 1) if input lop gets processed in MR then <code>node</code> can be piggybacked
-	 * 
-	 * 2) if the exectype of write lop itself is marked MR i.e., memory estimate > memory budget.
-	 * 
-	 * @param node
-	 * @return
-	 */
-	@SuppressWarnings("unchecked")
-	private boolean sendWriteLopToMR(N node) 
-	{
-		if ( DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE )
-			return false;
-		N in = (N) node.getInputs().get(0);
-		Format nodeFormat = node.getOutputParameters().getFormat();
-		
-		// Case of a transient read feeding into only one output persistent binaryblock write
-		// Move the temporary file on HDFS to the required persistent location, instead of copying it.
-		if ( in.getExecLocation() == ExecLocation.Data && in.getOutputs().size() == 1
-				&& !((Data)node).isTransient()
-				&& ((Data)in).isTransient()
-				&& ((Data)in).getOutputParameters().isBlocked()
-				&& node.getOutputParameters().isBlocked() ) {
-			return false;
-		}
-		
-		//send write lop to MR if (1) it is marked with exec type MR (based on its memory estimate), or
-		//(2) if the input lop is in MR and the write format allows to pack it into the same job (this does
-		//not apply to csv write because MR csvwrite is a separate MR job type)
-		if( node.getExecType() == ExecType.MR || (in.getExecType() == ExecType.MR && nodeFormat != Format.CSV ) )
-			return true;
-		else
-			return false;
-	}
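-
-	// Decision summary for sendWriteLopToMR() (illustrative restatement of the logic above):
-	//   single-node runtime                              -> false
-	//   transient read -> sole persistent blocked write  -> false (file move suffices)
-	//   write lop itself marked ExecType.MR              -> true
-	//   input runs in MR and write format is not CSV     -> true  (piggyback into the same job)
-	//   otherwise                                        -> false (execute in CP)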
-	
-	/**
-	 * Computes the memory footprint required to execute <code>node</code> in the mapper.
-	 * It is used only for those nodes that use inputs from distributed cache. The returned 
-	 * value is utilized in limiting the number of instructions piggybacked onto a single GMR mapper.
-	 */
-	private double computeFootprintInMapper(N node) {
-		// Memory limits must be checked only for nodes that use distributed cache
-		if ( ! node.usesDistributedCache() )
-			// default behavior
-			return 0.0;
-		
-		OutputParameters in1dims = node.getInputs().get(0).getOutputParameters();
-		OutputParameters in2dims = node.getInputs().get(1).getOutputParameters();
-		
-		double footprint = 0;
-		if ( node instanceof MapMult ) {
-			int dcInputIndex = node.distributedCacheInputIndex()[0];
-			footprint = AggBinaryOp.getMapmmMemEstimate(
-					in1dims.getNumRows(), in1dims.getNumCols(), in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(),
-					in2dims.getNumRows(), in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), in2dims.getNnz(),
-					dcInputIndex, false);
-		}
-		else if ( node instanceof PMMJ ) {
-			int dcInputIndex = node.distributedCacheInputIndex()[0];
-			footprint = AggBinaryOp.getMapmmMemEstimate(
-					in1dims.getNumRows(), 1, in1dims.getRowsInBlock(), in1dims.getColsInBlock(), in1dims.getNnz(),
-					in2dims.getNumRows(), in2dims.getNumCols(), in2dims.getRowsInBlock(), in2dims.getColsInBlock(), in2dims.getNnz(), 
-					dcInputIndex, true);
-		}
-		else if ( node instanceof AppendM ) {
-			footprint = BinaryOp.footprintInMapper(
-					in1dims.getNumRows(), in1dims.getNumCols(), 
-					in2dims.getNumRows(), in2dims.getNumCols(), 
-					in1dims.getRowsInBlock(), in1dims.getColsInBlock());
-		}
-		else if ( node instanceof BinaryM ) {
-			footprint = BinaryOp.footprintInMapper(
-					in1dims.getNumRows(), in1dims.getNumCols(), 
-					in2dims.getNumRows(), in2dims.getNumCols(), 
-					in1dims.getRowsInBlock(), in1dims.getColsInBlock());
-		}
-		else {
-			// default behavior
-			return 0.0;
-		}
-		return footprint;
-	}
-	
-	/**
-	 * Determines if <code>node</code> can be executed in current round of MR jobs or if it needs to be queued for later rounds.
-	 * If the total estimated footprint (<code>node</code> and previously added nodes in GMR) is less than available memory on 
-	 * the mappers then <code>node</code> can be executed in current round, and <code>true</code> is returned. Otherwise, 
-	 * <code>node</code> must be queued and <code>false</code> is returned. 
-	 */
-	private boolean checkMemoryLimits(N node, double footprintInMapper) {
-		boolean addNode = true;
-		
-		// Memory limits must be checked only for nodes that use distributed cache
-		if ( ! node.usesDistributedCache() )
-			// default behavior
-			return addNode;
-		
-		double memBudget = Math.min(AggBinaryOp.MAPMULT_MEM_MULTIPLIER, BinaryOp.APPEND_MEM_MULTIPLIER) * OptimizerUtils.getRemoteMemBudgetMap(true);
-		if ( footprintInMapper <= memBudget ) 
-			return addNode;
-		else
-			return !addNode;
-	}
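-
-	/*
-	 * Worked example for checkMemoryLimits() (all numbers hypothetical): with
-	 * a remote map budget of 1024MB and an effective multiplier of 0.5,
-	 * memBudget = 512MB. A MapMult whose distributed-cache footprint is 300MB
-	 * fits (300 <= 512) and is piggybacked onto the current GMR job; a second
-	 * lop that would raise the cumulative gmrMapperFootprint to 650MB exceeds
-	 * the budget and is queued for a later round.
-	 */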
-	
-	/**
-	 * Method to group a vector of sorted lops.
-	 * 
-	 * @param node_v
-	 * @throws LopsException
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 */
-
-	@SuppressWarnings("unchecked")
-	private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
-			throws LopsException, IOException, DMLRuntimeException,
-			DMLUnsupportedOperationException
-
-	{
-		LOG.trace("Grouping DAG ============");
-
-		// nodes to be executed in current iteration
-		ArrayList<N> execNodes = new ArrayList<N>();
-		// nodes that have already been processed
-		ArrayList<N> finishedNodes = new ArrayList<N>();
-		// nodes that are queued for the following iteration
-		ArrayList<N> queuedNodes = new ArrayList<N>();
-
-		ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
-		
-
-		// list of instructions
-		ArrayList<Instruction> inst = new ArrayList<Instruction>();
-
-		//ArrayList<Instruction> preWriteDeleteInst = new ArrayList<Instruction>();
-		ArrayList<Instruction> writeInst = new ArrayList<Instruction>();
-		ArrayList<Instruction> deleteInst = new ArrayList<Instruction>();
-		ArrayList<Instruction> endOfBlockInst = new ArrayList<Instruction>();
-
-		// delete transient variables that are no longer needed
-		//deleteUnwantedTransientReadVariables(node_v, deleteInst);
-
-		// remove files for transient reads that are updated.
-		deleteUpdatedTransientReadVariables(sb, node_v, writeInst);
-		
-		generateRemoveInstructions(sb, endOfBlockInst);
-
-		generateInstructionsForInputVariables(node_v, inst);
-		
-		
-		boolean done = false;
-		String indent = "    ";
-
-		while (!done) {
-			LOG.trace("Grouping nodes in DAG");
-
-			execNodes.clear();
-			queuedNodes.clear();
-			clearNodeVectors(jobNodes);
-			gmrMapperFootprint=0;
-
-			for (int i = 0; i < node_v.size(); i++) {
-				N node = node_v.get(i);
-
-				// finished nodes don't need to be processed
-
-				if (finishedNodes.contains(node))
-					continue;
-
-				if( LOG.isTraceEnabled() )
-					LOG.trace("Processing node (" + node.getID()
-							+ ") " + node.toString() + " exec nodes size is " + execNodes.size());
-				
-				
-		        //if node defines MR job, make sure it is compatible with all 
-		        //its child nodes in execNodes 
-		        if(node.definesMRJob() && !compatibleWithChildrenInExecNodes(execNodes, node))
-		        {
-		        	if( LOG.isTraceEnabled() )			
-			          LOG.trace(indent + "Queueing node "
-			                + node.toString() + " (code 1)");
-			
-		          queuedNodes.add(node);
-		          removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes);
-		          continue;
-		        }
-
-				// if child is queued, this node will be processed in the later
-				// iteration
-				if (hasChildNode(node,queuedNodes)) {
-
-					if( LOG.isTraceEnabled() )
-						LOG.trace(indent + "Queueing node "
-								+ node.toString() + " (code 2)");
-					//for(N q: queuedNodes) {
-					//	LOG.trace(indent + "  " + q.getType() + "," + q.getID());
-					//}
-					queuedNodes.add(node);
-
-					// if node has more than two inputs,
-					// remove children that will be needed in future
-					// iterations
-					// may also have to remove parent nodes of these children
-					removeNodesForNextIteration(node, finishedNodes, execNodes,
-							queuedNodes, jobNodes);
-
-					continue;
-				}
-				
-				// if inputs come from different jobs, then queue
-				if ( node.getInputs().size() >= 2) {
-					int jobid = Integer.MIN_VALUE;
-					boolean queueit = false;
-					for(int idx=0; idx < node.getInputs().size(); idx++) {
-						int input_jobid = jobType(node.getInputs().get(idx), jobNodes);
-						if (input_jobid != -1) {
-							if ( jobid == Integer.MIN_VALUE )
-								jobid = input_jobid;
-							else if ( jobid != input_jobid ) { 
-								queueit = true;
-								break;
-							}
-						}
-					}
-					if ( queueit ) {
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Queueing node " + node.toString() + " (code 3)");
-						queuedNodes.add(node);
-						removeNodesForNextIteration(node, finishedNodes, execNodes, queuedNodes, jobNodes);
-						continue;
-					}
-				}
-
-				/*if (node.getInputs().size() == 2) {
-					int j1 = jobType(node.getInputs().get(0), jobNodes);
-					int j2 = jobType(node.getInputs().get(1), jobNodes);
-					if (j1 != -1 && j2 != -1 && j1 != j2) {
-						LOG.trace(indent + "Queueing node "
-									+ node.toString() + " (code 3)");
-
-						queuedNodes.add(node);
-
-						removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-
-						continue;
-					}
-				}*/
-
-				// See if this lop can be eliminated
-				// This check is for "aligner" lops (e.g., group)
-				boolean eliminate = false;
-				eliminate = canEliminateLop(node, execNodes);
-				if (eliminate) {
-					if( LOG.isTraceEnabled() )
-						LOG.trace(indent + "Adding -"+ node.toString());
-					execNodes.add(node);
-					finishedNodes.add(node);
-					addNodeByJobType(node, jobNodes, execNodes, eliminate);
-					continue;
-				}
-
-				// If the node defines a MR Job then make sure none of its
-				// children that defines a MR Job are present in execNodes
-				if (node.definesMRJob()) {
-					if (hasMRJobChildNode(node, execNodes)) {
-						// "node" must NOT be queued when node=group and the child that defines the job is Rand
-						// this is because "group" can be pushed into the "Rand" job.
-						if (! (node.getType() == Lop.Type.Grouping && checkDataGenAsChildNode(node,execNodes))  ) {
-							if( LOG.isTraceEnabled() )
-								LOG.trace(indent + "Queueing node " + node.toString() + " (code 4)");
-
-							queuedNodes.add(node);
-
-							removeNodesForNextIteration(node, finishedNodes,
-									execNodes, queuedNodes, jobNodes);
-
-							continue;
-						}
-					}
-				}
-
-				// if "node" has more than one input, and has a descendant lop
-				// in execNodes that is of type RecordReader
-				// then all its inputs must be ancestors of RecordReader. If
-				// not, queue "node"
-				if (node.getInputs().size() > 1
-						&& hasChildNode(node, execNodes, ExecLocation.RecordReader)) {
-					// get the actual RecordReader lop
-					N rr_node = getChildNode(node, execNodes, ExecLocation.RecordReader);
-
-					// all inputs of "node" must be ancestors of rr_node
-					boolean queue_it = false;
-					for (int in = 0; in < node.getInputs().size(); in++) {
-						// each input should be ancestor of RecordReader lop
-						N n = (N) node.getInputs().get(in);
-						if (!n.equals(rr_node) && !isChild(rr_node, n, IDMap)) {
-							queue_it = true; // i.e., "node" must be queued
-							break;
-						}
-					}
-
-					if (queue_it) {
-						// queue node
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Queueing -" + node.toString() + " (code 5)");
-						queuedNodes.add(node);
-						// TODO: does this have to be modified to handle
-						// recordreader lops?
-						removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-						continue;
-					} else {
-						// nothing here.. subsequent checks have to be performed
-						// on "node"
-						;
-					}
-				}
-
-				// data node, always add if child not queued
-				// only write nodes are kept in execnodes
-				if (node.getExecLocation() == ExecLocation.Data) {
-					Data dnode = (Data) node;
-					boolean dnode_queued = false;
-					
-					if ( dnode.getOperationType() == OperationTypes.READ ) {
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Adding Data -"+ node.toString());
-
-						// TODO: avoid readScalar instruction, and read it on-demand just like the way Matrices are read in control program
-						if ( node.getDataType() == DataType.SCALAR 
-								//TODO: LEO check the following condition is still needed
-								&& node.getOutputParameters().getFile_name() != null ) {
-							// this lop corresponds to reading a scalar from HDFS file
-							// add it to execNodes so that "readScalar" instruction gets generated
-							execNodes.add(node);
-							// note: no need to add it to any job vector
-						}
-					}
-					else if (dnode.getOperationType() == OperationTypes.WRITE) {
-						// Skip the transient write <code>node</code> if the input is a 
-						// transient read with the same variable name, i.e., a dummy copy; 
-						// in that case, <code>node</code> can be avoided.
-						// TODO: this case should ideally be handled in the language layer 
-						//       prior to the construction of Hops Dag 
-						N input = (N) dnode.getInputs().get(0);
-						if ( dnode.isTransient() 
-								&& input.getExecLocation() == ExecLocation.Data 
-								&& ((Data)input).isTransient() 
-								&& dnode.getOutputParameters().getLabel().compareTo(input.getOutputParameters().getLabel()) == 0 ) {
-							// do nothing, <code>node</code> must not be processed any further.
-							;
-						}
-						else if ( execNodes.contains(input) && !isCompatible(node, input) && sendWriteLopToMR(node)) {
-							// input is in execNodes but it is not compatible with write lop. So, queue the write lop.
-							if( LOG.isTraceEnabled() )
-								LOG.trace(indent + "Queueing -" + node.toString());
-							queuedNodes.add(node);
-							dnode_queued = true;
-						}
-						else {
-							if( LOG.isTraceEnabled() )
-								LOG.trace(indent + "Adding Data -"+ node.toString());
-
-							execNodes.add(node);
-							if ( sendWriteLopToMR(node) ) {
-								addNodeByJobType(node, jobNodes, execNodes, false);
-							}
-						}
-					}
-					if (!dnode_queued)
-						finishedNodes.add(node);
-
-					continue;
-				}
-
-				// map or reduce node, can always be piggybacked with parent
-				if (node.getExecLocation() == ExecLocation.MapOrReduce) {
-					if( LOG.isTraceEnabled() )
-						LOG.trace(indent + "Adding -"+ node.toString());
-					execNodes.add(node);
-					finishedNodes.add(node);
-					addNodeByJobType(node, jobNodes, execNodes, false);
-
-					continue;
-				}
-
-				// RecordReader node, add, if no parent needs reduce, else queue
-				if (node.getExecLocation() == ExecLocation.RecordReader) {
-					// "node" should not have any children in
-					// execNodes .. it has to be the first one in the job!
-					if (!hasChildNode(node, execNodes, ExecLocation.Map)
-							&& !hasChildNode(node, execNodes,
-									ExecLocation.MapAndReduce)) {
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Adding -"+ node.toString());
-						execNodes.add(node);
-						finishedNodes.add(node);
-						addNodeByJobType(node, jobNodes, execNodes, false);
-					} else {
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Queueing -"+ node.toString() + " (code 6)");
-						queuedNodes.add(node);
-						removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-
-					}
-					continue;
-				}
-
-				// map node, add, if no parent needs reduce, else queue
-				if (node.getExecLocation() == ExecLocation.Map) {
-					boolean queueThisNode = false;
-					int subcode = -1;
-					if ( node.usesDistributedCache() ) {
-						// if an input to <code>node</code> comes from distributed cache
-						// then that input must get executed in one of the previous jobs.
-						int[] dcInputIndexes = node.distributedCacheInputIndex();
-						for( int dcInputIndex : dcInputIndexes ){
-							N dcInput = (N) node.getInputs().get(dcInputIndex-1);
-							if ( (dcInput.getType() != Lop.Type.Data && dcInput.getExecType()==ExecType.MR)
-								  &&  execNodes.contains(dcInput) )
-							{
-								queueThisNode = true;
-								subcode = 1;
-							}
-						}
-						
-						// Limit the number of distributed cache inputs based on the available memory in mappers
-						double memsize = computeFootprintInMapper(node);
-						//gmrMapperFootprint += computeFootprintInMapper(node);
-						if ( gmrMapperFootprint>0 && !checkMemoryLimits(node, gmrMapperFootprint+memsize ) ) {
-							queueThisNode = true;
-							subcode = 2;
-						}
-						if(!queueThisNode)
-							gmrMapperFootprint += memsize;
-					}
-					if (!queueThisNode && !hasChildNode(node, execNodes,ExecLocation.MapAndReduce)&& !hasMRJobChildNode(node, execNodes)) {
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Adding -"+ node.toString());
-						execNodes.add(node);
-						finishedNodes.add(node);
-						addNodeByJobType(node, jobNodes, execNodes, false);
-					} else {
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Queueing -"+ node.toString() + " (code 7 - " + "subcode " + subcode + ")");
-						queuedNodes.add(node);
-						removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-
-					}
-					continue;
-				}
-
-				// reduce node, make sure no parent needs reduce, else queue
-				if (node.getExecLocation() == ExecLocation.MapAndReduce) {
-
-					// boolean eliminate = false;
-					// eliminate = canEliminateLop(node, execNodes);
-					// if (eliminate || (!hasChildNode(node, execNodes,
-					// ExecLocation.MapAndReduce)) &&
-					// !hasMRJobChildNode(node,execNodes)) {
-
-					// TODO: statiko -- keep the middle condition
-					// discuss about having a lop that is MapAndReduce but does
-					// not define a job
-					if( LOG.isTraceEnabled() )
-						LOG.trace(indent + "Adding -"+ node.toString());
-					execNodes.add(node);
-					finishedNodes.add(node);
-					addNodeByJobType(node, jobNodes, execNodes, eliminate);
-
-					// } else {
-					// if (DEBUG)
-					// System.out.println("Queueing -" + node.toString());
-					// queuedNodes.add(node);
-					// removeNodesForNextIteration(node, finishedNodes,
-					// execNodes, queuedNodes, jobNodes);
-					// }
-					continue;
-				}
-
-				// aligned reduce, make sure a parent that is reduce exists
-				if (node.getExecLocation() == ExecLocation.Reduce) {
-					if (  compatibleWithChildrenInExecNodes(execNodes, node) && 
-							(hasChildNode(node, execNodes, ExecLocation.MapAndReduce)
-							 || hasChildNode(node, execNodes, ExecLocation.Map) ) ) 
-					{ 
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Adding -"+ node.toString());
-						execNodes.add(node);
-						finishedNodes.add(node);
-						addNodeByJobType(node, jobNodes, execNodes, false);
-					} else {
-						if( LOG.isTraceEnabled() )
-							LOG.trace(indent + "Queueing -"+ node.toString() + " (code 8)");
-						queuedNodes.add(node);
-						removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-					}
-
-					continue;
-
-				}
-
-				// add Scalar to execNodes if it has no child in exec nodes
-				// that will be executed in a MR job.
-				if (node.getExecLocation() == ExecLocation.ControlProgram) {
-					for (int j = 0; j < node.getInputs().size(); j++) {
-						if (execNodes.contains(node.getInputs().get(j))
-								&& !(node.getInputs().get(j).getExecLocation() == ExecLocation.Data)
-								&& !(node.getInputs().get(j).getExecLocation() == ExecLocation.ControlProgram)) {
-							if( LOG.isTraceEnabled() )
-								LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
-
-							queuedNodes.add(node);
-							removeNodesForNextIteration(node, finishedNodes,
-									execNodes, queuedNodes, jobNodes);
-							break;
-						}
-					}
-
-					if (queuedNodes.contains(node))
-						continue;
-					if( LOG.isTraceEnabled() )
-						LOG.trace(indent + "Adding - scalar"+ node.toString());
-					execNodes.add(node);
-					addNodeByJobType(node, jobNodes, execNodes, false);
-					finishedNodes.add(node);
-					continue;
-				}
-
-			}
-
-			// no work to do
-			if ( execNodes.isEmpty() ) {
-			  
-			  if( !queuedNodes.isEmpty() )
-			  {
-			      //System.err.println("Queued nodes should be 0");
-			      throw new LopsException("Queued nodes should be empty at this point \n");
-			  }
-			  
-			  if( LOG.isTraceEnabled() )
-				LOG.trace("All done! queuedNodes = "+ queuedNodes.size());
-				
-			  done = true;
-			} else {
-				// work to do
-
-				if( LOG.isTraceEnabled() )
-					LOG.trace("Generating jobs for group -- Node count="+ execNodes.size());
-
-				// first process scalar instructions
-				generateControlProgramJobs(execNodes, inst, writeInst, deleteInst);
-
-				// copy unassigned lops in execnodes to gmrnodes
-				for (int i = 0; i < execNodes.size(); i++) {
-					N node = execNodes.get(i);
-					if (jobType(node, jobNodes) == -1) {
-						if ( isCompatible(node,  JobType.GMR) ) {
-							if ( node.hasNonBlockedInputs() ) {
-								jobNodes.get(JobType.GMRCELL.getId()).add(node);
-								addChildren(node, jobNodes.get(JobType.GMRCELL.getId()), execNodes);
-							}
-							else {
-								jobNodes.get(JobType.GMR.getId()).add(node);
-								addChildren(node, jobNodes.get(JobType.GMR.getId()), execNodes);
-							}
-						}
-						else {
-							if( LOG.isTraceEnabled() )
-								LOG.trace(indent + "Queueing -" + node.toString() + " (code 10)");
-							execNodes.remove(i);
-							finishedNodes.remove(node);
-							queuedNodes.add(node);
-							removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-						}
-					}
-				}
-
-				// next generate MR instructions
-				if (!execNodes.isEmpty())
-					generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes);
-
-				handleSingleOutputJobs(execNodes, jobNodes, finishedNodes);
-
-			}
-
-		}
-
-		// add write and delete inst at the very end.
-
-		//inst.addAll(preWriteDeleteInst);
-		inst.addAll(writeInst);
-		inst.addAll(deleteInst);
-		inst.addAll(endOfBlockInst);
-
-		return inst;
-
-	}
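-
-	/*
-	 * Summary of the queueing decision codes traced in doGreedyGrouping()
-	 * above (illustrative restatement):
-	 *   code 1  - node defines an MR job but is incompatible with its children in execNodes
-	 *   code 2  - a child node is already queued
-	 *   code 3  - inputs come from different jobs
-	 *   code 4  - node defines an MR job and an MR-job child is in execNodes
-	 *   code 5  - an input is not an ancestor of a RecordReader child
-	 *   code 6  - RecordReader node with a Map/MapAndReduce child in execNodes
-	 *   code 7  - Map node blocked by a distributed-cache input (subcode 1) or mapper memory (subcode 2)
-	 *   code 8  - Reduce node without a compatible Map/MapAndReduce child
-	 *   code 9  - ControlProgram node consuming an unfinished MR output
-	 *   code 10 - leftover node that is not compatible with GMR
-	 */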
-
-	private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
-	  for(int i=0; i < execNodes.size(); i++)
-	  {
-	    N tmpNode = execNodes.get(i);
-	    // for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID
-	    // we should not consider such lops in this check
-	    if (isChild(tmpNode, node, IDMap) 
-	    		&& tmpNode.getExecLocation() != ExecLocation.ControlProgram
-	    		//&& tmpNode.getCompatibleJobs() != LopProperties.INVALID 
-	    		&& (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
-	      return false;
-	  }
-	  return true;
-	}
-
-	/**
-	 * Exclude the rmvar instruction for <varname> from deleteInst, if it exists.
-	 * 
-	 * @param varName
-	 * @param deleteInst
-	 */
-	private void excludeRemoveInstruction(String varName, ArrayList<Instruction> deleteInst) {
-		//for(Instruction inst : deleteInst) {
-		for(int i=0; i < deleteInst.size(); i++) {
-			Instruction inst = deleteInst.get(i);
-			if ((inst.getType() == INSTRUCTION_TYPE.CONTROL_PROGRAM  || inst.getType() == INSTRUCTION_TYPE.SPARK)
-					&& ((CPInstruction)inst).getCPInstructionType() == CPINSTRUCTION_TYPE.Variable 
-					&& ((VariableCPInstruction)inst).isRemoveVariable(varName) ) {
-				deleteInst.remove(i);
-				i--; // step back: remove(i) shifts the next element into slot i
-			}
-		}
-	}
-	
-	/**
-	 * Generate rmvar instructions for the inputs, if their consumer count becomes zero.
-	 * 
-	 * @param node
-	 * @param inst
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	@SuppressWarnings("unchecked")
-	private void processConsumersForInputs(N node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst) throws DMLRuntimeException, DMLUnsupportedOperationException {
-		// reduce the consumer count for all input lops
-		// if the count becomes zero, then the variable associated w/ the input can be removed
-		for(Lop in : node.getInputs() ) {
-			if(DMLScript.ENABLE_DEBUG_MODE) {
-				processConsumers((N)in, inst, deleteInst, node);
-			}
-			else {
-				processConsumers((N)in, inst, deleteInst, null);
-			}
-			
-			/*if ( in.removeConsumer() == 0 ) {
-				String label = in.getOutputParameters().getLabel();
-				inst.add(VariableCPInstruction.prepareRemoveInstruction(label));
-			}*/
-		}
-	}
-	
-	private void processConsumers(N node, ArrayList<Instruction> inst, ArrayList<Instruction> deleteInst, N locationInfo) throws DMLRuntimeException, DMLUnsupportedOperationException {
-		// reduce the consumer count for the given lop;
-		// if the count becomes zero, then the variable associated w/ it can be removed
-		if ( node.removeConsumer() == 0 ) {
-			if ( node.getExecLocation() == ExecLocation.Data && ((Data)node).isLiteral() ) {
-				return;
-			}
-			
-			String label = node.getOutputParameters().getLabel();
-			Instruction currInstr = VariableCPInstruction.prepareRemoveInstruction(label);
-			if (locationInfo != null)
-				currInstr.setLocation(locationInfo);
-			else
-				currInstr.setLocation(node);
-			
-			inst.add(currInstr);
-			excludeRemoveInstruction(label, deleteInst);
-		}
-	}
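-
-	/*
-	 * Illustrative trace (hypothetical label): if lop "Var7" has three
-	 * consumers, the first two calls to processConsumers() merely decrement
-	 * the count; the third call observes removeConsumer() == 0, emits
-	 * "rmvar Var7" right after the last consuming instruction, and removes
-	 * any duplicate rmvar for Var7 from deleteInst via
-	 * excludeRemoveInstruction("Var7", deleteInst).
-	 */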
-	
-	/**
-	 * Method to generate instructions that are executed in Control Program. At
-	 * this point, this DAG has no dependencies on the MR dag, i.e., none of the
-	 * inputs are outputs of MR jobs.
-	 * 
-	 * @param execNodes
-	 * @param inst
-	 * @param deleteInst
-	 * @throws LopsException
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-
-	@SuppressWarnings("unchecked")
-	private void generateControlProgramJobs(ArrayList<N> execNodes,
-			ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
-			DMLUnsupportedOperationException, DMLRuntimeException {
-
-		// nodes to be deleted from execnodes
-		ArrayList<N> markedNodes = new ArrayList<N>();
-
-		// variable names to be deleted
-		ArrayList<String> var_deletions = new ArrayList<String>();
-		HashMap<String, N> var_deletionsLineNum =  new HashMap<String, N>();
-		
-		boolean doRmVar = false;
-
-		for (int i = 0; i < execNodes.size(); i++) {
-			N node = execNodes.get(i);
-			doRmVar = false;
-
-			// mark input scalar read nodes for deletion
-			// TODO: statiko -- check if this condition ever evaluates to TRUE
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).getOperationType() == Data.OperationTypes.READ
-					&& ((Data) node).getDataType() == DataType.SCALAR 
-					&& node.getOutputParameters().getFile_name() == null ) {
-				markedNodes.add(node);
-				continue;
-			}
-			
-			// output scalar instructions and mark nodes for deletion
-			if (node.getExecLocation() == ExecLocation.ControlProgram) {
-
-				if (node.getDataType() == DataType.SCALAR) {
-					// Output from lops with SCALAR data type must
-					// go into Temporary Variables (Var0, Var1, etc.)
-					NodeOutput out = setupNodeOutputs(node, ExecType.CP, false, false);
-					inst.addAll(out.getPreInstructions()); // dummy
-					deleteInst.addAll(out.getLastInstructions());
-				} else {
-					// Output from lops with non-SCALAR data type must
-					// go into Temporary Files (temp0, temp1, etc.)
-					
-					NodeOutput out = setupNodeOutputs(node, ExecType.CP, false, false);
-					inst.addAll(out.getPreInstructions());
-					
-					boolean hasTransientWriteParent = false;
-					for ( int pid=0; pid < node.getOutputs().size(); pid++ ) {
-						N parent = (N)node.getOutputs().get(pid); 
-						if ( parent.getExecLocation() == ExecLocation.Data 
-								&& ((Data)parent).getOperationType() == Data.OperationTypes.WRITE 
-								&& ((Data)parent).isTransient() ) {
-							hasTransientWriteParent = true;
-							break;
-						}
-					}
-					
-					if ( !hasTransientWriteParent ) {
-						deleteInst.addAll(out.getLastInstructions());
-					} 
-					else {
-						var_deletions.add(node.getOutputParameters().getLabel());
-						var_deletionsLineNum.put(node.getOutputParameters().getLabel(), node);
-						
-						//System.out.println("    --> skipping " + out.getLastInstructions() + " while processing node " + node.getID());
-					}
-				}
-
-				String inst_string = "";
-
-				// Lops with arbitrary number of inputs (ParameterizedBuiltin, GroupedAggregate, DataGen)
-				// are handled separately, by simply passing ONLY the output variable to getInstructions()
-				if (node.getType() == Lop.Type.ParameterizedBuiltin
-						|| node.getType() == Lop.Type.GroupedAgg 
-						|| node.getType() == Lop.Type.DataGen ){ 
-					inst_string = node.getInstructions(node.getOutputParameters().getLabel());
-				} 
-				
-				// Lops with arbitrary number of inputs and outputs are handled
-				// separately as well by passing arrays of inputs and outputs
-				else if ( node.getType() == Lop.Type.FunctionCallCP )
-				{
-					String[] inputs = new String[node.getInputs().size()];
-					String[] outputs = new String[node.getOutputs().size()];
-					int count = 0;
-					for( Lop in : node.getInputs() )
-						inputs[count++] = in.getOutputParameters().getLabel();
-					count = 0;
-					for( Lop out : node.getOutputs() )
-					{
-						outputs[count++] = out.getOutputParameters().getLabel();
-					}
-					
-					inst_string = node.getInstructions(inputs, outputs);
-				}
-				else {
-					if ( node.getInputs().isEmpty() ) {
-						// currently, such a case exists only for Rand lop
-						inst_string = node.getInstructions(node.getOutputParameters().getLabel());
-					}
-					else if (node.getInputs().size() == 1) {
-						inst_string = node.getInstructions(node.getInputs()
-								.get(0).getOutputParameters().getLabel(),
-								node.getOutputParameters().getLabel());
-					} 
-					else if (node.getInputs().size() == 2) {
-						inst_string = node.getInstructions(
-								node.getInputs().get(0).getOutputParameters().getLabel(),
-								node.getInputs().get(1).getOutputParameters().getLabel(),
-								node.getOutputParameters().getLabel());
-					} 
-					else if (node.getInputs().size() == 3 || node.getType() == Type.Ternary) {
-						inst_string = node.getInstructions(
-								node.getInputs().get(0).getOutputParameters().getLabel(),
-								node.getInputs().get(1).getOutputParameters().getLabel(),
-								node.getInputs().get(2).getOutputParameters().getLabel(),
-								node.getOutputParameters().getLabel());
-					}
-					else if (node.getInputs().size() == 4) {
-						inst_string = node.getInstructions(
-								node.getInputs().get(0).getOutputParameters().getLabel(),
-								node.getInputs().get(1).getOutputParameters().getLabel(),
-								node.getInputs().get(2).getOutputParameters().getLabel(),
-								node.getInputs().get(3).getOutputParameters().getLabel(),
-								node.getOutputParameters().getLabel());
-					}
-					else if (node.getInputs().size() == 5) {
-						inst_string = node.getInstructions(
-								node.getInputs().get(0).getOutputParameters().getLabel(),
-								node.getInputs().get(1).getOutputParameters().getLabel(),
-								node.getInputs().get(2).getOutputParameters().getLabel(),
-								node.getInputs().get(3).getOutputParameters().getLabel(),
-								node.getInputs().get(4).getOutputParameters().getLabel(),
-								node.getOutputParameters().getLabel());
-					}
-					else if (node.getInputs().size() == 6) {
-						inst_string = node.getInstructions(
-								node.getInputs().get(0).getOutputParameters().getLabel(),
-								node.getInputs().get(1).getOutputParameters().getLabel(),
-								node.getInputs().get(2).getOutputParameters().getLabel(),
-								node.getInputs().get(3).getOutputParameters().getLabel(),
-								node.getInputs().get(4).getOutputParameters().getLabel(),
-								node.getInputs().get(5).getOutputParameters().getLabel(),
-								node.getOutputParameters().getLabel());
-					}
-					else if (node.getInputs().size() == 7) {
-						inst_string = node.getInstructions(
-								node.getInputs().get(0).getOutputParameters().getLabel(),
-								node.getInputs().get(1).getOutputParameters().getLabel(),
-								node.getInputs().get(2).getOutputParameters().getLabel(),
-								node.getInputs().get(3).getOutputParameters().getLabel(),
-								node.getInputs().get(4).getOutputParameters().getLabel(),
-								node.getInputs().get(5).getOutputParameters().getLabel(),
-								node.getInputs().get(6).getOutputParameters().getLabel(),
-								node.getOutputParameters().getLabel());
-					}
-
-					else {
-						throw new LopsException(node.printErrorLocation() + "Node with " + node.getInputs().size() + " inputs is not supported in CP yet! \n");
-					}
-				}
-				
-				try {
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Generating instruction - "+ inst_string);
-					Instruction currInstr = InstructionParser.parseSingleInstruction(inst_string);
-					if (node._beginLine != 0)
-						currInstr.setLocation(node);
-					else if ( !node.getOutputs().isEmpty() )
-						currInstr.setLocation(node.getOutputs().get(0));
-					else if ( !node.getInputs().isEmpty() )
-						currInstr.setLocation(node.getInputs().get(0));
-						
-					inst.add(currInstr);
-				} catch (Exception e) {
-					throw new LopsException(node.printErrorLocation() + "Problem generating simple inst - "
-							+ inst_string, e);
-				}
-
-				markedNodes.add(node);
-				doRmVar = true;
-				//continue;
-			}
-			else if (node.getExecLocation() == ExecLocation.Data ) {
-				Data dnode = (Data)node;
-				Data.OperationTypes op = dnode.getOperationType();
-				
-				if ( op == Data.OperationTypes.WRITE ) {
-					NodeOutput out = null;
-					if ( sendWriteLopToMR(node) ) {
-						// In this case, Data WRITE lop goes into MR, and 
-						// we don't have to do anything here
-						doRmVar = false;
-					}
-					else {
-						out = setupNodeOutputs(node, ExecType.CP, false, false);
-						if ( dnode.getDataType() == DataType.SCALAR ) {
-							// processing is same for both transient and persistent scalar writes 
-							writeInst.addAll(out.getLastInstructions());
-							//inst.addAll(out.getLastInstructions());
-							doRmVar = false;
-						}
-						else {
-							// setupNodeOutputs() handles both transient and persistent matrix writes 
-							if ( dnode.isTransient() ) {
-								//inst.addAll(out.getPreInstructions()); // dummy ?
-								deleteInst.addAll(out.getLastInstructions());
-								doRmVar = false;
-							}
-							else {
-								// In case of persistent write lop, write instruction will be generated 
-								// and that instruction must be added to <code>inst</code> so that it gets
-								// executed immediately. If it is added to <code>deleteInst</code> then it
-								// gets executed at the end of program block's execution
-								inst.addAll(out.getLastInstructions());
-								doRmVar = true;
-							}
-						}
-						markedNodes.add(node);
-						//continue;
-					}
-				}
-				else {
-					// generate a temp label to hold the value that is read from HDFS
-					if ( node.getDataType() == DataType.SCALAR ) {
-						node.getOutputParameters().setLabel(Lop.SCALAR_VAR_NAME_PREFIX + var_index.getNextID());
-						String io_inst = node.getInstructions(node.getOutputParameters().getLabel(), 
-								node.getOutputParameters().getFile_name());
-						CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(io_inst);
-						currInstr.setLocation(node);
-						
-						inst.add(currInstr);
-						
-						Instruction tempInstr = VariableCPInstruction.prepareRemoveInstruction(node.getOutputParameters().getLabel());
-						tempInstr.setLocation(node);
-						deleteInst.add(tempInstr);
-					}
-					else {
-						throw new LopsException("Matrix READs are not handled in CP yet!");
-					}
-					markedNodes.add(node);
-					doRmVar = true;
-					//continue;
-				}
-			}
-			
-			// see if rmvar instructions can be generated for node's inputs
-			if(doRmVar)
-				processConsumersForInputs(node, inst, deleteInst);
-			doRmVar = false;
-		}
-		
-		for ( String var : var_deletions ) {
-			Instruction rmInst = VariableCPInstruction.prepareRemoveInstruction(var);
-			if( LOG.isTraceEnabled() )
-				LOG.trace("  Adding var_deletions: " + rmInst.toString());
-			
-			rmInst.setLocation(var_deletionsLineNum.get(var));
-			
-			deleteInst.add(rmInst);
-		}
-
-		// delete all marked nodes
-		for (int i = 0; i < markedNodes.size(); i++) {
-			execNodes.remove(markedNodes.get(i));
-		}
-
-	}
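-	/*
-	 * Editor's sketch (not part of the original file): the fixed-arity dispatch
-	 * over 1..7 inputs in generateControlProgramJobs above could be collapsed
-	 * into a single loop that collects the input labels followed by the output
-	 * label. The helper below is a minimal, self-contained illustration; it
-	 * works on plain label strings rather than the real Lop API, because the
-	 * actual Lop class only exposes the fixed-arity getInstructions() variants
-	 * used above. The helper name is hypothetical.
-	 */
-	static String[] collectLabels(java.util.List<String> inputLabels, String outputLabel) {
-		String[] labels = new String[inputLabels.size() + 1];
-		int k = 0;
-		for (String in : inputLabels)
-			labels[k++] = in;     // one label per input, in order
-		labels[k] = outputLabel;  // output label goes last, as in the calls above
-		return labels;
-	}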
-
-	/**
-	 * Method to remove all child nodes of a queued node from execNodes, so
-	 * that they are executed in a following iteration.
-	 * 
-	 * @param node
-	 * @param finishedNodes
-	 * @param execNodes
-	 * @param queuedNodes
-	 * @param jobvec
-	 * @throws LopsException
-	 */
-
-	private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
-			ArrayList<N> execNodes, ArrayList<N> queuedNodes,
-			ArrayList<ArrayList<N>> jobvec) throws LopsException {
-		
-		// only queued nodes with multiple inputs need to be handled.
-		if (node.getInputs().size() == 1)
-			return;
-		
-		//if all children are queued, then there is nothing to do.
-		int numInputs = node.getInputs().size();
-		boolean allQueued = true;
-		for(int i=0; i < numInputs; i++) {
-			if( !queuedNodes.contains(node.getInputs().get(i)) ) {
-				allQueued = false;
-				break;
-			}
-		}
-		if ( allQueued )
-			return; 
-		
-		if( LOG.isTraceEnabled() )
-			LOG.trace("  Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
-
-		// Determine if <code>node</code> has inputs from the same job or multiple jobs
-	    int jobid = Integer.MIN_VALUE;
-		boolean inputs_in_same_job = true;
-		for(int idx=0; idx < node.getInputs().size(); idx++) {
-			int input_jobid = jobType(node.getInputs().get(idx), jobvec);
-			if ( jobid == Integer.MIN_VALUE )
-				jobid = input_jobid;
-			else if ( jobid != input_jobid ) { 
-				inputs_in_same_job = false;
-				break;
-			}
-		}
-
-		// Determine whether any inputs to <code>node</code> are still unassigned.
-		// Evaluate only those lops that execute in MR.
-		boolean unassigned_inputs = false;
-		for(int i=0; i < numInputs; i++) {
-			Lop input = node.getInputs().get(i);
-			if ( input.getExecType() == ExecType.MR && !execNodes.contains(input) ) {
-				unassigned_inputs = true;
-				break;
-			}
-		}
-
-		// Determine if any of node's children are queued
-		boolean child_queued = false;
-		for(int i=0; i < numInputs; i++) {
-			if (queuedNodes.contains(node.getInputs().get(i)) ) {
-				child_queued = true;
-				break;
-			}
-		}
-		if (LOG.isTraceEnabled()) {
-			LOG.trace("  Property Flags:");
-			LOG.trace("    Inputs in same job: " + inputs_in_same_job);
-			LOG.trace("    Unassigned inputs: " + unassigned_inputs);
-			LOG.trace("    Child queued: " + child_queued);
-		}
-
-		// Evaluate each lop in <code>execNodes</code> for removal.
-		// Add lops to be removed to <code>markedNodes</code>.
-		
-		ArrayList<N> markedNodes = new ArrayList<N>();
-		for (int i = 0; i < execNodes.size(); i++) {
-
-			N tmpNode = execNodes.get(i);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("  Checking for removal (" + tmpNode.getID() + ") " + tmpNode.toString());
-			}
-			
-			// if tmpNode is not a descendant of 'node', then there is no advantage in removing tmpNode for later iterations.
-			if(!isChild(tmpNode, node, IDMap))
-				continue;
-			
-			// handle group input lops
-			if(node.getInputs().contains(tmpNode) && tmpNode.isAligner()) {
-			    markedNodes.add(tmpNode);
-			    if( LOG.isTraceEnabled() )
-			    	LOG.trace("    Removing for next iteration (code 1): (" + tmpNode.getID() + ") " + tmpNode.toString());
-			}
-			
-				// if one of the children is queued, remove some child nodes on
-				// the other leg that may be needed later on, e.g., the Group lop.
-				
-				if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node) 
-					&& branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-					
-					boolean queueit = false;
-					int code = -1;
-					switch(node.getExecLocation()) {
-					case Map:
-						if(branchCanBePiggyBackedMap(tmpNode, node, execNodes, queuedNodes, markedNodes))
-							queueit = true;
-						code=2;
-						break;
-						
-					case MapAndReduce:
-						if(branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, queuedNodes)&& !tmpNode.definesMRJob()) 
-							queueit = true;
-						code=3;
-						break;
-					case Reduce:
-						if(branchCanBePiggyBackedReduce(tmpNode, node, execNodes, queuedNodes))
-							queueit = true;
-						code=4;
-						break;
-					default:
-						//do nothing
-					}
-					
-					if(queueit) {
-						if( LOG.isTraceEnabled() )
-							LOG.trace("    Removing for next iteration (code " + code + "): (" + tmpNode.getID() + ") " + tmpNode.toString());
-			        		
-						markedNodes.add(tmpNode);
-					}
-				}
-				/*
-				 * "node" has no other queued children.
-				 * 
-				 * If inputs are in the same job and "node" is of type
-				 * MapAndReduce, then remove nodes of all types other than
-				 * Reduce, MapAndReduce, and the ones that define a MR job as
-				 * they can be piggybacked later.
-				 * 
-				 * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
-				 * come from Rand job, and they should not be removed.
-				 * 
-				 * Other examples: -- MMCJ whose children are of type
-				 * MapAndReduce (say GMR) -- Inputs coming from two different
-				 * jobs .. GMR & REBLOCK
-				 */
-				if ((inputs_in_same_job || unassigned_inputs)
-						&& node.getExecLocation() == ExecLocation.MapAndReduce
-						&& !hasOtherMapAndReduceParentNode(tmpNode, execNodes, node)  // don't remove since it is already piggybacked with a MapAndReduce node
-						&& branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, queuedNodes)
-						&& !tmpNode.definesMRJob()) {
-					if( LOG.isTraceEnabled() )
-						LOG.trace("    Removing for next iteration (code 5): ("+ tmpNode.getID() + ") " + tmpNode.toString());
-
-					markedNodes.add(tmpNode);
-				}
-		} // for i
-
-		// we also need to delete all parent nodes of marked nodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			if( LOG.isTraceEnabled() )
-				LOG.trace("  Checking for removal - ("
-						+ execNodes.get(i).getID() + ") "
-						+ execNodes.get(i).toString());
-
-			if (hasChildNode(execNodes.get(i), markedNodes) && !markedNodes.contains(execNodes.get(i))) {
-				markedNodes.add(execNodes.get(i));
-				if( LOG.isTraceEnabled() )
-					LOG.trace("    Removing for next iteration (code 6): (" + execNodes.get(i).getID() + ") " + execNodes.get(i).toString());
-			}
-		}
-
-		if ( execNodes.size() != markedNodes.size() ) {
-			// delete marked nodes from finishedNodes and execNodes
-			// add to queued nodes
-			for(N n : markedNodes) {
-				if ( n.usesDistributedCache() )
-					gmrMapperFootprint -= computeFootprintInMapper(n);
-				finishedNodes.remove(n);
-				execNodes.remove(n);
-				removeNodeByJobType(n, jobvec);
-				queuedNodes.add(n);
-			}
-		}
-	}
-
-	@SuppressWarnings("unused")
-	private void removeNodesForNextIterationOLD(N node, ArrayList<N> finishedNodes,
-			ArrayList<N> execNodes, ArrayList<N> queuedNodes,
-			ArrayList<ArrayList<N>> jobvec) throws LopsException {
-		// only queued nodes with two inputs need to be handled.
-
-		// TODO: statiko -- this should be made == 1
-		if (node.getInputs().size() != 2)
-			return;
-		
-		
-		//if both children are queued, nothing to do. 
-	    if (queuedNodes.contains(node.getInputs().get(0))
-	        && queuedNodes.contains(node.getInputs().get(1)))
-	      return;
-	    
-	    if( LOG.isTraceEnabled() )
-	    	LOG.trace("Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
- 
-	    
-
-		boolean inputs_in_same_job = false;
-		// TODO: handle ternary
-		if (jobType(node.getInputs().get(0), jobvec) != -1
-				&& jobType(node.getInputs().get(0), jobvec) == jobType(node
-						.getInputs().get(1), jobvec)) {
-			inputs_in_same_job = true;
-		}
-
-		boolean unassigned_inputs = false;
-		// here, scalars should be ignored
-		if ((node.getInputs().get(0).getExecLocation() != ExecLocation.ControlProgram && jobType(
-				node.getInputs().get(0), jobvec) == -1)
-				|| (node.getInputs().get(1).getExecLocation() != ExecLocation.ControlProgram && jobType(
-						node.getInputs().get(1), jobvec) == -1)) {
-			unassigned_inputs = true;
-		}
-
-		boolean child_queued = false;
-
-		// check if at least one child was queued.
-		if (queuedNodes.contains(node.getInputs().get(0))
-				|| queuedNodes.contains(node.getInputs().get(1)))
-			child_queued = true;
-
-		// nodes to be dropped
-		ArrayList<N> markedNodes = new ArrayList<N>();
-
-		for (int i = 0; i < execNodes.size(); i++) {
-
-			N tmpNode = execNodes.get(i);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("Checking for removal (" + tmpNode.getID()
-						+ ") " + tmpNode.toString());
-				LOG.trace("Inputs in same job " + inputs_in_same_job);
-				LOG.trace("Unassigned inputs " + unassigned_inputs);
-				LOG.trace("Child queued " + child_queued);
-
-			}
-			
-
-			// TODO: statiko -- check if this is too conservative?
-			if (child_queued) {
-			// if one of the children is queued, remove some child nodes on the
-			// other leg that may be needed later on, e.g., the Group lop.
- 
-			  if((tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) && 
-			      tmpNode.isAligner())
-			  {
-			    markedNodes.add(tmpNode);
-			    if( LOG.isTraceEnabled() )
-			    	LOG.trace("Removing for next iteration: ("
-			    			+ tmpNode.getID() + ") " + tmpNode.toString());
-			  }
-			  else {
-				if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node) 
-						&& isChild(tmpNode, node, IDMap)  && branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-
-	
-				    if( 
-				        //e.g. MMCJ
-				        (node.getExecLocation() == ExecLocation.MapAndReduce &&
-						branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes) && !tmpNode.definesMRJob() )
-						||
-						//e.g. Binary
-						(node.getExecLocation() == ExecLocation.Reduce && branchCanBePiggyBackedReduce(tmpNode, node, execNodes, finishedNodes))  )
-					{
-					
-				    	if( LOG.isTraceEnabled() )
-							LOG.trace("Removing for next iteration: ("
-								    + tmpNode.getID() + ") " + tmpNode.toString());
-					
-					
-	        			markedNodes.add(tmpNode);	
-					}
-				 }
-			  }
-			} else {
-				/*
-				 * "node" has no other queued children.
-				 * 
-				 * If inputs are in the same job and "node" is of type
-				 * MapAndReduce, then remove nodes of all types other than
-				 * Reduce, MapAndReduce, and the ones that define a MR job as
-				 * they can be piggybacked later.
-				 * 
-				 * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
-				 * come from Rand job, and they should not be removed.
-				 * 
-				 * Other examples: -- MMCJ whose children are of type
-				 * MapAndReduce (say GMR) -- Inputs coming from two different
-				 * jobs .. GMR & REBLOCK
-				 */
-				if ((inputs_in_same_job || unassigned_inputs)
-						&& node.getExecLocation() == ExecLocation.MapAndReduce
-						&& !hasOtherMapAndReduceParentNode(tmpNode, execNodes,
-								node)
-						&& isChild(tmpNode, node, IDMap) &&
-						branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes)
-						&& !tmpNode.definesMRJob()) {
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Removing for next iteration:: ("
-								+ tmpNode.getID() + ") " + tmpNode.toString());
-
-					markedNodes.add(tmpNode);
-				}
-
-				// as this node has inputs coming from different jobs, we need to
-				// free up everything below and include the closest MapAndReduce
-				// lop if this is of type Reduce; if it is of type MapAndReduce,
-				// we don't need to free any nodes
-
-				if (!inputs_in_same_job && !unassigned_inputs
-						&& isChild(tmpNode, node, IDMap) && 
-						(tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) && 
-		            tmpNode.isAligner()) 
-				{
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Removing for next iteration ("
-										+ tmpNode.getID()
-										+ ") "
-										+ tmpNode.toString());
-
-					markedNodes.add(tmpNode);
-
-				}
-
-			}
-		}
-
-		// we also need to delete all parent nodes of marked nodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			if( LOG.isTraceEnabled() )
-				LOG.trace("Checking for removal - ("
-						+ execNodes.get(i).getID() + ") "
-						+ execNodes.get(i).toString());
-
-			if (hasChildNode(execNodes.get(i), markedNodes)) {
-				markedNodes.add(execNodes.get(i));
-				if( LOG.isTraceEnabled() )
-					LOG.trace("Removing for next iteration - ("
-							+ execNodes.get(i).getID() + ") "
-							+ execNodes.get(i).toString());
-			}
-		}
-
-		// delete marked nodes from finishedNodes and execNodes
-		// add to queued nodes
-		for (int i = 0; i < markedNodes.size(); i++) {
-			finishedNodes.remove(markedNodes.get(i));
-			execNodes.remove(markedNodes.get(i));
-			removeNodeByJobType(markedNodes.get(i), jobvec);
-			queuedNodes.add(markedNodes.get(i));
-		}
-	}
-
-	private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
-		if(node.getExecLocation() != ExecLocation.Reduce)
-			return false;
-	    
-		// if tmpNode is a descendant of any queued child of node, then the branch cannot be piggybacked
-		for(Lop ni : node.getInputs()) {
-			if(queuedNodes.contains(ni) && isChild(tmpNode, ni, IDMap))
-				return false;
-		}
-		
-		for(int i=0; i < execNodes.size(); i++) {
-		   N n = execNodes.get(i);
-       
-		   if(n.equals(node))
-			   continue;
-       
-		   if(n.equals(tmpNode) && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
-			   return false;
-       
-		   // check if n is on the branch tmpNode->*->node
-		   if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap)) {
-			   if(!node.getInputs().contains(tmpNode) // redundant
-				   && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
-				   return false;
-		   }
-	   }
-	   return true;
-	}
-
-	@SuppressWarnings("unchecked")
-	private boolean branchCanBePiggyBackedMap(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes, ArrayList<N> markedNodes) {
-		if(node.getExecLocation() != ExecLocation.Map)
-			return false;
-		
-		// if tmpNode is a descendant of any queued child of node, then the branch cannot be piggybacked
-		for(Lop ni : node.getInputs()) {
-			if(queuedNodes != null && queuedNodes.contains(ni) && isChild(tmpNode, ni, IDMap))
-				return false;
-		}
-		
-		// since node.location=Map: only Map & MapOrReduce lops must be considered
-		if( tmpNode.definesMRJob() || (tmpNode.getExecLocation() != ExecLocation.Map && tmpNode.getExecLocation() != ExecLocation.MapOrReduce))
-			return false;
-
-		// if there exists a node "dcInput" that is
-		//   a) a parent of tmpNode, and b) feeds into "node" via distributed cache,
-		//   then tmpNode should not be removed:
-		// "dcInput" must be executed prior to "node", and removing tmpNode would not guarantee that.
-		if(node.usesDistributedCache() ) {
-			for(int dcInputIndex : node.distributedCacheInputIndex()) { 
-				N dcInput = (N) node.getInputs().get(dcInputIndex-1);
-				if(isChild(tmpNode, dcInput, IDMap))
-					return false;
-			}
-		}
-		
-		// if tmpNode requires an input from distributed cache, remove tmpNode
-		//   only if that input can fit into the mappers' memory; if not,
-		//   tmpNode must not be removed.
-		if ( tmpNode.usesDistributedCache() ) {
-			double memsize = computeFootprintInMapper(tmpNode);
-			if (node.usesDistributedCache() )
-				memsize += computeFootprintInMapper(node);
-			if ( markedNodes != null ) {
-				for(N n : markedNodes) {
-					if ( n.usesDistributedCache() ) 
-						memsize += computeFootprintInMapper(n);
-				}
-			}
-			if ( !checkMemoryLimits(node, memsize ) ) {
-				return false;
-			}
-		}
-		
-		return (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) > 0;
-	}
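-	/*
-	 * Editor's sketch (illustration only; the flag values below are
-	 * hypothetical, not SystemML's actual JobType encoding): the final test
-	 * above relies on compatible-job sets being encoded as bit masks, so two
-	 * lops can be piggybacked into one job iff their masks intersect.
-	 */
-	static boolean masksIntersect(int jobsA, int jobsB) {
-		// e.g., jobsA = GMR|REBLOCK = 0b101, jobsB = GMR|RAND = 0b011
-		// => (jobsA & jobsB) = 0b001 > 0, so the two lops can share a job
-		return (jobsA & jobsB) > 0;
-	}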
-	  
-	/**
-	 * Function that checks if <code>tmpNode</code> can be piggybacked with the
-	 * MapAndReduce lop <code>node</code>.
-	 * 
-	 * The decision depends on the exec location of <code>tmpNode</code>. If the exec location is:
-	 * MapAndReduce: CAN NOT be piggybacked since it defines its own MR job;
-	 * Reduce: CAN NOT be piggybacked since it must execute before <code>node</code>;
-	 * Map or MapOrReduce: CAN be piggybacked ONLY IF it is compatible with the job type of <code>node</code>.
-	 * 
-	 * @param tmpNode
-	 * @param node
-	 * @param execNodes
-	 * @param queuedNodes
-	 * @return
-	 */
-	private boolean branchCanBePiggyBackedMapAndReduce(N tmpNode, N node,
-			ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
-
-		if (node.getExecLocation() != ExecLocation.MapAndReduce)
-			return false;
-		JobType jt = JobType.findJobTypeFromLop(node);
-
-		for (int i = 0; i < execNodes.size(); i++) {
-			N n = execNodes.get(i);
-
-			if (n.equals(node))
-				continue;
-
-			// Evaluate only nodes on the branch between tmpNode->..->node
-			if (n.equals(tmpNode) || (isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap))) {
-				if ( hasOtherMapAndReduceParentNode(tmpNode, queuedNodes,node) )
-					return false;
-				ExecLocation el = n.getExecLocation();
-				if (el != ExecLocation.Map && el != ExecLocation.MapOrReduce)
-					return false;
-				else if (!isCompatible(n, jt))
-					return false;
-			}
-
-		}
-		return true;
-	}
-
-  private boolean branchHasNoOtherUnExecutedParents(N tmpNode, N node,
-      ArrayList<N> execNodes, ArrayList<N> finishedNodes) {
-
-	  // if tmpNode has multiple outputs, exactly one of them may be unfinished; otherwise return false
-	  if(tmpNode.getOutputs().size() > 1)
-	  {
-	    int cnt = 0;
-	    for (int j = 0; j < tmpNode.getOutputs().size(); j++) {
-        if (!finishedNodes.contains(tmpNode.getOutputs().get(j)))
-          cnt++;
-      } 
-	    
-	    if(cnt != 1)
-	      return false;
-	  }
-	  
-	  // check that every node between node and tmpNode has exactly one unfinished output
-	  for(int i=0; i < execNodes.size(); i++)
-	  {
-	    N n = execNodes.get(i);
-	    
-	    if(n.equals(node) || n.equals(tmpNode))
-	      continue;
-	    
-	    if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap))
-	    {      
-	      int cnt = 0;
-	      for (int j = 0; j < n.getOutputs().size(); j++) {
-	        if (!finishedNodes.contains(n.getOutputs().get(j)))    
-	          cnt++;
-	      } 
-	      
-	      if(cnt != 1)
-	        return false;
-	    }
-	  }
-	  
-	  return true;
-  }
-
-  /**
-	 * Method to check if there is a node of type MapAndReduce between a child
-	 * (tmpNode) and its parent (node).
-	 * 
-	 * @param tmpNode
-	 * @param execNodes
-	 * @param node
-	 * @return
-	 */
-
-	@SuppressWarnings({ "unchecked", "unused" })
-	private boolean hasMapAndReduceParentNode(N tmpNode, ArrayList<N> execNodes, N node) 
-	{
-		for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
-			N n = (N) tmpNode.getOutputs().get(i);
-
-			if (execNodes.contains(n)
-					&& n.getExecLocation() == ExecLocation.MapAndReduce
-					&& isChild(n, node, IDMap)) {
-				return true;
-			} else {
-				if (hasMapAndReduceParentNode(n, execNodes, node))
-					return true;
-			}
-
-		}
-
-		return false;
-	}
-
-	/**
-	 * Method to return the job index for a lop.
-	 * 
-	 * @param lops
-	 * @param jobvec
-	 * @return
-	 * @throws LopsException
-	 */
-
-	private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
-		for ( JobType jt : JobType.values()) {
-			int i = jt.getId();
-			if (i > 0 && jobvec.get(i) != null && jobvec.get(i).contains(lops)) {
-				return i;
-			}
-		}
-		return -1;
-	}
-
-	/**
-	 * Method to see if there is a node of type MapAndReduce between tmpNode
-	 * and node in a given node collection.
-	 * 
-	 * @param tmpNode
-	 * @param nodeList
-	 * @param node
-	 * @return
-	 */
-
-	@SuppressWarnings("unchecked")
-	private boolean hasOtherMapAndReduceParentNode(N tmpNode,
-			ArrayList<N> nodeList, N node) {
-		
-		if ( tmpNode.getExecLocation() == ExecLocation.MapAndReduce)
-			return true;
-		
-		for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
-			N n = (N) tmpNode.getOutputs().get(i);
-
-			if ( nodeList.contains(n) && isChild(n,node,IDMap)) {
-				if(!n.equals(node) && n.getExecLocation() == ExecLocation.MapAndReduce)
-					return true;
-				else
-					return hasOtherMapAndReduceParentNode(n, nodeList, node);
-			}
-		}
-
-		return false;
-	}
-
-	/**
-	 * Method to check if there is a queued node that is a parent of both tmpNode and node
-	 * 
-	 * @param tmpNode
-	 * @param queuedNodes
-	 * @param node
-	 * @return
-	 */
-	private boolean hasOtherQueuedParentNode(N tmpNode, ArrayList<N> queuedNodes, N node) {
-		if ( queuedNodes.isEmpty() )
-			return false;
-		
-		boolean[] nodeMarked = node.get_reachable();
-		boolean[] tmpMarked  = tmpNode.get_reachable();
-		long nodeid = IDMap.get(node.getID());
-		long tmpid = IDMap.get(tmpNode.getID());
-		
-		for ( int i=0; i < queuedNodes.size(); i++ ) {
-			int id = IDMap.get(queuedNodes.get(i).getID());
-			if ((id != nodeid && nodeMarked[id]) && (id != tmpid && tmpMarked[id]) )
-			//if (nodeMarked[id] && tmpMarked[id])
-				return true;
-		}
-		
-		return false;
-	}
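-	/*
-	 * Editor's sketch (not part of the original file): one way the per-node
-	 * reachability bitmaps consulted above (get_reachable()/IDMap) can be
-	 * computed. Assumptions: nodes are numbered 0..n-1 in topological order,
-	 * and inputsOf.get(v) lists the (smaller) indices of v's direct inputs;
-	 * reach[v][j] is true iff node j is reachable from node v via inputs.
-	 */
-	static boolean[][] computeReachability(java.util.List<java.util.List<Integer>> inputsOf) {
-		int n = inputsOf.size();
-		boolean[][] reach = new boolean[n][n];
-		for (int v = 0; v < n; v++)              // topological order
-			for (int in : inputsOf.get(v)) {
-				reach[v][in] = true;             // direct input
-				for (int j = 0; j < n; j++)
-					reach[v][j] |= reach[in][j]; // transitive inputs
-			}
-		return reach;
-	}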
-
-	/**
-	 * Method to print the lops grouped by job type
-	 * 
-	 * @param jobNodes
-	 * @throws DMLRuntimeException
-	 */
-	void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
-			throws DMLRuntimeException {
-		if (LOG.isTraceEnabled()){
-			for ( JobType jt : JobType.values() ) {
-				int i = jt.getId();
-				if (i > 0 && jobNodes.get(i) != null && !jobNodes.get(i).isEmpty() ) {
-					LOG.trace(jt.getName() + " Job Nodes:");
-					
-					for (int j = 0; j < jobNodes.get(i).size(); j++) {
-						LOG.trace("    "
-								+ jobNodes.get(i).get(j).getID() + ") "
-								+ jobNodes.get(i).get(j).toString());
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Method to check if there exists any lop with the given ExecLocation (e.g., RecordReader)
-	 * 
-	 * @param nodes
-	 * @param loc
-	 * @return
-	 */
-	boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
-		for (int i = 0; i < nodes.size(); i++) {
-			if (nodes.get(i).getExecLocation() == loc)
-				return true;
-		}
-		return false;
-	}
-
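-	/**
-	 * Method to split a set of GMR lops into disjoint groups, one group per
-	 * RecordReader lop (each holding the record reader and all lops that
-	 * depend on it), plus one final group for lops with no record-reader
-	 * dependency, so that each group can be compiled into a separate job.
-	 */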
-	ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) {
-
-		// obtain the list of record reader nodes
-		ArrayList<N> rrnodes = new ArrayList<N>();
-		for (int i = 0; i < gmrnodes.size(); i++) {
-			if (gmrnodes.get(i).getExecLocation() == ExecLocation.RecordReader)
-				rrnodes.add(gmrnodes.get(i));
-		}
-
-		/*
-		 * We allocate one extra vector to hold lops that do not depend on any
-		 * recordreader lops
-		 */
-		ArrayList<ArrayList<N>> splitGMR = createNodeVectors(rrnodes.size() + 1);
-
-		// flags to indicate whether a lop has been added to one of the node
-		// vectors
-		boolean[] flags = new boolean[gmrnodes.size()];
-		for (int i = 0; i < gmrnodes.size(); i++) {
-			flags[i] = false;
-		}
-
-		// first, obtain all ancestors of recordreader lops
-		for (int rrid = 0; rrid < rrnodes.size(); rrid++) {
-			// prepare node list for i^th record reader lop
-
-			// add record reader lop
-			splitGMR.get(rrid).add(rrnodes.get(rrid));
-
-			for (int j = 0; j < gmrnodes.size(); j++) {
-				if (rrnodes.get(rrid).equals(gmrnodes.get(j)))
-					flags[j] = true;
-				else if (isChild(rrnodes.get(rrid), gmrnodes.get(j), IDMap)) {
-					splitGMR.get(rrid).add(gmrnodes.get(j));
-					flags[j] = true;
-				}
-			}
-		}
-
-		// add all remaining lops to a separate job
-		int jobindex = rrnodes.size(); // the last node vector
-		for (int i = 0; i < gmrnodes.size(); i++) {
-			if (!flags[i]) {
-				splitGMR.get(jobindex).add(gmrnodes.get(i));
-				flags[i] = true;
-			}
-		}
-
-		return splitGMR;
-	}
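-	/*
-	 * Worked example (editor's illustration with hypothetical lops): given
-	 * gmrnodes = { RR1, X, RR2, Y, Z }, where X depends on RR1, Y depends on
-	 * RR2, and Z depends on neither, the method returns three node vectors:
-	 * { RR1, X }, { RR2, Y }, and { Z } -- one job per record reader plus a
-	 * final job for the record-reader-independent lops.
-	 */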
-
-	/**
-	 * Method to generate Hadoop jobs. Exec nodes can contain a mixture of node
-	 * types requiring different MR jobs. This method breaks the job into
-	 * sub-types and then invokes the appropriate method to generate
-	 * instructions.
-	 * 
-	 * @param execNodes
-	 * @param inst
-	 * @param deleteinst
-	 * @param 

<TRUNCATED>