Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/04 01:01:03 UTC

[1/4] incubator-systemml git commit: Additional improvements to the naive-bayes script (w/o append, cleanups)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 8e7b6ed3d -> ff2aea542


Additional improvements to the naive-bayes script (w/o append, cleanups)

Results of the performance testsuite for the old vs. new naive-bayes scripts
(including invocation overhead):
a) Hybrid Spark (20GB driver)
10k x 1k, dense: 2s -> 2s 
10k x 1k, sparse: 1s -> 1s 
100k x 1k, dense: 4s -> 4s
100k x 1k, sparse: 2s -> 2s
1M x 1k, dense: 42s -> 17s
1M x 1k, sparse: 4s -> 4s
10M x 1k, dense: 81s -> 78s
10M x 1k, sparse: 40s -> 27s
 
b) Hybrid MapReduce (2GB client)
10k x 1k, dense: 3s -> 3s
10k x 1k, sparse: 1s -> 1s
100k x 1k, dense: 23s -> 4s
100k x 1k, sparse: 3s -> 2s
1M x 1k, dense: 84s -> 81s
1M x 1k, sparse: 9s -> 6s
10M x 1k, dense: 139s -> 133s
10M x 1k, sparse: 80s -> 71s
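
The biggest win (1M x 1k dense, 42s -> 17s on Spark) likely comes from dropping
the column append before the log-probability computation: instead of
cbind(D, ones) %*% t(log(cbind(conditionals, prior))), the new script computes
D %*% t(log(conditionals)) + t(log(prior)). A minimal numpy sketch checking this
equivalence (all inputs below are hypothetical small data, not from the testsuite):

import numpy as np

rng = np.random.default_rng(7)
D = rng.integers(0, 5, size=(10, 4)).astype(float)   # n x m count features
cond = rng.dirichlet(np.ones(4), size=3)             # k x m class conditionals
prior = rng.dirichlet(np.ones(3)).reshape(3, 1)      # k x 1 class priors

# old formulation: append a ones column to D and a prior column to the model
D_w_ones = np.hstack([D, np.ones((10, 1))])
model = np.hstack([cond, prior])
log_probs_old = D_w_ones @ np.log(model).T

# new formulation: no append; add the log-prior as a broadcast row vector
log_probs_new = D @ np.log(cond).T + np.log(prior).T

assert np.allclose(log_probs_old, log_probs_new)
pred = log_probs_new.argmax(axis=1) + 1              # cf. rowIndexMax (1-based)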

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f514f821
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f514f821
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f514f821

Branch: refs/heads/master
Commit: f514f821ceddc5c6c41fa9a67abf60f16314afef
Parents: 8e7b6ed
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 14:16:42 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 14:16:42 2016 -0800

----------------------------------------------------------------------
 scripts/algorithms/naive-bayes.dml              | 59 ++++++++------------
 .../applications/naive-bayes/naive-bayes.dml    | 25 +++------
 .../applications/naive-bayes/naive-bayes.pydml  | 31 +++++-----
 3 files changed, 46 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f514f821/scripts/algorithms/naive-bayes.dml
----------------------------------------------------------------------
diff --git a/scripts/algorithms/naive-bayes.dml b/scripts/algorithms/naive-bayes.dml
index f2ad2ed..8284453 100644
--- a/scripts/algorithms/naive-bayes.dml
+++ b/scripts/algorithms/naive-bayes.dml
@@ -26,70 +26,59 @@
 #
 
 # defaults
-cmdLine_laplace = ifdef($laplace, 1)
-cmdLine_fmt = ifdef($fmt, "text")
+laplaceCorrection = ifdef($laplace, 1)
+fmt = ifdef($fmt, "text")
 
 # reading input args
 D = read($X)
-min_feature_val = min(D)
-if(min_feature_val < 0)
-	stop("Stopping due to invalid argument: Multinomial naive Bayes is meant for count-based feature values, minimum value in X is negative")
+C = read($Y)
 numRows = nrow(D)
+numFeatures = ncol(D)
+minFeatureVal = min(D)
+numClasses = max(C)
+minLabelVal = min(C)
+
+# sanity checks of data and arguments
+if(minFeatureVal < 0)
+	stop("Stopping due to invalid argument: Multinomial naive Bayes is meant for count-based feature values, minimum value in X is negative")
 if(numRows < 2)
 	stop("Stopping due to invalid inputs: Not possible to learn a classifier without at least 2 rows")
-
-C = read($Y)
-if(min(C) < 1)
+if(minLabelVal < 1)
 	stop("Stopping due to invalid argument: Label vector (Y) must be recoded")
-numClasses = max(C)
 if(numClasses == 1)
 	stop("Stopping due to invalid argument: Maximum label value is 1, need more than one class to learn a multi-class classifier")	
-mod1 = C %% 1
-mod1_should_be_nrow = sum(abs(ppred(mod1, 0, "==")))
-if(mod1_should_be_nrow != numRows)
+if(sum(abs(C%%1 == 0)) != numRows)
 	stop("Stopping due to invalid argument: Please ensure that Y contains (positive) integral labels")
-	
-laplace_correction = cmdLine_laplace
-if(laplace_correction < 0)
+if(laplaceCorrection < 0)
 	stop("Stopping due to invalid argument: Laplacian correction (laplace) must be non-negative")
 
-numFeatures = ncol(D)
-
 # Compute conditionals
-
 # Compute the feature counts for each class
 classFeatureCounts = aggregate(target=D, groups=C, fn="sum", ngroups=as.integer(numClasses))
 
 # Compute the total feature count for each class 
 # and add the number of features to this sum
 # for subsequent regularization (Laplace's rule)
-classSums = rowSums(classFeatureCounts) + numFeatures*laplace_correction
+classSums = rowSums(classFeatureCounts) + numFeatures*laplaceCorrection
 
 # Compute class conditional probabilities
-#ones = matrix(1, rows=1, cols=numFeatures)
-#repClassSums = classSums %*% ones
-#class_conditionals = (classFeatureCounts + laplace_correction) / repClassSums
-class_conditionals = (classFeatureCounts + laplace_correction) / classSums
+classConditionals = (classFeatureCounts + laplaceCorrection) / classSums
 
 # Compute class priors
-class_counts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
-class_prior = class_counts / numRows;
+classCounts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
+classPrior = classCounts / numRows;
 
 # Compute accuracy on training set
-ones = matrix(1, rows=numRows, cols=1)
-D_w_ones = cbind(D, ones)
-model = cbind(class_conditionals, class_prior)
-log_probs = D_w_ones %*% t(log(model))
-pred = rowIndexMax(log_probs)
-acc = sum(ppred(pred, C, "==")) / numRows * 100
+logProbs = D %*% t(log(classConditionals)) + t(log(classPrior));
+acc = sum(rowIndexMax(logProbs) == C) / numRows * 100
 
 acc_str = "Training Accuracy (%): " + acc
 print(acc_str)
 write(acc_str, $accuracy)
 
-extra_model_params = as.matrix(numFeatures)
-class_prior = t(append(t(class_prior), extra_model_params))
+extraModelParams = as.matrix(numFeatures)
+classPrior = rbind(classPrior, extraModelParams)
 
 # write out the model
-write(class_prior, $prior, format=cmdLine_fmt);
-write(class_conditionals, $conditionals, format=cmdLine_fmt);
+write(classPrior, $prior, format=fmt);
+write(classConditionals, $conditionals, format=fmt);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f514f821/src/test/scripts/applications/naive-bayes/naive-bayes.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/naive-bayes/naive-bayes.dml b/src/test/scripts/applications/naive-bayes/naive-bayes.dml
index 9b1558c..1fe1bf4 100644
--- a/src/test/scripts/applications/naive-bayes/naive-bayes.dml
+++ b/src/test/scripts/applications/naive-bayes/naive-bayes.dml
@@ -33,42 +33,35 @@ fmt = ifdef($fmt, "text")
 numClasses = $classes
 D = read($X)
 C = read($Y)
-laplace_correction = ifdef($laplace, 1)
+laplaceCorrection = ifdef($laplace, 1)
 
 numRows = nrow(D)
 numFeatures = ncol(D)
 
 # Compute conditionals
-
 # Compute the feature counts for each class
 classFeatureCounts = aggregate(target=D, groups=C, fn="sum", ngroups=as.integer(numClasses));
 
 # Compute the total feature count for each class 
 # and add the number of features to this sum
 # for subsequent regularization (Laplace's rule)
-classSums = rowSums(classFeatureCounts) + numFeatures*laplace_correction
+classSums = rowSums(classFeatureCounts) + numFeatures*laplaceCorrection
 
 # Compute class conditional probabilities
-ones = matrix(1, rows=1, cols=numFeatures)
-repClassSums = classSums %*% ones
-class_conditionals = (classFeatureCounts + laplace_correction) / repClassSums
+classConditionals = (classFeatureCounts + laplaceCorrection) / classSums
 
 # Compute class priors
-class_counts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
-class_prior = class_counts / numRows;
+classCounts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
+classPrior = classCounts / numRows;
 
 # Compute accuracy on training set
-ones = matrix(1, rows=numRows, cols=1)
-D_w_ones = cbind(D, ones)
-model = cbind(class_conditionals, class_prior)
-log_probs = D_w_ones %*% t(log(model))
-pred = rowIndexMax(log_probs)
-acc = sum(ppred(pred, C, "==")) / numRows * 100
+logProbs = D %*% t(log(classConditionals)) + t(log(classPrior));
+acc = sum(rowIndexMax(logProbs) == C) / numRows * 100
 
 acc_str = "Training Accuracy (%): " + acc
 print(acc_str)
 write(acc_str, $accuracy)
 
 # write out the model
-write(class_prior, $prior, format=fmt);
-write(class_conditionals, $conditionals, format=fmt);
+write(classPrior, $prior, format=fmt);
+write(classConditionals, $conditionals, format=fmt);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f514f821/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/naive-bayes/naive-bayes.pydml b/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
index 073e1cb..25fbf84 100644
--- a/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
+++ b/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
@@ -33,44 +33,39 @@ fmt = ifdef($fmt, "text")
 numClasses = $classes
 D = load($X)
 C = load($Y)
-laplace_correction = ifdef($laplace, 1)
+laplaceCorrection = ifdef($laplace, 1)
 
 numRows = nrow(D)
 numFeatures = ncol(D)
 
 # Compute conditionals
-
 # Compute the feature counts for each class
 classFeatureCounts = aggregate(target=D, groups=C, fn="sum", ngroups=numClasses);
 
 # Compute the total feature count for each class 
 # and add the number of features to this sum
 # for subsequent regularization (Laplace's rule)
-classSums = rowSums(classFeatureCounts) + numFeatures*laplace_correction
+classSums = rowSums(classFeatureCounts) + numFeatures*laplaceCorrection
 
 # Compute class conditional probabilities
-ones = full(1, rows=1, cols=numFeatures)
-repClassSums = dot(classSums, ones)
-class_conditionals = (classFeatureCounts + laplace_correction) / repClassSums
+classConditionals = (classFeatureCounts + laplaceCorrection) / classSums
 
 # Compute class priors
-class_counts = aggregate(target=C, groups=C, fn="count", ngroups=numClasses)
-class_prior = class_counts / numRows
+classCounts = aggregate(target=C, groups=C, fn="count", ngroups=numClasses)
+classPrior = classCounts / numRows
 
 # Compute accuracy on training set
-ones = full(1, rows=numRows, cols=1)
-D_w_ones = cbind(D, ones)
-model = cbind(class_conditionals, class_prior)
-log_model = log(model)
-transpose_log_model = log_model.transpose()
-log_probs = dot(D_w_ones, transpose_log_model)
-pred = rowIndexMax(log_probs)
-acc = sum(ppred(pred, C, "==")) / numRows * 100
+lmodel1 = log(classConditionals)
+lmodel2 = log(classPrior)
+tlmodel1 = lmodel1.transpose()
+tlmodel2 = lmodel2.transpose()
+logProbs = dot(D, tlmodel1) + tlmodel2
+acc = sum(rowIndexMax(logProbs) == C) / numRows * 100
 
 acc_str = "Training Accuracy (%): " + acc
 print(acc_str)
 save(acc_str, $accuracy)
 
 # write out the model
-save(class_prior, $prior, format=fmt)
-save(class_conditionals, $conditionals, format=fmt)
+save(classPrior, $prior, format=fmt)
+save(classConditionals, $conditionals, format=fmt)


[4/4] incubator-systemml git commit: New Spark map-grouped-aggregate (compiler/runtime) for naive-bayes

Posted by mb...@apache.org.
New Spark map-grouped-aggregate (compiler/runtime) for naive-bayes

Includes (1) refactoring for code reuse across Spark/MapReduce, and (2)
additional cleanup of instruction parsing (Spark instruction parser).
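
At a high level, the new operator broadcasts the (small) groups vector, computes
per-group partial aggregates map-side over each row-block of the target, and then
merges the partials with a stable sum by key (see RDDMapGroupedAggFunction and
sumByKeyStable in the diff below). A minimal numpy sketch of that dataflow, with
hypothetical inputs and none of SystemML's actual APIs:

import numpy as np

def map_grouped_sum(target_block, groups_block, ngroups):
    # map-side grouped sum for one aligned row-block (group labels are 1-based)
    partial = np.zeros((ngroups, target_block.shape[1]))
    for row, g in zip(target_block, groups_block):
        partial[int(g) - 1] += row
    return partial

# simulated input: a 6 x 3 target split into two row-blocks + broadcast groups
target = np.arange(18, dtype=float).reshape(6, 3)
groups = np.array([1, 2, 1, 3, 2, 3])
row_blocks = [(target[:3], groups[:3]), (target[3:], groups[3:])]

# map phase emits one partial per block; reduce phase sums partials by key
# (here all partials share the same output block key, so a plain sum suffices)
partials = [map_grouped_sum(t, g, ngroups=3) for t, g in row_blocks]
result = sum(partials)

expected = np.vstack([target[groups == k].sum(axis=0) for k in (1, 2, 3)])
assert np.allclose(result, expected)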

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/ff2aea54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/ff2aea54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/ff2aea54

Branch: refs/heads/master
Commit: ff2aea54251948add5fd17c30a8c53536828d512
Parents: 3ea3cdb
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 18:22:44 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 18:22:44 2016 -0800

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |  22 +-
 .../apache/sysml/lops/GroupedAggregateM.java    |  35 +++-
 .../instructions/SPInstructionParser.java       |  39 ++--
 .../mr/GroupedAggregateMInstruction.java        |  33 +--
 .../spark/AppendGAlignedSPInstruction.java      |   3 +-
 .../spark/AppendGSPInstruction.java             |   3 +-
 .../spark/AppendMSPInstruction.java             |   3 +-
 .../spark/AppendRSPInstruction.java             |   3 +-
 .../spark/BuiltinBinarySPInstruction.java       |   3 +-
 .../spark/BuiltinUnarySPInstruction.java        |   3 +-
 .../spark/MatrixReshapeSPInstruction.java       |   3 +-
 .../ParameterizedBuiltinSPInstruction.java      | 203 +++++++++++++------
 .../spark/QuantilePickSPInstruction.java        |   3 +-
 .../spark/QuantileSortSPInstruction.java        |   3 +-
 .../instructions/spark/RandSPInstruction.java   |   3 +-
 .../instructions/spark/WriteSPInstruction.java  |   3 +-
 .../matrix/data/OperationsOnMatrixValues.java   |  41 ++++
 17 files changed, 267 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 466d008..3a8445f 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -375,7 +375,7 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
 		}
 		else //CP/Spark 
 		{
-			GroupedAggregate grp_agg = null;
+			Lop grp_agg = null;
 			
 			if( et == ExecType.CP) 
 			{
@@ -391,9 +391,23 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
 						OptimizerUtils.checkSparkBroadcastMemoryBudget( groups.getDim1(), groups.getDim2(), 
 								groups.getRowsInBlock(), groups.getColsInBlock(), groups.getNnz()) );
 				
-				grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, broadcastGroups);						
-				grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
-				setRequiresReblock( true );
+				if( broadcastGroups //mapgroupedagg
+					&& getInput().get(_paramIndexMap.get(Statement.GAGG_FN)) instanceof LiteralOp
+					&& ((LiteralOp)getInput().get(_paramIndexMap.get(Statement.GAGG_FN))).getStringValue().equals("sum")
+					&& inputlops.get(Statement.GAGG_NUM_GROUPS) != null ) 
+				{
+					Hop target = getInput().get(_paramIndexMap.get(Statement.GAGG_TARGET));
+					
+					grp_agg = new GroupedAggregateM(inputlops, getDataType(), getValueType(), true, ExecType.SPARK);						
+					grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, target.getRowsInBlock(), target.getColsInBlock(), -1);
+					//no reblock required (directly output binary block)
+				}
+				else //groupedagg (w/ or w/o broadcast)
+				{
+					grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, broadcastGroups);						
+					grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
+					setRequiresReblock( true );	
+				}
 			}
 			
 			setLineNumbers(grp_agg);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java b/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
index 5edba62..2d73abc 100644
--- a/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
+++ b/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
@@ -68,13 +68,25 @@ public class GroupedAggregateM extends Lop
 		addInput(inputParameterLops.get(Statement.GAGG_GROUPS));
 		inputParameterLops.get(Statement.GAGG_GROUPS).addOutput(this);
 		
-		//setup MR parameters
-		boolean breaksAlignment = true;
-		boolean aligner = false;
-		boolean definesMRJob = false;
-		lps.addCompatibility(JobType.GMR);
-		lps.addCompatibility(JobType.DATAGEN);
-		lps.setProperties( inputs, ExecType.MR, ExecLocation.Map, breaksAlignment, aligner, definesMRJob );
+		if( et == ExecType.MR )
+		{
+			//setup MR parameters
+			boolean breaksAlignment = true;
+			boolean aligner = false;
+			boolean definesMRJob = false;
+			lps.addCompatibility(JobType.GMR);
+			lps.addCompatibility(JobType.DATAGEN);
+			lps.setProperties( inputs, ExecType.MR, ExecLocation.Map, breaksAlignment, aligner, definesMRJob );
+		}
+		else //SPARK
+		{
+			//setup Spark parameters 
+			boolean breaksAlignment = false;
+			boolean aligner = false;
+			boolean definesMRJob = false;
+			lps.addCompatibility(JobType.INVALID);
+			lps.setProperties( inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+		}
 	}
 
 	@Override
@@ -85,6 +97,15 @@ public class GroupedAggregateM extends Lop
 	@Override
 	public String getInstructions(int input1, int input2, int output) 
 	{
+		return getInstructions(
+			String.valueOf(input1),
+			String.valueOf(input2),
+			String.valueOf(output) );
+	}
+	
+	@Override
+	public String getInstructions(String input1, String input2, String output) 
+	{
 		StringBuilder sb = new StringBuilder();
 		
 		sb.append( getExecType() );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 1f5f961..2683c43 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -201,11 +201,12 @@ public class SPInstructionParser extends InstructionParser
 		String2SPInstructionType.put( "sel+", SPINSTRUCTION_TYPE.BuiltinUnary);
 		
 		// Parameterized Builtin Functions
-		String2SPInstructionType.put( "groupedagg"	, SPINSTRUCTION_TYPE.ParameterizedBuiltin);
-		String2SPInstructionType.put( "rmempty"	    , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
-		String2SPInstructionType.put( "replace"	    , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
-		String2SPInstructionType.put( "rexpand"	    , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
-		String2SPInstructionType.put( "transform"   , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+		String2SPInstructionType.put( "groupedagg"	 , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+		String2SPInstructionType.put( "mapgroupedagg", SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+		String2SPInstructionType.put( "rmempty"	     , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+		String2SPInstructionType.put( "replace"	     , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+		String2SPInstructionType.put( "rexpand"	     , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+		String2SPInstructionType.put( "transform"    , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
 		
 		String2SPInstructionType.put( "mappend", SPINSTRUCTION_TYPE.MAppend);
 		String2SPInstructionType.put( "rappend", SPINSTRUCTION_TYPE.RAppend);
@@ -338,10 +339,10 @@ public class SPInstructionParser extends InstructionParser
 				if ( parts[0].equals("log") || parts[0].equals("log_nz") ) {
 					if ( parts.length == 3 ) {
 						// B=log(A), y=log(x)
-						return (SPInstruction) BuiltinUnarySPInstruction.parseInstruction(str);
+						return BuiltinUnarySPInstruction.parseInstruction(str);
 					} else if ( parts.length == 4 ) {
 						// B=log(A,10), y=log(x,10)
-						return (SPInstruction) BuiltinBinarySPInstruction.parseInstruction(str);
+						return BuiltinBinarySPInstruction.parseInstruction(str);
 					}
 				}
 				else {
@@ -349,40 +350,40 @@ public class SPInstructionParser extends InstructionParser
 				}
 				
 			case BuiltinBinary:
-				return (SPInstruction) BuiltinBinarySPInstruction.parseInstruction(str);
+				return BuiltinBinarySPInstruction.parseInstruction(str);
 				
 			case BuiltinUnary:
-				return (SPInstruction) BuiltinUnarySPInstruction.parseInstruction(str);
+				return BuiltinUnarySPInstruction.parseInstruction(str);
 				
 			case ParameterizedBuiltin:
-				return (SPInstruction) ParameterizedBuiltinSPInstruction.parseInstruction(str);
+				return ParameterizedBuiltinSPInstruction.parseInstruction(str);
 				
 			case MatrixReshape:
-				return (SPInstruction) MatrixReshapeSPInstruction.parseInstruction(str);
+				return MatrixReshapeSPInstruction.parseInstruction(str);
 				
 			case MAppend:
-				return (SPInstruction) AppendMSPInstruction.parseInstruction(str);
+				return AppendMSPInstruction.parseInstruction(str);
 			
 			case GAppend:
-				return (SPInstruction) AppendGSPInstruction.parseInstruction(str);
+				return AppendGSPInstruction.parseInstruction(str);
 			
 			case GAlignedAppend:
-				return (SPInstruction) AppendGAlignedSPInstruction.parseInstruction(str);
+				return AppendGAlignedSPInstruction.parseInstruction(str);
 				
 			case RAppend:
-				return (SPInstruction) AppendRSPInstruction.parseInstruction(str);
+				return AppendRSPInstruction.parseInstruction(str);
 				
 			case Rand:
-				return (SPInstruction) RandSPInstruction.parseInstruction(str);
+				return RandSPInstruction.parseInstruction(str);
 				
 			case QSort: 
-				return (SPInstruction) QuantileSortSPInstruction.parseInstruction(str);
+				return QuantileSortSPInstruction.parseInstruction(str);
 			
 			case QPick: 
-				return (SPInstruction) QuantilePickSPInstruction.parseInstruction(str);
+				return QuantilePickSPInstruction.parseInstruction(str);
 			
 			case Write:
-				return (SPInstruction) WriteSPInstruction.parseInstruction(str);
+				return WriteSPInstruction.parseInstruction(str);
 				
 			case CumsumAggregate:
 				return CumulativeAggregateSPInstruction.parseInstruction(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
index 4130fd9..34f0945 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
@@ -30,6 +30,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.MatrixValue;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.CachedValueMap;
 import org.apache.sysml.runtime.matrix.mapred.DistributedCacheInput;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
@@ -90,37 +91,19 @@ public class GroupedAggregateMInstruction extends BinaryMRInstructionBase implem
 			
 			//get all inputs
 			MatrixIndexes ix = in1.getIndexes();
-			MatrixBlock target = (MatrixBlock)in1.getValue();
 			MatrixBlock groups = (MatrixBlock)dcInput.getDataBlock((int)ix.getRowIndex(), 1).getValue();
 			
-			//execute grouped aggregate operations
-			MatrixBlock out = groups.groupedAggOperations(target, null, new MatrixBlock(), _ngroups, getOperator());
-			
 			//output blocked result
 			int brlen = dcInput.getNumRowsPerBlock();
 			int bclen = dcInput.getNumColsPerBlock();
 			
-			if( out.getNumRows()<=brlen && out.getNumColumns()<=bclen )
-			{
-				//single output block
-				cachedValues.add(output, new IndexedMatrixValue(new MatrixIndexes(1,ix.getColumnIndex()), out));	
-			}
-			else
-			{
-				//multiple output blocks (by op def, single column block )				
-				for(int blockRow = 0; blockRow < (int)Math.ceil(out.getNumRows()/(double)brlen); blockRow++)
-				{
-					int maxRow = (blockRow*brlen + brlen < out.getNumRows()) ? brlen : out.getNumRows() - blockRow*brlen;			
-					int row_offset = blockRow*brlen;
-
-					//copy submatrix to block
-					MatrixBlock tmp = out.sliceOperations( row_offset, row_offset+maxRow-1, 
-							             0, out.getNumColumns()-1, new MatrixBlock() );
-					
-					//append block to result cache
-					cachedValues.add(output, new IndexedMatrixValue(
-							new MatrixIndexes(blockRow+1,ix.getColumnIndex()), tmp));			
-				}
+			//execute map grouped aggregate operations
+			ArrayList<IndexedMatrixValue> outlist = new ArrayList<IndexedMatrixValue>();
+			OperationsOnMatrixValues.performMapGroupedAggregate(getOperator(), in1, groups, _ngroups, brlen, bclen, outlist);
+			
+			//output all result blocks
+			for( IndexedMatrixValue out : outlist ) {
+				cachedValues.add(output, out);
 			}			
 		}	
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
index 5a930da..f1ddcdf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
@@ -29,7 +29,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -49,7 +48,7 @@ public class AppendGAlignedSPInstruction extends BinarySPInstruction
 		_cbind = cbind;
 	}
 	
-	public static Instruction parseInstruction ( String str ) 
+	public static AppendGAlignedSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException
 	{
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
index ecf2bc3..30ca4f7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
@@ -33,7 +33,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -54,7 +53,7 @@ public class AppendGSPInstruction extends BinarySPInstruction
 		_cbind = cbind;
 	}
 	
-	public static Instruction parseInstruction ( String str ) 
+	public static AppendGSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException 
 	{	
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index a1a0bf3..245f234 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -32,7 +32,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
@@ -59,7 +58,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
 		_cbind = cbind;
 	}
 	
-	public static Instruction parseInstruction ( String str ) 
+	public static AppendMSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException 
 	{
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 7beba7d..e9e6e35 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -29,7 +29,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -48,7 +47,7 @@ public class AppendRSPInstruction extends BinarySPInstruction
 		_cbind = cbind;
 	}
 	
-	public static Instruction parseInstruction ( String str ) 
+	public static AppendRSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException 
 	{	
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
index da85a89..01657ec 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
@@ -27,7 +27,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
@@ -50,7 +49,7 @@ public abstract class BuiltinBinarySPInstruction extends BinarySPInstruction
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	public static Instruction parseInstruction ( String str ) 
+	public static BuiltinBinarySPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException, DMLUnsupportedOperationException 
 	{
 		CPOperand in1 = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
index c9d5ab6..0bd272c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
@@ -25,7 +25,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
@@ -47,7 +46,7 @@ public abstract class BuiltinUnarySPInstruction extends UnarySPInstruction
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-	public static Instruction parseInstruction ( String str ) 
+	public static BuiltinUnarySPInstruction parseInstruction ( String str ) 
 			throws DMLRuntimeException, DMLUnsupportedOperationException 
 	{
 		CPOperand in = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
index 5d30c94..c2d180e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
@@ -31,7 +31,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
@@ -70,7 +69,7 @@ public class MatrixReshapeSPInstruction extends UnarySPInstruction
 	 * @return
 	 * @throws DMLRuntimeException
 	 */
-	public static Instruction parseInstruction ( String str ) 
+	public static MatrixReshapeSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException 
 	{
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 505e232..f8e2669 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -29,6 +29,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import scala.Tuple2;
 
 import org.apache.sysml.lops.Lop;
+import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression;
 import org.apache.sysml.parser.Statement;
@@ -37,9 +38,9 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.functionobjects.ParameterizedBuiltin;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
@@ -57,6 +58,7 @@ import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.data.WeightedCell;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
@@ -68,10 +70,10 @@ import org.apache.sysml.runtime.transform.DataTransform;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction 
-{
-	
-	private int arity;
+{	
 	protected HashMap<String,String> params;
+	
+	//removeEmpty-specific attributes
 	private boolean _bRmEmptyBC = false;
 	
 	public ParameterizedBuiltinSPInstruction(Operator op, HashMap<String,String> paramsMap, CPOperand out, String opcode, String istr, boolean bRmEmptyBC )
@@ -82,10 +84,6 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 		_bRmEmptyBC = bRmEmptyBC;
 	}
 
-	public int getArity() {
-		return arity;
-	}
-	
 	public HashMap<String,String> getParams() { return params; }
 	
 	public static HashMap<String, String> constructParameterMap(String[] params) {
@@ -102,70 +100,89 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 		return paramMap;
 	}
 	
-	public static Instruction parseInstruction ( String str ) 
+	public static ParameterizedBuiltinSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException, DMLUnsupportedOperationException 
 	{
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
 		// first part is always the opcode
 		String opcode = parts[0];
-		// last part is always the output
-		CPOperand out = new CPOperand( parts[parts.length-1] ); 
 
-		// process remaining parts and build a hash map
-		HashMap<String,String> paramsMap = constructParameterMap(parts);
+		if( opcode.equalsIgnoreCase("mapgroupedagg") )
+		{
+			CPOperand target = new CPOperand( parts[1] ); 
+			CPOperand groups = new CPOperand( parts[2] );
+			CPOperand out = new CPOperand( parts[3] );
 
-		// determine the appropriate value function
-		ValueFunction func = null;
-		if ( opcode.equalsIgnoreCase("groupedagg")) {
-			// check for mandatory arguments
-			String fnStr = paramsMap.get("fn");
-			if ( fnStr == null ) 
-				throw new DMLRuntimeException("Function parameter is missing in groupedAggregate.");
-			if ( fnStr.equalsIgnoreCase("centralmoment") ) {
-				if ( paramsMap.get("order") == null )
-					throw new DMLRuntimeException("Mandatory \"order\" must be specified when fn=\"centralmoment\" in groupedAggregate.");
-			}
+			HashMap<String,String> paramsMap = new HashMap<String, String>();
+			paramsMap.put(Statement.GAGG_TARGET, target.getName());
+			paramsMap.put(Statement.GAGG_GROUPS, groups.getName());
+			paramsMap.put(Statement.GAGG_NUM_GROUPS, parts[4]);
 			
-			Operator op = GroupedAggregateInstruction.parseGroupedAggOperator(fnStr, paramsMap.get("order"));
-			return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
-		}
-		else if(   opcode.equalsIgnoreCase("rmempty") ) 
-		{
-			boolean bRmEmptyBC = false; 
-			if(parts.length > 6)
-				bRmEmptyBC = (parts[5].compareTo("true") == 0)?true:false;
-								
-			func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-			return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, bRmEmptyBC);
-		}
-		else if(   opcode.equalsIgnoreCase("rexpand") ) 
-		{
-			func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-			return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
-		}
-		else if(   opcode.equalsIgnoreCase("replace") ) 
-		{
-			func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-			return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+			Operator op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.LASTCOLUMN);
+			
+			return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);		
 		}
-		else if ( opcode.equalsIgnoreCase("transform") ) 
+		else
 		{
-			func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-			String specFile = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC);
-			String applyTxPath = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD);
-			if ( specFile != null && applyTxPath != null)
-				throw new DMLRuntimeException(
-						"Invalid parameters to transform(). Only one of '"
-								+ ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC
-								+ "' or '"
-								+ ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD
-								+ "' can be specified.");
-			return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
-		}
-		else {
-			throw new DMLRuntimeException("Unknown opcode (" + opcode + ") for ParameterizedBuiltin Instruction.");
-		}
+			// last part is always the output
+			CPOperand out = new CPOperand( parts[parts.length-1] ); 
 
+			// process remaining parts and build a hash map
+			HashMap<String,String> paramsMap = constructParameterMap(parts);
+
+			// determine the appropriate value function
+			ValueFunction func = null;
+					
+			if ( opcode.equalsIgnoreCase("groupedagg")) {
+				// check for mandatory arguments
+				String fnStr = paramsMap.get("fn");
+				if ( fnStr == null ) 
+					throw new DMLRuntimeException("Function parameter is missing in groupedAggregate.");
+				if ( fnStr.equalsIgnoreCase("centralmoment") ) {
+					if ( paramsMap.get("order") == null )
+						throw new DMLRuntimeException("Mandatory \"order\" must be specified when fn=\"centralmoment\" in groupedAggregate.");
+				}
+				
+				Operator op = GroupedAggregateInstruction.parseGroupedAggOperator(fnStr, paramsMap.get("order"));
+				return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
+			}
+			else if(   opcode.equalsIgnoreCase("rmempty") ) 
+			{
+				boolean bRmEmptyBC = false; 
+				if(parts.length > 6)
+					bRmEmptyBC = (parts[5].compareTo("true") == 0)?true:false;
+									
+				func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+				return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, bRmEmptyBC);
+			}
+			else if(   opcode.equalsIgnoreCase("rexpand") ) 
+			{
+				func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+				return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+			}
+			else if(   opcode.equalsIgnoreCase("replace") ) 
+			{
+				func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+				return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+			}
+			else if ( opcode.equalsIgnoreCase("transform") ) 
+			{
+				func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+				String specFile = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC);
+				String applyTxPath = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD);
+				if ( specFile != null && applyTxPath != null)
+					throw new DMLRuntimeException(
+							"Invalid parameters to transform(). Only one of '"
+									+ ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC
+									+ "' or '"
+									+ ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD
+									+ "' can be specified.");
+				return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+			}
+			else {
+				throw new DMLRuntimeException("Unknown opcode (" + opcode + ") for ParameterizedBuiltin Instruction.");
+			}
+		}
 	}
 	
 
@@ -177,7 +194,31 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 		String opcode = getOpcode();
 		
 		//opcode guaranteed to be a valid opcode (see parsing)
-		if ( opcode.equalsIgnoreCase("groupedagg") ) 
+		if( opcode.equalsIgnoreCase("mapgroupedagg") )
+		{		
+			//get input rdd handle
+			String targetVar = params.get(Statement.GAGG_TARGET);
+			String groupsVar = params.get(Statement.GAGG_GROUPS);			
+			JavaPairRDD<MatrixIndexes,MatrixBlock> target = sec.getBinaryBlockRDDHandleForVariable(targetVar);
+			PartitionedBroadcastMatrix groups = sec.getBroadcastForVariable(groupsVar);
+			MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( targetVar );
+			MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+			CPOperand ngrpOp = new CPOperand(params.get(Statement.GAGG_NUM_GROUPS));
+			int ngroups = (int)sec.getScalarInput(ngrpOp.getName(), ngrpOp.getValueType(), ngrpOp.isLiteral()).getLongValue();
+			
+			//execute map grouped aggregate
+			JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
+					target.flatMapToPair(new RDDMapGroupedAggFunction(groups, _optr, 
+							ngroups, mc1.getRowsPerBlock(), mc1.getColsPerBlock()));
+			out = RDDAggregateUtils.sumByKeyStable(out);
+			
+			//updated characteristics and handle outputs
+			mcOut.set(ngroups, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1);
+			sec.setRDDHandleForVariable(output.getName(), out);			
+			sec.addLineageRDD( output.getName(), targetVar );
+			sec.addLineageBroadcast( output.getName(), groupsVar );	
+		}
+		else if ( opcode.equalsIgnoreCase("groupedagg") ) 
 		{	
 			boolean broadcastGroups = Boolean.parseBoolean(params.get("broadcast"));
 			
@@ -519,6 +560,44 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 		}
 	}
 	
+	public static class RDDMapGroupedAggFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>,MatrixIndexes,MatrixBlock> 
+	{
+		private static final long serialVersionUID = 6795402640178679851L;
+		
+		private PartitionedBroadcastMatrix _pbm = null;
+		private Operator _op = null;
+		private int _ngroups = -1;
+		private int _brlen = -1;
+		private int _bclen = -1;
+		
+		public RDDMapGroupedAggFunction(PartitionedBroadcastMatrix pbm, Operator op, int ngroups, int brlen, int bclen) 
+		{
+			_pbm = pbm;
+			_op = op;
+			_ngroups = ngroups;
+			_brlen = brlen;
+			_bclen = bclen;
+		}
+
+		@Override
+		public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
+			throws Exception 
+		{
+			//get all inputs
+			MatrixIndexes ix = arg0._1();
+			MatrixBlock target = arg0._2();		
+			MatrixBlock groups = _pbm.getMatrixBlock((int)ix.getRowIndex(), 1);
+			
+			//execute map grouped aggregate operations
+			IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(ix, target);
+			ArrayList<IndexedMatrixValue> outlist = new ArrayList<IndexedMatrixValue>();
+			OperationsOnMatrixValues.performMapGroupedAggregate(_op, in1, groups, _ngroups, _brlen, _bclen, outlist);
+			
+			//output all result blocks
+			return SparkUtils.fromIndexedMatrixBlock(outlist);
+		}
+	}
+	
 	/**
 	 * 
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
index a4f7a83..5cea24f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
@@ -32,7 +32,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.cp.DoubleObject;
@@ -66,7 +65,7 @@ public class QuantilePickSPInstruction extends BinarySPInstruction
 	 * @return
 	 * @throws DMLRuntimeException
 	 */
-	public static Instruction parseInstruction ( String str ) 
+	public static QuantilePickSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException 
 	{
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
index abf837b..793354f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
@@ -28,7 +28,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDSortUtils;
@@ -59,7 +58,7 @@ public class QuantileSortSPInstruction extends UnarySPInstruction
 		_sptype = SPINSTRUCTION_TYPE.QSort;
 	}
 	
-	public static Instruction parseInstruction ( String str ) 
+	public static QuantileSortSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException {
 		CPOperand in1 = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);
 		CPOperand in2 = null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index 3b4f798..b1eca1e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -45,7 +45,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
@@ -197,7 +196,7 @@ public class RandSPInstruction extends UnarySPInstruction
 	 * @return
 	 * @throws DMLRuntimeException
 	 */
-	public static Instruction parseInstruction(String str) 
+	public static RandSPInstruction parseInstruction(String str) 
 		throws DMLRuntimeException 
 	{
 		String[] s = InstructionUtils.getInstructionPartsWithValueType ( str );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 47cdfd1..3a6ad01 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -34,7 +34,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
@@ -72,7 +71,7 @@ public class WriteSPInstruction extends SPInstruction
 		formatProperties = null; // set in case of csv
 	}
 
-	public static Instruction parseInstruction ( String str ) 
+	public static WriteSPInstruction parseInstruction ( String str ) 
 		throws DMLRuntimeException 
 	{
 		String[] parts = InstructionUtils.getInstructionPartsWithValueType ( str );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 65e2c22..485b559 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -350,4 +350,45 @@ public class OperationsOnMatrixValues
 		//execute actual slice operation
 		in.getValue().sliceOperations(outlist, tmpRange, rowCut, colCut, brlen, bclen, boundaryRlen, boundaryClen);
 	}
+	
+	/**
+	 * 
+	 * @param target
+	 * @param groups
+	 * @param brlen
+	 * @param bclen
+	 * @param outlist
+	 * @throws DMLRuntimeException 
+	 * @throws DMLUnsupportedOperationException 
+	 */
+	public static void performMapGroupedAggregate( Operator op, IndexedMatrixValue inTarget, MatrixBlock groups, int ngroups, int brlen, int bclen, ArrayList<IndexedMatrixValue> outlist ) throws DMLRuntimeException, DMLUnsupportedOperationException
+	{
+		MatrixIndexes ix = inTarget.getIndexes();
+		MatrixBlock target = (MatrixBlock)inTarget.getValue();
+		
+		//execute grouped aggregate operations
+		MatrixBlock out = groups.groupedAggOperations(target, null, new MatrixBlock(), ngroups, op);
+		
+		if( out.getNumRows()<=brlen && out.getNumColumns()<=bclen )
+		{
+			//single output block
+			outlist.add( new IndexedMatrixValue(new MatrixIndexes(1,ix.getColumnIndex()), out) );	
+		}
+		else
+		{
+			//multiple output blocks (by op def, single column block)
+			for(int blockRow = 0; blockRow < (int)Math.ceil(out.getNumRows()/(double)brlen); blockRow++)
+			{
+				int maxRow = (blockRow*brlen + brlen < out.getNumRows()) ? brlen : out.getNumRows() - blockRow*brlen;			
+				int row_offset = blockRow*brlen;
+
+				//copy submatrix to block
+				MatrixBlock tmp = out.sliceOperations( row_offset, row_offset+maxRow-1, 
+						             0, out.getNumColumns()-1, new MatrixBlock() );
+				
+				//append block to result cache
+				outlist.add(new IndexedMatrixValue(new MatrixIndexes(blockRow+1,ix.getColumnIndex()), tmp));			
+			}
+		}
+	}
 }

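The blocking logic in performMapGroupedAggregate splits a tall grouped-aggregate
result into blocks of at most brlen rows, where only the trailing block may be
partial. A self-contained sketch of the same index arithmetic, using plain ints
instead of MatrixBlock so the boundary case is easy to verify:

// Sketch of the row-blocking arithmetic used above: split numRows rows
// into blocks of at most brlen rows; the final block may be smaller.
public class RowBlocking {
    public static void main(String[] args) {
        int numRows = 2500, brlen = 1000;
        int numBlocks = (int) Math.ceil(numRows / (double) brlen); // 3 blocks
        for (int blockRow = 0; blockRow < numBlocks; blockRow++) {
            int rowOffset = blockRow * brlen;
            // Full brlen rows unless this is the trailing partial block.
            int maxRow = (rowOffset + brlen < numRows)
                ? brlen : numRows - rowOffset;
            // 1-based block index, mirroring MatrixIndexes(blockRow+1, ...).
            System.out.printf("block %d: rows [%d, %d], size %d%n",
                blockRow + 1, rowOffset, rowOffset + maxRow - 1, maxRow);
        }
        // Output: block 1: rows [0, 999], size 1000
        //         block 2: rows [1000, 1999], size 1000
        //         block 3: rows [2000, 2499], size 500
    }
}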


[2/4] incubator-systemml git commit: New static simplification rewrite 'simplify unary-over-ppred operations'

Posted by mb...@apache.org.
New static simplification rewrite 'simplify unary-over-ppred operations'

Examples include abs(ppred()) -> ppred(); ceil, floor, and round over ppred
simplify the same way because ppred guarantees 0/1 indicator outputs.
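
The rewrite is safe because ppred produces only the indicator values 0 and 1,
and each of the four unary operations is the identity on those two values. A
quick stand-alone check of that fixed-point property (plain doubles standing in
for matrix cells; run with -ea to enable assertions):

// Verifies that abs, ceil, floor, and round leave the 0/1 indicator
// values produced by ppred unchanged, which is what makes the
// unary-over-ppred rewrite a pure no-op elimination.
public class PPredFixedPoint {
    public static void main(String[] args) {
        for (double v : new double[]{0.0, 1.0}) {
            assert Math.abs(v) == v;
            assert Math.ceil(v) == v;
            assert Math.floor(v) == v;
            assert Math.round(v) == v; // long result widens for the comparison
        }
        System.out.println("abs/ceil/floor/round are identity on {0,1}");
    }
}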

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/ba08b4c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/ba08b4c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/ba08b4c1

Branch: refs/heads/master
Commit: ba08b4c1b25f0a66e24cc8bf5ebf0c5f7f71581a
Parents: f514f82
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 16:52:59 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 16:52:59 2016 -0800

----------------------------------------------------------------------
 .../RewriteAlgebraicSimplificationStatic.java   | 35 +++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ba08b4c1/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
index adda6a2..5483bed 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
@@ -138,7 +138,8 @@ public class RewriteAlgebraicSimplificationStatic extends HopRewriteRule
  			hi = simplifyDistributiveBinaryOperation(hop, hi, i);//e.g., (X-Y*X) -> (1-Y)*X
  			hi = simplifyBushyBinaryOperation(hop, hi, i);       //e.g., (X*(Y*(Z%*%v))) -> (X*Y)*(Z%*%v)
  			hi = simplifyUnaryAggReorgOperation(hop, hi, i);     //e.g., sum(t(X)) -> sum(X)
-			hi = simplifyTransposedAppend(hop, hi, i);           //e.g., t(cbind(t(A),t(B))) -> rbind(A,B);
+			hi = simplifyUnaryPPredOperation(hop, hi, i);        //e.g., abs(ppred()) -> ppred(), others: round, ceil, floor
+ 			hi = simplifyTransposedAppend(hop, hi, i);           //e.g., t(cbind(t(A),t(B))) -> rbind(A,B);
  			hi = fuseBinarySubDAGToUnaryOperation(hop, hi, i);   //e.g., X*(1-X)-> sprop(X) || 1/(1+exp(-X)) -> sigmoid(X) || X*(X>0) -> selp(X)
 			hi = simplifyTraceMatrixMult(hop, hi, i);            //e.g., trace(X%*%Y)->sum(X*t(Y));  
 			hi = simplifySlicedMatrixMult(hop, hi, i);           //e.g., (X%*%Y)[1,1] -> X[1,] %*% Y[,1];
@@ -744,6 +745,38 @@ public class RewriteAlgebraicSimplificationStatic extends HopRewriteRule
 	 * @param pos
 	 * @return
 	 */
+	private Hop simplifyUnaryPPredOperation( Hop parent, Hop hi, int pos )
+	{
+		if( hi instanceof UnaryOp && hi.getDataType()==DataType.MATRIX  //unaryop
+			&& hi.getInput().get(0) instanceof BinaryOp                 //binaryop - ppred
+			&& ((BinaryOp)hi.getInput().get(0)).isPPredOperation() )
+		{
+			UnaryOp uop = (UnaryOp) hi; //valid unary op
+			if( uop.getOp()==OpOp1.ABS || uop.getOp()==OpOp1.CEIL
+				|| uop.getOp()==OpOp1.FLOOR || uop.getOp()==OpOp1.ROUND )
+			{
+				//clear link unary-binary
+				Hop input = uop.getInput().get(0);
+				HopRewriteUtils.removeAllChildReferences(hi);
+				
+				HopRewriteUtils.removeChildReferenceByPos(parent, hi, pos);
+				HopRewriteUtils.addChildReference(parent, input, pos);
+				hi = input;
+				
+				LOG.debug("Applied simplifyUnaryPPredOperation.");	
+			}
+		}
+		
+		return hi;
+	}
+	
+	/**
+	 * 
+	 * @param parent
+	 * @param hi
+	 * @param pos
+	 * @return
+	 */
 	private Hop simplifyTransposedAppend( Hop parent, Hop hi, int pos )
 	{
 		//e.g., t(cbind(t(A),t(B))) --> rbind(A,B), t(rbind(t(A),t(B))) --> cbind(A,B)		


[3/4] incubator-systemml git commit: Cleanup lop piggybacking (unused code, visibility, logging, iterators)

Posted by mb...@apache.org.
Cleanup lop piggybacking (unused code, visibility, logging, iterators)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3ea3cdbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3ea3cdbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3ea3cdbd

Branch: refs/heads/master
Commit: 3ea3cdbd4cd83b0dc144f232a69b51cc25bc82a9
Parents: ba08b4c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 16:54:33 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 16:54:33 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/lops/compile/Dag.java | 872 ++++---------------
 1 file changed, 149 insertions(+), 723 deletions(-)
----------------------------------------------------------------------

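A recurring pattern in this cleanup is guarding LOG.trace calls with
LOG.isTraceEnabled(). With commons-logging, the message string is concatenated
eagerly at the call site, so unguarded trace calls pay that cost even when
tracing is off; the guard makes disabled tracing essentially free. A minimal
sketch of the idiom (GuardedTrace is a hypothetical class, not part of the Dag
code):

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class GuardedTrace {
    private static final Log LOG = LogFactory.getLog(GuardedTrace.class);

    static void process(Object node) {
        // Unguarded: "Not compatible " + node is concatenated on every
        // call, even when trace logging is disabled.
        // LOG.trace("Not compatible " + node);

        // Guarded: the string is only built when trace output is on.
        if (LOG.isTraceEnabled())
            LOG.trace("Not compatible " + node);
    }
}

The explicit guard is needed here because the commons-logging Log interface has
no lazy-message overload; APIs such as log4j2 and java.util.logging instead
accept Supplier arguments that defer message construction.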

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ea3cdbd/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index 3b4cfdb..f5693ef 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,7 +91,6 @@ import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat;
  */
 public class Dag<N extends Lop> 
 {
-	
 	private static final Log LOG = LogFactory.getLog(Dag.class.getName());
 
 	private static final int CHILD_BREAKS_ALIGNMENT = 2;
@@ -106,7 +104,7 @@ public class Dag<N extends Lop>
 	
 	private int total_reducers = -1;
 	private String scratch = "";
-	private String scratchFilePath = "";
+	private String scratchFilePath = null;
 	
 	private double gmrMapperFootprint = 0;
 	
@@ -185,7 +183,7 @@ public class Dag<N extends Lop>
 	}
 	
 	private String getFilePath() {
-		if ( scratchFilePath.equalsIgnoreCase("") ) {
+		if ( scratchFilePath == null ) {
 			scratchFilePath = scratch + Lop.FILE_SEPARATOR
 								+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
 								+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
@@ -209,6 +207,20 @@ public class Dag<N extends Lop>
 	}
 
 	/**
+	 * Method to add a node to the DAG.
+	 * 
+	 * @param node
+	 * @return true if the node was not already present, false otherwise.
+	 */
+
+	public boolean addNode(N node) {
+		if (nodes.contains(node))
+			return false;		
+		nodes.add(node);
+		return true;
+	}
+	
+	/**
 	 * 
 	 * @param config
 	 * @return
@@ -262,83 +274,6 @@ public class Dag<N extends Lop>
 
 	}
 
-	/**
-	 * Method to remove transient reads that do not have a transient write
-	 * 
-	 * @param nodeV
-	 * @param inst
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("unused")
-	private void deleteUnwantedTransientReadVariables(ArrayList<N> nodeV,
-			ArrayList<Instruction> inst) throws DMLRuntimeException,
-			DMLUnsupportedOperationException {
-		HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
-		
-		LOG.trace("In delete unwanted variables");
-
-		// first capture all transient read variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.READ) {
-				labelNodeMapping.put(node.getOutputParameters().getLabel(),
-						node);
-			}
-		}
-
-		// generate delete instructions for all transient read variables without
-		// a transient write
-		// first capture all transient write variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
-
-			if (node.getExecLocation() == ExecLocation.Data
-					&& ((Data) node).isTransient()
-					&& ((Data) node).getOperationType() == OperationTypes.WRITE ) {
-				if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data) {
-					// this transient write is NOT a result of actual computation, but is simple copy.
-					// in such a case, preserve the input variable so that appropriate copy instruction (cpvar or GMR) is generated
-					// therefore, remove the input label from labelNodeMapping
-					labelNodeMapping.remove(node.getInputs().get(0).getOutputParameters().getLabel());
-				}
-				else {
-					if(labelNodeMapping.containsKey(node.getOutputParameters().getLabel())) 
-						// corresponding transient read exists (i.e., the variable is updated)
-						// in such a case, generate rmvar instruction so that OLD data gets deleted
-						labelNodeMapping.remove(node.getOutputParameters().getLabel());
-				}
-			}
-		}
-
-		// generate RM instructions
-		Instruction rm_inst = null;
-		for( Entry<String, N> e : labelNodeMapping.entrySet() ) {
-			String label = e.getKey();
-			N node = e.getValue();
-
-			if (((Data) node).getDataType() == DataType.SCALAR) {
-				// if(DEBUG)
-				// System.out.println("rmvar" + Lops.OPERAND_DELIMITOR + label);
-				// inst.add(new VariableSimpleInstructions("rmvar" +
-				// Lops.OPERAND_DELIMITOR + label));
-
-			} else {
-				rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
-				rm_inst.setLocation(node);
-				
-				if( LOG.isTraceEnabled() )
-					LOG.trace(rm_inst.toString());
-				inst.add(rm_inst);
-				
-			}
-		}
-
-	}
-
 	private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
 			ArrayList<Instruction> inst) throws DMLRuntimeException,
 			DMLUnsupportedOperationException {
@@ -348,20 +283,6 @@ public class Dag<N extends Lop>
 		
 		LOG.trace("In delete updated variables");
 
-		/*
-		Set<String> in = sb.liveIn().getVariables().keySet();
-		Set<String> out = sb.liveOut().getVariables().keySet();
-		Set<String> updated = sb.variablesUpdated().getVariables().keySet();
-		
-		Set<String> intersection = in;
-		intersection.retainAll(out);
-		intersection.retainAll(updated);
-		
-		for (String var : intersection) {
-			inst.add(VariableCPInstruction.prepareRemoveInstruction(var));
-		}
-		*/
-
 		// CANDIDATE list of variables which could have been updated in this statement block 
 		HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
 		
@@ -371,8 +292,7 @@ public class Dag<N extends Lop>
 		HashMap<String, N> updatedLabelsLineNum =  new HashMap<String, N>();
 		
 		// first capture all transient read variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
+		for ( N node : nodeV ) {
 
 			if (node.getExecLocation() == ExecLocation.Data
 					&& ((Data) node).isTransient()
@@ -399,8 +319,7 @@ public class Dag<N extends Lop>
 		}
 
 		// capture updated transient write variables
-		for (int i = 0; i < nodeV.size(); i++) {
-			N node = nodeV.get(i);
+		for ( N node : nodeV ) {
 
 			if (node.getExecLocation() == ExecLocation.Data
 					&& ((Data) node).isTransient()
@@ -422,8 +341,8 @@ public class Dag<N extends Lop>
 			rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
 			rm_inst.setLocation(updatedLabelsLineNum.get(label));
 			
-			
-			LOG.trace(rm_inst.toString());
+			if( LOG.isTraceEnabled() )
+				LOG.trace(rm_inst.toString());
 			inst.add(rm_inst);
 		}
 
@@ -479,23 +398,6 @@ public class Dag<N extends Lop>
 		}*/
 	}
 
-	/**
-	 * Method to add a node to the DAG.
-	 * 
-	 * @param node
-	 * @return true if node was not already present, false if not.
-	 */
-
-	public boolean addNode(N node) {
-		if (nodes.contains(node))
-			return false;
-		else {
-			nodes.add(node);
-			return true;
-		}
-		
-	}
-
 	private ArrayList<ArrayList<N>> createNodeVectors(int size) {
 		ArrayList<ArrayList<N>> arr = new ArrayList<ArrayList<N>>();
 
@@ -508,8 +410,8 @@ public class Dag<N extends Lop>
 	}
 
 	private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
-		for (int i = 0; i < arr.size(); i++) {
-			arr.get(i).clear();
+		for (ArrayList<N> tmp : arr) {
+			tmp.clear();
 		}
 	}
 
@@ -519,9 +421,10 @@ public class Dag<N extends Lop>
 		int base = jt.getBase();
 
 		for (int i = from; i < to; i++) {
-			if ((nodes.get(i).getCompatibleJobs() & base) == 0) {
+			N node = nodes.get(i);
+			if ((node.getCompatibleJobs() & base) == 0) {
 				if( LOG.isTraceEnabled() )
-					LOG.trace("Not compatible "+ nodes.get(i).toString());
+					LOG.trace("Not compatible "+ node.toString());
 				return false;
 			}
 		}
@@ -665,7 +568,6 @@ public class Dag<N extends Lop>
 	 * @param jobNodes
 	 * @throws LopsException
 	 */
-
 	private void handleSingleOutputJobs(ArrayList<N> execNodes,
 			ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
 			throws LopsException {
@@ -685,9 +587,7 @@ public class Dag<N extends Lop>
 			if (!jobNodes.get(jindex).isEmpty()) {
 				ArrayList<N> vec = jobNodes.get(jindex);
 
-				// first find all nodes with more than one parent that is not
-				// finished.
-
+				// first find all nodes with more than one parent that is not finished.
 				for (int i = 0; i < vec.size(); i++) {
 					N node = vec.get(i);
 					if (node.getExecLocation() == ExecLocation.MapOrReduce
@@ -705,24 +605,10 @@ public class Dag<N extends Lop>
 							}
 						}
 					} 
-					/*
-					// Following condition will not work for MMRJ because execNodes may contain non-MapAndReduce 
-					// lops that are compatible with MMRJ (e.g., Data-WRITE)
-					else if (!(node.getExecLocation() == ExecLocation.MapAndReduce 
-							     && node.getType() == lopTypes[jobi])) {
-						throw new LopsException(
-								"Error in handleSingleOutputJobs(): encountered incompatible execLocation='"
-										+ node.getExecLocation() + "' for lop (ID="
-										+ node.getID() + ").");
-					}
-					*/
 				}
 
-				// need to redo all nodes in nodesWithOutput as well as their
-				// children
-
-				for (int i = 0; i < vec.size(); i++) {
-					N node = vec.get(i);
+				// need to redo all nodes in nodesWithOutput as well as their children
+				for ( N node : vec ) {
 					if (node.getExecLocation() == ExecLocation.MapOrReduce
 							|| node.getExecLocation() == ExecLocation.Map) {
 						if (nodesWithUnfinishedOutputs.contains(node))
@@ -798,14 +684,6 @@ public class Dag<N extends Lop>
 		}
 	}
 	
-	/*private String getOutputFormat(Data node) { 
-		if(node.getFileFormatType() == FileFormatTypes.TEXT)
-			return "textcell";
-		else if ( node.getOutputParameters().isBlocked_representation() )
-			return "binaryblock";
-		else
-			return "binarycell";
-	}*/
 	
 	/**
 	 * Determine whether to send <code>node</code> to MR or to process it in the control program.
@@ -922,7 +800,6 @@ public class Dag<N extends Lop>
 	 * @throws DMLUnsupportedOperationException
 	 * @throws DMLRuntimeException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
 			throws LopsException, IOException, DMLRuntimeException,
@@ -940,7 +817,6 @@ public class Dag<N extends Lop>
 
 		ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
 		
-
 		// list of instructions
 		ArrayList<Instruction> inst = new ArrayList<Instruction>();
 
@@ -949,9 +825,6 @@ public class Dag<N extends Lop>
 		ArrayList<Instruction> deleteInst = new ArrayList<Instruction>();
 		ArrayList<Instruction> endOfBlockInst = new ArrayList<Instruction>();
 
-		// delete transient variables that are no longer needed
-		//deleteUnwantedTransientReadVariables(node_v, deleteInst);
-
 		// remove files for transient reads that are updated.
 		deleteUpdatedTransientReadVariables(sb, node_v, writeInst);
 		
@@ -1004,9 +877,6 @@ public class Dag<N extends Lop>
 					if( LOG.isTraceEnabled() )
 						LOG.trace(indent + "Queueing node "
 								+ node.toString() + " (code 2)");
-					//for(N q: queuedNodes) {
-					//	LOG.trace(indent + "  " + q.getType() + "," + q.getID());
-					//}
 					queuedNodes.add(node);
 
 					// if node has more than two inputs,
@@ -1043,22 +913,6 @@ public class Dag<N extends Lop>
 					}
 				}
 
-				/*if (node.getInputs().size() == 2) {
-					int j1 = jobType(node.getInputs().get(0), jobNodes);
-					int j2 = jobType(node.getInputs().get(1), jobNodes);
-					if (j1 != -1 && j2 != -1 && j1 != j2) {
-						LOG.trace(indent + "Queueing node "
-									+ node.toString() + " (code 3)");
-
-						queuedNodes.add(node);
-
-						removeNodesForNextIteration(node, finishedNodes,
-								execNodes, queuedNodes, jobNodes);
-
-						continue;
-					}
-				}*/
-
 				// See if this lop can be eliminated
 				// This check is for "aligner" lops (e.g., group)
 				boolean eliminate = false;
@@ -1267,13 +1121,6 @@ public class Dag<N extends Lop>
 
 				// reduce node, make sure no parent needs reduce, else queue
 				if (node.getExecLocation() == ExecLocation.MapAndReduce) {
-
-					// boolean eliminate = false;
-					// eliminate = canEliminateLop(node, execNodes);
-					// if (eliminate || (!hasChildNode(node, execNodes,
-					// ExecLocation.MapAndReduce)) &&
-					// !hasMRJobChildNode(node,execNodes)) {
-
 					// TODO: statiko -- keep the middle condition
 					// discuss about having a lop that is MapAndReduce but does
 					// not define a job
@@ -1282,14 +1129,7 @@ public class Dag<N extends Lop>
 					execNodes.add(node);
 					finishedNodes.add(node);
 					addNodeByJobType(node, jobNodes, execNodes, eliminate);
-
-					// } else {
-					// if (DEBUG)
-					// System.out.println("Queueing -" + node.toString());
-					// queuedNodes.add(node);
-					// removeNodesForNextIteration(node, finishedNodes,
-					// execNodes, queuedNodes, jobNodes);
-					// }
+					
 					continue;
 				}
 
@@ -1319,10 +1159,10 @@ public class Dag<N extends Lop>
 				// add Scalar to execNodes if it has no child in exec nodes
 				// that will be executed in a MR job.
 				if (node.getExecLocation() == ExecLocation.ControlProgram) {
-					for (int j = 0; j < node.getInputs().size(); j++) {
-						if (execNodes.contains(node.getInputs().get(j))
-								&& !(node.getInputs().get(j).getExecLocation() == ExecLocation.Data)
-								&& !(node.getInputs().get(j).getExecLocation() == ExecLocation.ControlProgram)) {
+					for ( Lop lop : node.getInputs() ) {
+						if (execNodes.contains(lop)
+								&& !(lop.getExecLocation() == ExecLocation.Data)
+								&& !(lop.getExecLocation() == ExecLocation.ControlProgram)) {
 							if( LOG.isTraceEnabled() )
 								LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
 
@@ -1396,11 +1236,8 @@ public class Dag<N extends Lop>
 				// next generate MR instructions
 				if (!execNodes.isEmpty())
 					generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes);
-
 				handleSingleOutputJobs(execNodes, jobNodes, finishedNodes);
-
 			}
-
 		}
 
 		// add write and delete inst at the very end.
@@ -1415,9 +1252,7 @@ public class Dag<N extends Lop>
 	}
 
 	private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
-	  for(int i=0; i < execNodes.size(); i++)
-	  {
-	    N tmpNode = execNodes.get(i);
+	  for( N tmpNode : execNodes ) {
 	    // for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID
 	    // we should not consider such lops in this check
 	    if (isChild(tmpNode, node, IDMap) 
@@ -1466,11 +1301,6 @@ public class Dag<N extends Lop>
 			else {
 				processConsumers((N)in, inst, delteInst, null);
 			}
-			
-			/*if ( in.removeConsumer() == 0 ) {
-				String label = in.getOutputParameters().getLabel();
-				inst.add(VariableCPInstruction.prepareRemoveInstruction(label));
-			}*/
 		}
 	}
 	
@@ -1506,7 +1336,6 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private void generateControlProgramJobs(ArrayList<N> execNodes,
 			ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
@@ -1568,8 +1397,6 @@ public class Dag<N extends Lop>
 					else {
 						var_deletions.add(node.getOutputParameters().getLabel());
 						var_deletionsLineNum.put(node.getOutputParameters().getLabel(), node);
-						
-						//System.out.println("    --> skipping " + out.getLastInstructions() + " while processing node " + node.getID());
 					}
 				}
 
@@ -1768,10 +1595,9 @@ public class Dag<N extends Lop>
 		}
 
 		// delete all marked nodes
-		for (int i = 0; i < markedNodes.size(); i++) {
-			execNodes.remove(markedNodes.get(i));
+		for ( N node : markedNodes ) {
+			execNodes.remove(node);
 		}
-
 	}
 
 	/**
@@ -1784,7 +1610,6 @@ public class Dag<N extends Lop>
 	 * @param queuedNodes
 	 * @throws LopsException
 	 */
-
 	private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
 			ArrayList<N> execNodes, ArrayList<N> queuedNodes,
 			ArrayList<ArrayList<N>> jobvec) throws LopsException {
@@ -1794,10 +1619,9 @@ public class Dag<N extends Lop>
 			return;
 		
 		//if all children are queued, then there is nothing to do.
-		int numInputs = node.getInputs().size();
 		boolean allQueued = true;
-		for(int i=0; i < numInputs; i++) {
-			if( !queuedNodes.contains(node.getInputs().get(i)) ) {
+		for( Lop input : node.getInputs() ) {
+			if( !queuedNodes.contains(input) ) {
 				allQueued = false;
 				break;
 			}
@@ -1811,8 +1635,8 @@ public class Dag<N extends Lop>
 		// Determine if <code>node</code> has inputs from the same job or multiple jobs
 	    int jobid = Integer.MIN_VALUE;
 		boolean inputs_in_same_job = true;
-		for(int idx=0; idx < node.getInputs().size(); idx++) {
-			int input_jobid = jobType(node.getInputs().get(idx), jobvec);
+		for( Lop input : node.getInputs() ) {
+			int input_jobid = jobType(input, jobvec);
 			if ( jobid == Integer.MIN_VALUE )
 				jobid = input_jobid;
 			else if ( jobid != input_jobid ) { 
@@ -1824,8 +1648,7 @@ public class Dag<N extends Lop>
 		// Determine if there exist any unassigned inputs to <code>node</code>
 		// Evaluate only those lops that execute in MR.
 		boolean unassigned_inputs = false;
-		for(int i=0; i < numInputs; i++) {
-			Lop input = node.getInputs().get(i);
+		for( Lop input : node.getInputs() ) {
 			//if ( input.getExecLocation() != ExecLocation.ControlProgram && jobType(input, jobvec) == -1 ) {
 			if ( input.getExecType() == ExecType.MR && !execNodes.contains(input)) { //jobType(input, jobvec) == -1 ) {
 				unassigned_inputs = true;
@@ -1835,8 +1658,8 @@ public class Dag<N extends Lop>
 
 		// Determine if any node's children are queued
 		boolean child_queued = false;
-		for(int i=0; i < numInputs; i++) {
-			if (queuedNodes.contains(node.getInputs().get(i)) ) {
+		for( Lop input : node.getInputs() ) {
+			if (queuedNodes.contains(input) ) {
 				child_queued = true;
 				break;
 			}
@@ -1940,14 +1763,16 @@ public class Dag<N extends Lop>
 		} // for i
 
 		// we also need to delete all parent nodes of marked nodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			LOG.trace("  Checking for removal - ("
-						+ execNodes.get(i).getID() + ") "
-						+ execNodes.get(i).toString());
-
-			if (hasChildNode(execNodes.get(i), markedNodes) && !markedNodes.contains(execNodes.get(i))) {
-				markedNodes.add(execNodes.get(i));
-				LOG.trace("    Removing for next iteration (code 6) (" + execNodes.get(i).getID() + ") " + execNodes.get(i).toString());
+		for ( N enode : execNodes ) {
+			if( LOG.isTraceEnabled() ) {
+				LOG.trace("  Checking for removal - ("
+							+ enode.getID() + ") " + enode.toString());
+			}
+			
+			if (hasChildNode(enode, markedNodes) && !markedNodes.contains(enode)) {
+				markedNodes.add(enode);
+				if( LOG.isTraceEnabled() )
+					LOG.trace("    Removing for next iteration (code 6) (" + enode.getID() + ") " + enode.toString());
 			}
 		}
 
@@ -1963,193 +1788,6 @@ public class Dag<N extends Lop>
 				queuedNodes.add(n);
 			}
 		}
-		/*for (int i = 0; i < markedNodes.size(); i++) {
-			finishedNodes.remove(markedNodes.elementAt(i));
-			execNodes.remove(markedNodes.elementAt(i));
-			removeNodeByJobType(markedNodes.elementAt(i), jobvec);
-			queuedNodes.add(markedNodes.elementAt(i));
-		}*/
-	}
-
-	@SuppressWarnings("unused")
-	private void removeNodesForNextIterationOLD(N node, ArrayList<N> finishedNodes,
-			ArrayList<N> execNodes, ArrayList<N> queuedNodes,
-			ArrayList<ArrayList<N>> jobvec) throws LopsException {
-		// only queued nodes with two inputs need to be handled.
-
-		// TODO: statiko -- this should be made == 1
-		if (node.getInputs().size() != 2)
-			return;
-		
-		
-		//if both children are queued, nothing to do. 
-	    if (queuedNodes.contains(node.getInputs().get(0))
-	        && queuedNodes.contains(node.getInputs().get(1)))
-	      return;
-	    
-	    if( LOG.isTraceEnabled() )
-	    	LOG.trace("Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
- 
-	    
-
-		boolean inputs_in_same_job = false;
-		// TODO: handle tertiary
-		if (jobType(node.getInputs().get(0), jobvec) != -1
-				&& jobType(node.getInputs().get(0), jobvec) == jobType(node
-						.getInputs().get(1), jobvec)) {
-			inputs_in_same_job = true;
-		}
-
-		boolean unassigned_inputs = false;
-		// here.. scalars shd be ignored
-		if ((node.getInputs().get(0).getExecLocation() != ExecLocation.ControlProgram && jobType(
-				node.getInputs().get(0), jobvec) == -1)
-				|| (node.getInputs().get(1).getExecLocation() != ExecLocation.ControlProgram && jobType(
-						node.getInputs().get(1), jobvec) == -1)) {
-			unassigned_inputs = true;
-		}
-
-		boolean child_queued = false;
-
-		// check if atleast one child was queued.
-		if (queuedNodes.contains(node.getInputs().get(0))
-				|| queuedNodes.contains(node.getInputs().get(1)))
-			child_queued = true;
-
-		// nodes to be dropped
-		ArrayList<N> markedNodes = new ArrayList<N>();
-
-		for (int i = 0; i < execNodes.size(); i++) {
-
-			N tmpNode = execNodes.get(i);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("Checking for removal (" + tmpNode.getID()
-						+ ") " + tmpNode.toString());
-				LOG.trace("Inputs in same job " + inputs_in_same_job);
-				LOG.trace("Unassigned inputs " + unassigned_inputs);
-				LOG.trace("Child queued " + child_queued);
-
-			}
-			
-			//if(node.definesMRJob() && isChild(tmpNode,node) && (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
-			//  continue;
-
-			// TODO: statiko -- check if this is too conservative?
-			if (child_queued) {
-  	     // if one of the children are queued, 
-	       // remove some child nodes on other leg that may be needed later on. 
-	       // For e.g. Group lop. 
- 
-			  if((tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) && 
-			      tmpNode.isAligner())
-			  {
-			    markedNodes.add(tmpNode);
-			    if( LOG.isTraceEnabled() )
-			    	LOG.trace("Removing for next iteration: ("
-			    			+ tmpNode.getID() + ") " + tmpNode.toString());
-			  }
-			  else {
-				if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node) 
-						&& isChild(tmpNode, node, IDMap)  && branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-
-	
-				    if( 
-				        //e.g. MMCJ
-				        (node.getExecLocation() == ExecLocation.MapAndReduce &&
-						branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes) && !tmpNode.definesMRJob() )
-						||
-						//e.g. Binary
-						(node.getExecLocation() == ExecLocation.Reduce && branchCanBePiggyBackedReduce(tmpNode, node, execNodes, finishedNodes))  )
-					{
-					
-				    	if( LOG.isTraceEnabled() )
-							LOG.trace("Removing for next iteration: ("
-								    + tmpNode.getID() + ") " + tmpNode.toString());
-					
-					
-	        			markedNodes.add(tmpNode);	
-					}
-				 }
-			  }
-			} else {
-				/*
-				 * "node" has no other queued children.
-				 * 
-				 * If inputs are in the same job and "node" is of type
-				 * MapAndReduce, then remove nodes of all types other than
-				 * Reduce, MapAndReduce, and the ones that define a MR job as
-				 * they can be piggybacked later.
-				 * 
-				 * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
-				 * come from Rand job, and they should not be removed.
-				 * 
-				 * Other examples: -- MMCJ whose children are of type
-				 * MapAndReduce (say GMR) -- Inputs coming from two different
-				 * jobs .. GMR & REBLOCK
-				 */
-				if ((inputs_in_same_job || unassigned_inputs)
-						&& node.getExecLocation() == ExecLocation.MapAndReduce
-						&& !hasOtherMapAndReduceParentNode(tmpNode, execNodes,
-								node)
-						&& isChild(tmpNode, node, IDMap) &&
-						branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes)
-						&& !tmpNode.definesMRJob()) {
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Removing for next iteration:: ("
-								+ tmpNode.getID() + ") " + tmpNode.toString());
-
-					markedNodes.add(tmpNode);
-				}
-
-				// as this node has inputs coming from different jobs, need to
-				// free up everything
-				// below and include the closest MapAndReduce lop if this is of
-				// type Reduce.
-				// if it is of type MapAndReduce, don't need to free any nodes
-
-				if (!inputs_in_same_job && !unassigned_inputs
-						&& isChild(tmpNode, node, IDMap) && 
-						(tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) && 
-		            tmpNode.isAligner()) 
-				{
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Removing for next iteration ("
-										+ tmpNode.getID()
-										+ ") "
-										+ tmpNode.toString());
-
-					markedNodes.add(tmpNode);
-
-				}
-
-			}
-		}
-
-		// we also need to delete all parent nodes of marked nodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			if( LOG.isTraceEnabled() )
-				LOG.trace("Checking for removal - ("
-						+ execNodes.get(i).getID() + ") "
-						+ execNodes.get(i).toString());
-
-			if (hasChildNode(execNodes.get(i), markedNodes)) {
-				markedNodes.add(execNodes.get(i));
-				if( LOG.isTraceEnabled() )
-					LOG.trace("Removing for next iteration - ("
-							+ execNodes.get(i).getID() + ") "
-							+ execNodes.get(i).toString());
-			}
-		}
-
-		// delete marked nodes from finishedNodes and execNodes
-		// add to queued nodes
-		for (int i = 0; i < markedNodes.size(); i++) {
-			finishedNodes.remove(markedNodes.get(i));
-			execNodes.remove(markedNodes.get(i));
-			removeNodeByJobType(markedNodes.get(i), jobvec);
-			queuedNodes.add(markedNodes.get(i));
-		}
 	}
 
 	private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
@@ -2162,9 +1800,7 @@ public class Dag<N extends Lop>
 				return false;
 		}
 		
-		for(int i=0; i < execNodes.size(); i++) {
-		   N n = execNodes.get(i);
-       
+		for( N n : execNodes ) {
 		   if(n.equals(node))
 			   continue;
        
@@ -2177,10 +1813,6 @@ public class Dag<N extends Lop>
 				   && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
 				   return false;
 		   }
-		   /*if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap) 
-				   && !node.getInputs().contains(tmpNode) 
-				   && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
-			   return false;*/
 	   }
 	   return true;
 	}
@@ -2258,9 +1890,7 @@ public class Dag<N extends Lop>
 			return false;
 		JobType jt = JobType.findJobTypeFromLop(node);
 
-		for (int i = 0; i < execNodes.size(); i++) {
-			N n = execNodes.get(i);
-
+		for ( N n : execNodes ) {
 			if (n.equals(node))
 				continue;
 
@@ -2274,16 +1904,6 @@ public class Dag<N extends Lop>
 				else if (!isCompatible(n, jt))
 					return false;
 			}
-
-			/*
-			 * if(n.equals(tmpNode) && n.getExecLocation() != ExecLocation.Map
-			 * && n.getExecLocation() != ExecLocation.MapOrReduce) return false;
-			 * 
-			 * if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap) &&
-			 * n.getExecLocation() != ExecLocation.Map && n.getExecLocation() !=
-			 * ExecLocation.MapOrReduce) return false;
-			 */
-
 		}
 		return true;
 	}
@@ -2305,10 +1925,7 @@ public class Dag<N extends Lop>
 	  }
 	  
 	  //check to see if any node between node and tmpNode has more than one unfinished output 
-	  for(int i=0; i < execNodes.size(); i++)
-	  {
-	    N n = execNodes.get(i);
-	    
+	  for( N n : execNodes ) {
 	    if(n.equals(node) || n.equals(tmpNode))
 	      continue;
 	    
@@ -2328,36 +1945,6 @@ public class Dag<N extends Lop>
 	  return true;
   }
 
-  /**
-	 * Method to check of there is a node of type MapAndReduce between a child
-	 * (tmpNode) and its parent (node)
-	 * 
-	 * @param tmpNode
-	 * @param execNodes
-	 * @param node
-	 * @return
-	 */
-
-	@SuppressWarnings({ "unchecked", "unused" })
-	private boolean hasMapAndReduceParentNode(N tmpNode, ArrayList<N> execNodes, N node) 
-	{
-		for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
-			N n = (N) tmpNode.getOutputs().get(i);
-
-			if (execNodes.contains(n)
-					&& n.getExecLocation() == ExecLocation.MapAndReduce
-					&& isChild(n, node, IDMap)) {
-				return true;
-			} else {
-				if (hasMapAndReduceParentNode(n, execNodes, node))
-					return true;
-			}
-
-		}
-
-		return false;
-	}
-
 	/**
 	 * Method to return the job index for a lop.
 	 * 
@@ -2366,7 +1953,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
 		for ( JobType jt : JobType.values()) {
 			int i = jt.getId();
@@ -2386,7 +1972,6 @@ public class Dag<N extends Lop>
 	 * @param node
 	 * @return
 	 */
-
 	@SuppressWarnings("unchecked")
 	private boolean hasOtherMapAndReduceParentNode(N tmpNode,
 			ArrayList<N> nodeList, N node) {
@@ -2425,10 +2010,9 @@ public class Dag<N extends Lop>
 		long nodeid = IDMap.get(node.getID());
 		long tmpid = IDMap.get(tmpNode.getID());
 		
-		for ( int i=0; i < queuedNodes.size(); i++ ) {
-			int id = IDMap.get(queuedNodes.get(i).getID());
+		for ( N qnode : queuedNodes ) {
+			int id = IDMap.get(qnode.getID());
 			if ((id != nodeid && nodeMarked[id]) && (id != tmpid && tmpMarked[id]) )
-			//if (nodeMarked[id] && tmpMarked[id])
 				return true;
 		}
 		
@@ -2441,8 +2025,9 @@ public class Dag<N extends Lop>
 	 * @param jobNodes
 	 * @throws DMLRuntimeException
 	 */
-	void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
-			throws DMLRuntimeException {
+	private void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
+		throws DMLRuntimeException 
+	{
 		if (LOG.isTraceEnabled()){
 			for ( JobType jt : JobType.values() ) {
 				int i = jt.getId();
@@ -2458,8 +2043,6 @@ public class Dag<N extends Lop>
 			}
 			
 		}
-
-		
 	}
 
 	/**
@@ -2469,7 +2052,7 @@ public class Dag<N extends Lop>
 	 * @param loc
 	 * @return
 	 */
-	boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
+	private boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
 		for (int i = 0; i < nodes.size(); i++) {
 			if (nodes.get(i).getExecLocation() == ExecLocation.RecordReader)
 				return true;
@@ -2477,8 +2060,8 @@ public class Dag<N extends Lop>
 		return false;
 	}
 
-	ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) {
-
+	private ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) 
+	{
 		// obtain the list of record reader nodes
 		ArrayList<N> rrnodes = new ArrayList<N>();
 		for (int i = 0; i < gmrnodes.size(); i++) {
@@ -2542,8 +2125,7 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 * @throws DMLUnsupportedOperationException
 	 */
-
-	public void generateMRJobs(ArrayList<N> execNodes,
+	private void generateMRJobs(ArrayList<N> execNodes,
 			ArrayList<Instruction> inst,
 			ArrayList<Instruction> writeinst,
 			ArrayList<Instruction> deleteinst, ArrayList<ArrayList<N>> jobNodes)
@@ -2551,17 +2133,6 @@ public class Dag<N extends Lop>
 			DMLRuntimeException
 
 	{
-
-		/*// copy unassigned lops in execnodes to gmrnodes
-		for (int i = 0; i < execNodes.size(); i++) {
-			N node = execNodes.elementAt(i);
-			if (jobType(node, jobNodes) == -1) {
-				jobNodes.get(JobType.GMR.getId()).add(node);
-				addChildren(node, jobNodes.get(JobType.GMR.getId()),
-						execNodes);
-			}
-		}*/
-
 		printJobNodes(jobNodes);
 		
 		ArrayList<Instruction> rmvarinst = new ArrayList<Instruction>();
@@ -2632,61 +2203,22 @@ public class Dag<N extends Lop>
 	}
 
 	/**
-	 * Method to get the input format for a lop
-	 * 
-	 * @param elementAt
-	 * @return
-	 * @throws LopsException
-	 */
-
-	// This code is replicated in ReBlock.java
-	@SuppressWarnings("unused")
-	private Format getChildFormat(N node) 
-		throws LopsException 
-	{
-		if (node.getOutputParameters().getFile_name() != null
-				|| node.getOutputParameters().getLabel() != null) {
-			return node.getOutputParameters().getFormat();
-		} else {
-			if (node.getInputs().size() > 1)
-				throw new LopsException(node.printErrorLocation() + "Should only have one child! \n");
-			/*
-			 * Return the format of the child node (i.e., input lop) No need of
-			 * recursion here.. because 1) Reblock lop's input can either be
-			 * DataLop or some intermediate computation If it is Data then we
-			 * just take its format (TEXT or BINARY) If it is intermediate lop
-			 * then it is always BINARY since we assume that all intermediate
-			 * computations will be in Binary format 2) Note that Reblock job
-			 * will never have any instructions in the mapper => the input lop
-			 * (if it is other than Data) is always executed in a different job
-			 */
-			return node.getInputs().get(0).getOutputParameters().getFormat();
-			// return getChildFormat((N) node.getInputs().get(0));
-		}
-
-	}
-
-	/**
 	 * Method to add all parents of "node" in exec_n to node_v.
 	 * 
 	 * @param node
 	 * @param node_v
 	 * @param exec_n
 	 */
-
 	private void addParents(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
-		for (int i = 0; i < exec_n.size(); i++) {
-			if (isChild(node, exec_n.get(i), IDMap)) {
-				if (!node_v.contains(exec_n.get(i))) {
+		for (N enode : exec_n ) {
+			if (isChild(node, enode, IDMap)) {
+				if (!node_v.contains(enode)) {
 					if( LOG.isTraceEnabled() )
-						LOG.trace("Adding parent - "
-								+ exec_n.get(i).toString());
-					node_v.add(exec_n.get(i));
+						LOG.trace("Adding parent - " + enode.toString());
+					node_v.add(enode);
 				}
 			}
-
 		}
-
 	}
 
 	/**
@@ -2696,11 +2228,10 @@ public class Dag<N extends Lop>
 	 * @param node_v
 	 * @param exec_n
 	 */
-
 	@SuppressWarnings("unchecked")
 	private void addChildren(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
 
-		/** add child in exec nodes that is not of type scalar **/
+		// add child in exec nodes that is not of type scalar
 		if (exec_n.contains(node)
 				&& node.getExecLocation() != ExecLocation.ControlProgram) {
 			if (!node_v.contains(node)) {
@@ -2714,9 +2245,7 @@ public class Dag<N extends Lop>
 		if (!exec_n.contains(node))
 			return;
 
-		/**
-		 * recurse
-		 */
+		// recurse
 		for (int i = 0; i < node.getInputs().size(); i++) {
 			N n = (N) node.getInputs().get(i);
 			addChildren(n, node_v, exec_n);
@@ -2730,7 +2259,7 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException 
 	 */
-	OutputInfo getOutputInfo(N node, boolean cellModeOverride) 
+	private OutputInfo getOutputInfo(N node, boolean cellModeOverride) 
 		throws LopsException 
 	{
 		if ( (node.getDataType() == DataType.SCALAR && node.getExecType() == ExecType.CP) 
@@ -2797,22 +2326,8 @@ public class Dag<N extends Lop>
 
 		return oinfo;
 	}
-
-	/** 
-	 * Method that determines the output format for a given node, 
-	 * and returns a string representation of OutputInfo. This 
-	 * method is primarily used while generating instructions that
-	 * execute in the control program.
-	 * 
-	 * @param node
-	 * @return
-	 */
-/*  	String getOutputFormat(N node) {
-  		return OutputInfo.outputInfoToString(getOutputInfo(node, false));
-  	}
-*/  	
 	
-	public String prepareAssignVarInstruction(Lop input, Lop node) {
+	private String prepareAssignVarInstruction(Lop input, Lop node) {
 		StringBuilder sb = new StringBuilder();
 		
 		sb.append(ExecType.CP);
@@ -3295,7 +2810,7 @@ public class Dag<N extends Lop>
 	 * @throws DMLRuntimeException
 	 */
 	@SuppressWarnings("unchecked")
-	public void generateMapReduceInstructions(ArrayList<N> execNodes,
+	private void generateMapReduceInstructions(ArrayList<N> execNodes,
 			ArrayList<Instruction> inst, ArrayList<Instruction> writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst, 
 			JobType jt) throws LopsException,
 			DMLUnsupportedOperationException, DMLRuntimeException
@@ -3339,19 +2854,18 @@ public class Dag<N extends Lop>
 		if (jt == JobType.GMR || jt == JobType.GMRCELL) {
 			ArrayList<N> markedNodes = new ArrayList<N>();
 			// only keep data nodes that are results of some computation.
-			for (int i = 0; i < rootNodes.size(); i++) {
-				N node = rootNodes.get(i);
-				if (node.getExecLocation() == ExecLocation.Data
-						&& ((Data) node).isTransient()
-						&& ((Data) node).getOperationType() == OperationTypes.WRITE
-						&& ((Data) node).getDataType() == DataType.MATRIX) {
+			for ( N rnode : rootNodes ) {
+				if (rnode.getExecLocation() == ExecLocation.Data
+						&& ((Data) rnode).isTransient()
+						&& ((Data) rnode).getOperationType() == OperationTypes.WRITE
+						&& ((Data) rnode).getDataType() == DataType.MATRIX) {
 					// no computation, just a copy
-					if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data
-							&& ((Data) node.getInputs().get(0)).isTransient()
-							&& node.getOutputParameters().getLabel().compareTo(
-									node.getInputs().get(0)
-											.getOutputParameters().getLabel()) == 0) {
-						markedNodes.add(node);
+					if (rnode.getInputs().get(0).getExecLocation() == ExecLocation.Data
+							&& ((Data) rnode.getInputs().get(0)).isTransient()
+							&& rnode.getOutputParameters().getLabel().compareTo(
+								rnode.getInputs().get(0).getOutputParameters().getLabel()) == 0) 
+					{
+						markedNodes.add(rnode);
 					}
 				}
 			}
@@ -3367,10 +2881,9 @@ public class Dag<N extends Lop>
 		
 		/* Determine all input data files */
 		
-		for (int i = 0; i < rootNodes.size(); i++) {
-			getInputPathsAndParameters(rootNodes.get(i), execNodes,
-					inputs, inputInfos, numRows, numCols, numRowsPerBlock,
-					numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
+		for ( N rnode : rootNodes ) {
+			getInputPathsAndParameters(rnode, execNodes, inputs, inputInfos, numRows, numCols, 
+				numRowsPerBlock, numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
 		}
 		
 		// In case of RAND job, instructions are defined in the input file
@@ -3384,10 +2897,9 @@ public class Dag<N extends Lop>
 		
 		// currently, recordreader instructions are allowed only in GMR jobs
 		if (jt == JobType.GMR || jt == JobType.GMRCELL) {
-			for (int i = 0; i < rootNodes.size(); i++) {
-				getRecordReaderInstructions(rootNodes.get(i), execNodes,
-						inputs, recordReaderInstructions, nodeIndexMapping,
-						start_index, inputLabels, inputLops, MRJobLineNumbers);
+			for ( N rnode : rootNodes ) {
+				getRecordReaderInstructions(rnode, execNodes, inputs, recordReaderInstructions, 
+					nodeIndexMapping, start_index, inputLabels, inputLops, MRJobLineNumbers);
 				if ( recordReaderInstructions.size() > 1 )
 					throw new LopsException("MapReduce job can only have a single recordreader instruction: " + recordReaderInstructions.toString());
 			}
@@ -3544,23 +3056,6 @@ public class Dag<N extends Lop>
 				processConsumers((N)l, rmvarinst, deleteinst, null);
 			}
 		}
-
-	}
-
-	/**
-	 * Convert a byte array into string
-	 * 
-	 * @param arr
-	 * @return none
-	 */
-	public String getString(byte[] arr) {
-		StringBuilder sb = new StringBuilder();
-		for (int i = 0; i < arr.length; i++) {
-			sb.append(",");
-			sb.append(Byte.toString(arr[i]));
-		}
-
-		return sb.toString();
 	}
 
 	/**
@@ -3571,16 +3066,14 @@ public class Dag<N extends Lop>
 	 */
 	private String getCSVString(ArrayList<String> inputStrings) {
 		StringBuilder sb = new StringBuilder();
-		for (int i = 0; i < inputStrings.size(); i++) {
-			String tmp = inputStrings.get(i);
-			if( tmp != null ) {
+		for ( String str : inputStrings ) {
+			if( str != null ) {
 				if( sb.length()>0 )
 					sb.append(Lop.INSTRUCTION_DELIMITOR);
-				sb.append( tmp ); 
+				sb.append( str ); 
 			}
 		}
 		return sb.toString();
-
 	}
 
 	/**
@@ -3589,7 +3082,6 @@ public class Dag<N extends Lop>
 	 * @param list
 	 * @return
 	 */
-
 	private String[] getStringArray(ArrayList<String> list) {
 		String[] arr = new String[list.size()];
 
@@ -3615,7 +3107,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private int getAggAndOtherInstructions(N node, ArrayList<N> execNodes,
 			ArrayList<String> shuffleInstructions,
@@ -3624,9 +3115,7 @@ public class Dag<N extends Lop>
 			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops, 
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
 	{
-
 		int ret_val = -1;
 
 		if (nodeIndexMapping.containsKey(node))
@@ -3813,9 +3302,6 @@ public class Dag<N extends Lop>
 				start_index[0]++;
 
 				if (node.getType() == Type.Ternary ) {
-					//Tertiary.OperationTypes op = ((Tertiary<?, ?, ?, ?>) node).getOperationType();
-					//if ( op == Tertiary.OperationTypes.CTABLE_TRANSFORM ) {
-					
 					// in case of CTABLE_TRANSFORM_SCALAR_WEIGHT: inputIndices.get(2) would be -1
 					otherInstructionsReducer.add(node.getInstructions(
 							inputIndices.get(0), inputIndices.get(1),
@@ -3824,7 +3310,6 @@ public class Dag<N extends Lop>
 						MRJobLineNumbers.add(node._beginLine);
 					}
 					nodeIndexMapping.put(node, output_index);
-					//}
 				}
 				else if( node.getType() == Type.ParameterizedBuiltin ){
 					otherInstructionsReducer.add(node.getInstructions(
@@ -3848,7 +3333,6 @@ public class Dag<N extends Lop>
 				}
 
 				return output_index;
-
 			}
 			else if (inputIndices.size() == 4) {
 				int output_index = start_index[0];
@@ -3868,7 +3352,6 @@ public class Dag<N extends Lop>
 		}
 
 		return -1;
-
 	}
 
 	/**
@@ -3885,7 +3368,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private int getRecordReaderInstructions(N node, ArrayList<N> execNodes,
 			ArrayList<String> inputStrings,
@@ -3893,9 +3375,7 @@ public class Dag<N extends Lop>
 			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
 	{
-
 		// if input source, return index
 		if (nodeIndexMapping.containsKey(node))
 			return nodeIndexMapping.get(node);
@@ -3910,7 +3390,6 @@ public class Dag<N extends Lop>
 
 		// get mapper instructions
 		for (int i = 0; i < node.getInputs().size(); i++) {
-
 			// recurse
 			N childNode = (N) node.getInputs().get(i);
 			int ret_val = getRecordReaderInstructions(childNode, execNodes,
@@ -3932,17 +3411,14 @@ public class Dag<N extends Lop>
 
 			// cannot reuse index if this is true
 			// need to add better indexing schemes
-			// if (child_for_max_input_index.getOutputs().size() > 1) {
 			output_index = start_index[0];
 			start_index[0]++;
-			// }
 
 			nodeIndexMapping.put(node, output_index);
 
 			// populate list of input labels.
 			// only rangepick lop can contribute to labels
 			if (node.getType() == Type.PickValues) {
-
 				PickByCount pbc = (PickByCount) node;
 				if (pbc.getOperationType() == PickByCount.OperationTypes.RANGEPICK) {
 					int scalarIndex = 1; // always the second input is a scalar
@@ -3975,11 +3451,9 @@ public class Dag<N extends Lop>
 						"Unexpected number of inputs while generating a RecordReader Instruction");
 
 			return output_index;
-
 		}
 
 		return -1;
-
 	}
 
 	/**
@@ -3996,7 +3470,6 @@ public class Dag<N extends Lop>
 	 * @return
 	 * @throws LopsException
 	 */
-
 	@SuppressWarnings("unchecked")
 	private int getMapperInstructions(N node, ArrayList<N> execNodes,
 			ArrayList<String> inputStrings,
@@ -4004,9 +3477,7 @@ public class Dag<N extends Lop>
 			HashMap<N, Integer> nodeIndexMapping, int[] start_index,
 			ArrayList<String> inputLabels, ArrayList<Lop> inputLops, 
 			ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
 	{
-
 		// if input source, return index
 		if (nodeIndexMapping.containsKey(node))
 			return nodeIndexMapping.get(node);
@@ -4134,11 +3605,9 @@ public class Dag<N extends Lop>
 				MRJobLineNumbers.add(node._beginLine);
 			}
 			return output_index;
-
 		}
 
 		return -1;
-
 	}
 
 	// Method to populate inputs and also populates node index mapping.
@@ -4155,12 +3624,9 @@ public class Dag<N extends Lop>
 				&& !nodeIndexMapping.containsKey(node)) {
 			numRows.add(node.getOutputParameters().getNumRows());
 			numCols.add(node.getOutputParameters().getNumCols());
-			numRowsPerBlock.add(node.getOutputParameters()
-					.getRowsInBlock());
-			numColsPerBlock.add(node.getOutputParameters()
-					.getColsInBlock());
-			inputStrings.add(node.getInstructions(inputStrings.size(),
-					inputStrings.size()));
+			numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+			numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
+			inputStrings.add(node.getInstructions(inputStrings.size(), inputStrings.size()));
 			if(DMLScript.ENABLE_DEBUG_MODE) {
 				MRJobLineNumbers.add(node._beginLine);
 			}
@@ -4170,10 +3636,6 @@ public class Dag<N extends Lop>
 			return;
 		}
 
-		// && ( !(node.getExecLocation() == ExecLocation.ControlProgram)
-		// || (node.getExecLocation() == ExecLocation.ControlProgram &&
-		// node.getDataType() != DataType.SCALAR )
-		// )
 		// get input file names
 		if (!execNodes.contains(node)
 				&& !nodeIndexMapping.containsKey(node)
@@ -4192,19 +3654,14 @@ public class Dag<N extends Lop>
 				inputStrings.add(Lop.VARIABLE_NAME_PLACEHOLDER + node.getOutputParameters().getLabel()
 						               + Lop.VARIABLE_NAME_PLACEHOLDER);
 			}
-			//if ( node.getType() == Lops.Type.Data && ((Data)node).isTransient())
-			//	inputStrings.add("##" + node.getOutputParameters().getLabel() + "##");
-			//else
-			//	inputStrings.add(node.getOutputParameters().getLabel());
+			
 			inputLabels.add(node.getOutputParameters().getLabel());
 			inputLops.add(node);
 
 			numRows.add(node.getOutputParameters().getNumRows());
 			numCols.add(node.getOutputParameters().getNumCols());
-			numRowsPerBlock.add(node.getOutputParameters()
-					.getRowsInBlock());
-			numColsPerBlock.add(node.getOutputParameters()
-					.getColsInBlock());
+			numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+			numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
 
 			InputInfo nodeInputInfo = null;
 			// Check if file format type is binary or text and update infos
@@ -4262,15 +3719,12 @@ public class Dag<N extends Lop>
 		}
 
 		// if exec nodes does not contain node at this point, return.
-
 		if (!execNodes.contains(node))
 			return;
 
 		// process children recursively
-
-		for (int i = 0; i < node.getInputs().size(); i++) {
-			N childNode = (N) node.getInputs().get(i);
-			getInputPathsAndParameters(childNode, execNodes, inputStrings,
+		for ( Lop lop : node.getInputs() ) {
+			getInputPathsAndParameters((N)lop, execNodes, inputStrings,
 					inputInfos, numRows, numCols, numRowsPerBlock,
 					numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
 		}
@@ -4283,39 +3737,32 @@ public class Dag<N extends Lop>
 	 * @param execNodes
 	 * @param rootNodes
 	 */
-
 	private void getOutputNodes(ArrayList<N> execNodes, ArrayList<N> rootNodes, JobType jt) {
-		for (int i = 0; i < execNodes.size(); i++) {
-			N node = execNodes.get(i);
-
+		for ( N node : execNodes ) {
 			// terminal node
 			if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) {
 				rootNodes.add(node);
-			} else {
+			} 
+			else {
 				// check for nodes with at least one child outside execnodes
 				int cnt = 0;
-				for (int j = 0; j < node.getOutputs().size(); j++) {
-					if (!execNodes.contains(node.getOutputs().get(j))) {
-						cnt++;
-					}
+				for (Lop lop : node.getOutputs() ) {
+					cnt += (!execNodes.contains(lop)) ? 1 : 0; 
 				}
 
-				if (cnt > 0 //!= 0
-						//&& cnt <= node.getOutputs().size()
-						&& !rootNodes.contains(node) // not already a rootnode
-						&& !(node.getExecLocation() == ExecLocation.Data
-								&& ((Data) node).getOperationType() == OperationTypes.READ && ((Data) node)
-								.getDataType() == DataType.MATRIX)  // Not a matrix Data READ 
-					) {
-					
-
+				if (cnt > 0 && !rootNodes.contains(node) // not already a rootnode
+					&& !(node.getExecLocation() == ExecLocation.Data
+					&& ((Data) node).getOperationType() == OperationTypes.READ 
+					&& ((Data) node).getDataType() == DataType.MATRIX) ) // Not a matrix Data READ 
+				{
 					if ( jt.allowsSingleShuffleInstruction() && node.getExecLocation() != ExecLocation.MapAndReduce)
 						continue;
 					
 					if (cnt < node.getOutputs().size()) {
 						if(!node.getProducesIntermediateOutput())	
 							rootNodes.add(node);
-					} else
+					} 
+					else
 						rootNodes.add(node);
 				}
 			}
@@ -4328,9 +3775,7 @@ public class Dag<N extends Lop>
 	 * @param a
 	 * @param b
 	 */
-
-	public static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
-		//int aID = IDMap.get(a.getID());
+	private static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
 		int bID = IDMap.get(b.getID());
 		return a.get_reachable()[bID];
 	}
@@ -4381,17 +3826,15 @@ public class Dag<N extends Lop>
 
 		// print the nodes in sorted order
 		if (LOG.isTraceEnabled()) {
-			for (int i = 0; i < v.size(); i++) {
-				// System.out.print(sortedNodes.get(i).getID() + "("
-				// + levelmap.get(sortedNodes.get(i).getID()) + "), ");
+			for ( N vnode : v ) {
 				StringBuilder sb = new StringBuilder();
-				sb.append(v.get(i).getID());
+				sb.append(vnode.getID());
 				sb.append("(");
-				sb.append(v.get(i).getLevel());
+				sb.append(vnode.getLevel());
 				sb.append(") ");
-				sb.append(v.get(i).getType());
+				sb.append(vnode.getType());
 				sb.append("(");
-				for(Lop vin : v.get(i).getInputs()) {
+				for(Lop vin : vnode.getInputs()) {
 					sb.append(vin.getID());
 					sb.append(",");
 				}
@@ -4413,7 +3856,7 @@ public class Dag<N extends Lop>
 	 * @param marked
 	 */
 	@SuppressWarnings("unchecked")
-	void dagDFS(N root, boolean[] marked) {
+	private void dagDFS(N root, boolean[] marked) {
 		//contains check currently required for globalopt, will be removed when cleaned up
 		if( !IDMap.containsKey(root.getID()) )
 			return;
@@ -4422,33 +3865,32 @@ public class Dag<N extends Lop>
 		if ( marked[mapID] )
 			return;
 		marked[mapID] = true;
-		for(int i=0; i < root.getOutputs().size(); i++) {
-			//System.out.println("CALLING DFS "+root);
-			dagDFS((N)root.getOutputs().get(i), marked);
+		for( Lop lop : root.getOutputs() ) {
+			dagDFS((N)lop, marked);
 		}
 	}
 	
 	private boolean hasDirectChildNode(N node, ArrayList<N> childNodes) {
 		if ( childNodes.isEmpty() ) 
 			return false;
-		
-		for(int i=0; i < childNodes.size(); i++) {
-			if ( childNodes.get(i).getOutputs().contains(node))
+		for( N cnode : childNodes ) {
+			if ( cnode.getOutputs().contains(node))
 				return true;
 		}
 		return false;
 	}
 	
+	private boolean hasChildNode(N node, ArrayList<N> nodes) {
+		return hasChildNode(node, nodes, ExecLocation.INVALID);
+	}
+
 	private boolean hasChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
 		if ( childNodes.isEmpty() ) 
 			return false;
-		
 		int index = IDMap.get(node.getID());
-		for(int i=0; i < childNodes.size(); i++) {
-			N cn = childNodes.get(i);
-			if ( (type == ExecLocation.INVALID || cn.getExecLocation() == type) && cn.get_reachable()[index]) {
+		for( N cnode : childNodes ) {
+			if ( (type == ExecLocation.INVALID || cnode.getExecLocation() == type) && cnode.get_reachable()[index])
 				return true;
-			}
 		}
 		return false;
 	}
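
The newly hoisted two-argument hasChildNode is a sentinel overload: it forwards to the three-argument variant with ExecLocation.INVALID, and the predicate type == ExecLocation.INVALID || cnode.getExecLocation() == type then treats INVALID as "any location". A compact sketch of the pattern with simplified types; the enum and field names are assumptions for illustration only.

import java.util.List;

class ChildLookup {
    enum Loc { MAP, MAP_AND_REDUCE, REDUCE, INVALID } // INVALID doubles as "match any"

    static class Node {
        Loc loc;
        boolean[] reachable; // indexed by the dense id of potential ancestors
    }

    // two-argument form: no location constraint
    static boolean hasChildNode(int nodeIdx, List<Node> candidates) {
        return hasChildNode(nodeIdx, candidates, Loc.INVALID);
    }

    static boolean hasChildNode(int nodeIdx, List<Node> candidates, Loc type) {
        for (Node c : candidates)
            if ((type == Loc.INVALID || c.loc == type) && c.reachable[nodeIdx])
                return true;
        return false;
    }
}
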
@@ -4456,13 +3898,10 @@ public class Dag<N extends Lop>
 	private N getChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
 		if ( childNodes.isEmpty() )
 			return null;
-		
 		int index = IDMap.get(node.getID());
-		for(int i=0; i < childNodes.size(); i++) {
-			N cn = childNodes.get(i);
-			if ( cn.getExecLocation() == type && cn.get_reachable()[index]) {
-				return cn;
-			}
+		for( N cnode : childNodes ) {
+			if ( cnode.getExecLocation() == type && cnode.get_reachable()[index])
+				return cnode;
 		}
 		return null;
 	}
@@ -4479,8 +3918,7 @@ public class Dag<N extends Lop>
 	private N getParentNode(N node, ArrayList<N> parentNodes, ExecLocation type) {
 		if ( parentNodes.isEmpty() )
 			return null;
-		for(int i=0; i < parentNodes.size(); i++ ) {
-			N pn = parentNodes.get(i);
+		for( N pn : parentNodes ) {
 			int index = IDMap.get( pn.getID() );
 			if ( pn.getExecLocation() == type && node.get_reachable()[index])
 				return pn;
@@ -4495,9 +3933,7 @@ public class Dag<N extends Lop>
 			return false;
 		
 		int index = IDMap.get(node.getID());
-		
-		for (int i = 0; i < nodesVec.size(); i++) {
-			N n = nodesVec.get(i);
+		for( N n : nodesVec ) {
 			if ( n.definesMRJob() && n.get_reachable()[index]) 
 				return true;
 		}
@@ -4510,8 +3946,7 @@ public class Dag<N extends Lop>
 		
 		int index = IDMap.get(node.getID());
 		boolean onlyDatagen = true;
-		for (int i = 0; i < nodesVec.size(); i++) {
-			N n = nodesVec.get(i);
+		for( N n : nodesVec ) {
 			if ( n.definesMRJob() && n.get_reachable()[index] &&  JobType.findJobTypeFromLop(n) != JobType.DATAGEN )
 				onlyDatagen = false;
 		}
@@ -4520,11 +3955,10 @@ public class Dag<N extends Lop>
 	}
 
 	@SuppressWarnings("unchecked")
-	private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type) {
-
-		for (int i = 0; i < node.getInputs().size(); i++) {
-			N n = (N) node.getInputs().get(i);
-
+	private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type) 
+	{
+		for (Lop lop : node.getInputs() ) {
+			N n = (N) lop;
 			if (!execNodes.contains(n))
 				continue;
 
@@ -4533,44 +3967,36 @@ public class Dag<N extends Lop>
 					return MR_CHILD_FOUND_BREAKS_ALIGNMENT;
 				else
 					return MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT;
-			} else {
+			} 
+			else {
 				int ret = getChildAlignment(n, execNodes, type);
 				if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT
-						|| ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
+					|| ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
 					if (n.getBreaksAlignment())
 						return CHILD_BREAKS_ALIGNMENT;
 					else
 						return CHILD_DOES_NOT_BREAK_ALIGNMENT;
 				}
-
 				else if (ret == MRCHILD_NOT_FOUND
 						|| ret == CHILD_BREAKS_ALIGNMENT
 						|| ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT)
 					return ret;
 				else
-					throw new RuntimeException(
-							"Something wrong in getChildAlignment().");
+					throw new RuntimeException("Something wrong in getChildAlignment().");
 			}
 		}
 
 		return MRCHILD_NOT_FOUND;
-
-	}
-
-	private boolean hasChildNode(N node, ArrayList<N> nodes) {
-		return hasChildNode(node, nodes, ExecLocation.INVALID);
 	}
 
 	private boolean hasParentNode(N node, ArrayList<N> parentNodes) {
 		if ( parentNodes.isEmpty() )
-			return false;
-		
-		for( int i=0; i < parentNodes.size(); i++ ) {
-			int index = IDMap.get( parentNodes.get(i).getID() );
+			return false;
+		for( N pnode : parentNodes ) {
+			int index = IDMap.get( pnode.getID() );
 			if ( node.get_reachable()[index])
 				return true;
 		}
 		return false;
 	}
-	
 }
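
Finally, the reformatted getChildAlignment walks the input side of the DAG until it meets a child with the requested exec location and reports whether anything along the way breaks alignment, encoding the outcome in integer status codes. The sketch below reduces the five SystemML codes to three and keeps scanning sibling inputs on a miss, so it is a simplified restatement of the technique rather than the actual method; types and constants are stand-ins.

import java.util.ArrayList;
import java.util.List;

class AlignmentScan {
    static final int NOT_FOUND = 0;    // no child of the requested kind on this path
    static final int FOUND_OK = 1;     // found, nothing on the path breaks alignment
    static final int FOUND_BREAKS = 2; // found, but alignment is broken on the way

    static class Node {
        boolean matchesType;     // is this a child of the requested exec location?
        boolean breaksAlignment; // e.g., an op that redistributes rows or blocks
        final List<Node> inputs = new ArrayList<>();
    }

    static int childAlignment(Node node) {
        for (Node in : node.inputs) {
            if (in.matchesType)
                return in.breaksAlignment ? FOUND_BREAKS : FOUND_OK;
            int ret = childAlignment(in);
            if (ret == FOUND_OK) // propagate, re-checking the current edge
                return in.breaksAlignment ? FOUND_BREAKS : FOUND_OK;
            if (ret != NOT_FOUND)
                return ret;      // a break deeper in the path is sticky
        }
        return NOT_FOUND;
    }
}
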