You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/22 17:34:14 UTC

[38/51] [partial] incubator-systemml git commit: [SYSTEMML-482] [SYSTEMML-480] Adding a Git attributes file to enfore Unix-styled line endings, and normalizing all of the line endings.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/project.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/project.dml b/scripts/utils/project.dml
index dc69bd0..ee6cd80 100644
--- a/scripts/utils/project.dml
+++ b/scripts/utils/project.dml
@@ -1,80 +1,80 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to project columns from input matrix.
-#
-# Parameters:
-#    X       : (input)  filename of data matrix
-#    P       : (input)  filename of 1-column projection matrix containing columnIDs
-#    o       : (output) filename of output matrix with projected columns
-#    exclude : (default FALSE) TRUE means P contains columnIds to be projected
-#                              FALSE means P contains columnsIDS to be excluded
-#    ofmt    : (default binary) format of output matrix
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/project.dml -nvargs X="/tmp/M.mtx" P="/tmp/P.mtx" o="/tmp/PX.mtx" 
-#
-# Assumptions:
-# The order of colIDs in P is preserved. Order of columns in result is same as order of columns in P.
-#      i.e. projecting columns 4 and 2 of X results in a matrix with columns 4 and 2.
-# If P specifies the exclude list, then projected columns are order preserved.
-
-exclude = ifdef ($exclude, FALSE);
-ofmt = ifdef ($ofmt, "binary");
-
-X = read ($X)
-P = read ($P)
-
-# create projection matrix using projection list and sequence matrix, and pad with 0s. The size of
-# PP is nbrOfColsInX x nbrOfColsToKeep
-
-if (exclude==FALSE)
-{
-   # create projection matrix using projection list and sequence matrix, and pad with 0s. The size
-   # of PP is nbrOfColsInX x nbrOfColsToKeep
-   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P)) 
-
- } else {
-   # create new vector P with list of columns to keep using original vector P containing exclude
-   # columns. These are all small vector operations.
-   C = table(P, seq(1, nrow(P), 1))
-   E = rowSums(C);
-      
-   # Row pad w/ 0s
-   EE = matrix (0, rows=ncol(X), cols=1)
-   EE[1:nrow(E),1] = E
-
-   # Convert exclude column list to include column list, and create column indices
-   EE = ppred(EE, 0, "==")
-   EE = EE * seq(1, ncol(X), 1)
-   P = removeEmpty(target=EE, margin="rows")
-
-   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P))
-
-}
-
-# Perform projection using permutation matrix
-PX = X %*% PP
-
-# Write output
-write (PX, $o, format=ofmt)
-
-
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to project columns from input matrix.
+#
+# Parameters:
+#    X       : (input)  filename of data matrix
+#    P       : (input)  filename of 1-column projection matrix containing columnIDs
+#    o       : (output) filename of output matrix with projected columns
+#    exclude : (default FALSE) TRUE means P contains columnIds to be projected
+#                              FALSE means P contains columnsIDS to be excluded
+#    ofmt    : (default binary) format of output matrix
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/project.dml -nvargs X="/tmp/M.mtx" P="/tmp/P.mtx" o="/tmp/PX.mtx" 
+#
+# Assumptions:
+# The order of colIDs in P is preserved. Order of columns in result is same as order of columns in P.
+#      i.e. projecting columns 4 and 2 of X results in a matrix with columns 4 and 2.
+# If P specifies the exclude list, then projected columns are order preserved.
+
+exclude = ifdef ($exclude, FALSE);
+ofmt = ifdef ($ofmt, "binary");
+
+X = read ($X)
+P = read ($P)
+
+# create projection matrix using projection list and sequence matrix, and pad with 0s. The size of
+# PP is nbrOfColsInX x nbrOfColsToKeep
+
+if (exclude==FALSE)
+{
+   # create projection matrix using projection list and sequence matrix, and pad with 0s. The size
+   # of PP is nbrOfColsInX x nbrOfColsToKeep
+   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P)) 
+
+ } else {
+   # create new vector P with list of columns to keep using original vector P containing exclude
+   # columns. These are all small vector operations.
+   C = table(P, seq(1, nrow(P), 1))
+   E = rowSums(C);
+      
+   # Row pad w/ 0s
+   EE = matrix (0, rows=ncol(X), cols=1)
+   EE[1:nrow(E),1] = E
+
+   # Convert exclude column list to include column list, and create column indices
+   EE = ppred(EE, 0, "==")
+   EE = EE * seq(1, ncol(X), 1)
+   P = removeEmpty(target=EE, margin="rows")
+
+   PP = table(P, seq(1, nrow(P), 1), ncol(X), nrow(P))
+
+}
+
+# Perform projection using permutation matrix
+PX = X %*% PP
+
+# Write output
+write (PX, $o, format=ofmt)
+
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/rowIndexMax.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/rowIndexMax.dml b/scripts/utils/rowIndexMax.dml
index 80af2e1..1e5cbc7 100644
--- a/scripts/utils/rowIndexMax.dml
+++ b/scripts/utils/rowIndexMax.dml
@@ -1,38 +1,38 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to return for each row the column nbr with the largest value. If all the values in
-# a row are the same, then the largest column nbr is returned.
-#
-# Parameters:
-#    I       : (input)  filename of input
-#    O       : (output) filename of output
-#    ofmt    : default "csv"; format of O: "csv", "binary"
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/rowIndexMax.dml -nvargs I="/tmp/X.mtx" O="/tmp/X2.mtx"
-#
-
-ofmt = ifdef($ofmt, "csv")
-
-M = read($I)
-C = rowIndexMax(M)
-write(C, $O, format=ofmt)
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to return for each row the column nbr with the largest value. If all the values in
+# a row are the same, then the largest column nbr is returned.
+#
+# Parameters:
+#    I       : (input)  filename of input
+#    O       : (output) filename of output
+#    ofmt    : default "csv"; format of O: "csv", "binary"
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/rowIndexMax.dml -nvargs I="/tmp/X.mtx" O="/tmp/X2.mtx"
+#
+
+ofmt = ifdef($ofmt, "csv")
+
+M = read($I)
+C = rowIndexMax(M)
+write(C, $O, format=ofmt)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/splitXY.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/splitXY.dml b/scripts/utils/splitXY.dml
index 7d5fc24..82027a4 100644
--- a/scripts/utils/splitXY.dml
+++ b/scripts/utils/splitXY.dml
@@ -1,62 +1,62 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to split X into new X and Y.
-#
-# Parameters:
-#    X       : (input)  filename of data matrix
-#    y       : (default ncol(X))  colIndex
-#    OX      : (output) filename of output matrix with all columns except y
-#    OY      : (output) filename of output matrix with y column
-#    ofmt    : (default binary) format of OX and OY output matrix
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/splitXY.dml -nvargs X="/tmp/X.mtx" y=50 OX="/tmp/OX.mtx  OY="/tmp/OY.mtx  
-#
-
-ofmt = ifdef($ofmt, "binary")
-y = ifdef($y, ncol($X))
-
-X = read ($X)
-
-if (y == 1)
-{
-   OX = X[,y+1:ncol(X)]
-   OY = X[,y]
-} 
-else if (y == ncol(X))
-{
-   OX = X[,1:y-1]
-   OY = X[,y]
-} 
-else 
-{
-   OX1 = X[,1:y-1]
-   OX2 = X[,y+1:ncol(X)]
-   OX = append (OX1, OX2)
-   OY = X[,y]
-}
-
-# Write output
-write (OX, $OX, format=ofmt)
-write (OY, $OY, format=ofmt)
-
-
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to split X into new X and Y.
+#
+# Parameters:
+#    X       : (input)  filename of data matrix
+#    y       : (default ncol(X))  colIndex
+#    OX      : (output) filename of output matrix with all columns except y
+#    OY      : (output) filename of output matrix with y column
+#    ofmt    : (default binary) format of OX and OY output matrix
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/splitXY.dml -nvargs X="/tmp/X.mtx" y=50 OX="/tmp/OX.mtx  OY="/tmp/OY.mtx  
+#
+
+ofmt = ifdef($ofmt, "binary")
+y = ifdef($y, ncol($X))
+
+X = read ($X)
+
+if (y == 1)
+{
+   OX = X[,y+1:ncol(X)]
+   OY = X[,y]
+} 
+else if (y == ncol(X))
+{
+   OX = X[,1:y-1]
+   OY = X[,y]
+} 
+else 
+{
+   OX1 = X[,1:y-1]
+   OX2 = X[,y+1:ncol(X)]
+   OX = append (OX1, OX2)
+   OY = X[,y]
+}
+
+# Write output
+write (OX, $OX, format=ofmt)
+write (OY, $OY, format=ofmt)
+
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/scripts/utils/write.dml
----------------------------------------------------------------------
diff --git a/scripts/utils/write.dml b/scripts/utils/write.dml
index 7861a3a..f1c81e4 100644
--- a/scripts/utils/write.dml
+++ b/scripts/utils/write.dml
@@ -1,39 +1,39 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-# Utility script to change format of X.
-#
-# Parameters:
-#    I       : (input)  filename of input
-#    O       : (output) filename of output
-#    ofmt    : format of O: "csv", "binary"
-#    sep     : default ","; CSV separator in output
-#    header  : default "FALSE"; CSV header: TRUE | FALSE
-#
-# Example:
-#   hadoop jar SystemML.jar -f algorithms/utils/write.dml -nvargs I="/tmp/X.mtx" O="/tmp/X2.mtx" ofmt="binary" sep="|" header=TRUE
-#
-
-sep = ifdef($sep, ",")
-header = ifdef($header, FALSE )
-
-M = read($I)
-write(M, $O, format=$ofmt, sep=sep, header=header)
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Utility script to change format of X.
+#
+# Parameters:
+#    I       : (input)  filename of input
+#    O       : (output) filename of output
+#    ofmt    : format of O: "csv", "binary"
+#    sep     : default ","; CSV separator in output
+#    header  : default "FALSE"; CSV header: TRUE | FALSE
+#
+# Example:
+#   hadoop jar SystemML.jar -f algorithms/utils/write.dml -nvargs I="/tmp/X.mtx" O="/tmp/X2.mtx" ofmt="binary" sep="|" header=TRUE
+#
+
+sep = ifdef($sep, ",")
+header = ifdef($header, FALSE )
+
+M = read($I)
+write(M, $O, format=$ofmt, sep=sep, header=header)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/lops/BinaryScalar.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/BinaryScalar.java b/src/main/java/org/apache/sysml/lops/BinaryScalar.java
index 0f423e3..f61c145 100644
--- a/src/main/java/org/apache/sysml/lops/BinaryScalar.java
+++ b/src/main/java/org/apache/sysml/lops/BinaryScalar.java
@@ -1,197 +1,197 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.lops;
-
-
- 
-import org.apache.sysml.lops.LopProperties.ExecLocation;
-import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.lops.compile.JobType;
-import org.apache.sysml.parser.Expression.*;
-
-/**
- * Lop to perform binary scalar operations. Both inputs must be scalars.
- * Example i = j + k, i = i + 1. 
- */
-
-public class BinaryScalar extends Lop 
-{	
-	
-	public enum OperationTypes {
-		ADD, SUBTRACT, SUBTRACTRIGHT, MULTIPLY, DIVIDE, MODULUS, INTDIV,
-		LESS_THAN, LESS_THAN_OR_EQUALS, GREATER_THAN, GREATER_THAN_OR_EQUALS, EQUALS, NOT_EQUALS,
-		AND, OR, 
-		LOG,POW,MAX,MIN,PRINT,
-		IQSIZE,
-		Over,
-	}
-	
-	OperationTypes operation;
-
-	/**
-	 * This overloaded constructor is used for setting exec type in case of spark backend
-	 */
-	public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType dt, ValueType vt, ExecType et) 
-	{
-		super(Lop.Type.BinaryCP, dt, vt);		
-		operation = op;		
-		this.addInput(input1);
-		this.addInput(input2);
-		input1.addOutput(this);
-		input2.addOutput(this);
-
-		boolean breaksAlignment = false; // this field does not carry any meaning for this lop
-		boolean aligner = false;
-		boolean definesMRJob = false;
-		lps.addCompatibility(JobType.INVALID);
-		this.lps.setProperties(inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
-	}
-	
-	/**
-	 * Constructor to perform a scalar operation
-	 * @param input
-	 * @param op
-	 */
-
-	public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType dt, ValueType vt) 
-	{
-		super(Lop.Type.BinaryCP, dt, vt);		
-		operation = op;		
-		this.addInput(input1);
-		this.addInput(input2);
-		input1.addOutput(this);
-		input2.addOutput(this);
-
-		boolean breaksAlignment = false; // this field does not carry any meaning for this lop
-		boolean aligner = false;
-		boolean definesMRJob = false;
-		lps.addCompatibility(JobType.INVALID);
-		this.lps.setProperties(inputs, ExecType.CP, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
-	}
-
-	@Override
-	public String toString() {
-		return "Operation: " + operation;
-	}
-	
-	public OperationTypes getOperationType(){
-		return operation;
-	}
-
-	@Override
-	public String getInstructions(String input1, String input2, String output) throws LopsException
-	{
-		String opString = getOpcode( operation );
-		
-		
-		
-		StringBuilder sb = new StringBuilder();
-		
-		sb.append(getExecType());
-		sb.append(Lop.OPERAND_DELIMITOR);
-		
-		sb.append( opString );
-		sb.append( OPERAND_DELIMITOR );
-		
-		sb.append( getInputs().get(0).prepScalarInputOperand(getExecType()) );
-		sb.append( OPERAND_DELIMITOR );
-		
-		sb.append( getInputs().get(1).prepScalarInputOperand(getExecType()));
-		sb.append( OPERAND_DELIMITOR );
-		
-		sb.append( prepOutputOperand(output));
-
-		return sb.toString();
-	}
-	
-	@Override
-	public Lop.SimpleInstType getSimpleInstructionType()
-	{
-		switch (operation){
- 
-		default:
-			return SimpleInstType.Scalar;
-		}
-	}
-	
-	/**
-	 * 
-	 * @param op
-	 * @return
-	 */
-	public static String getOpcode( OperationTypes op )
-	{
-		switch ( op ) 
-		{
-			/* Arithmetic */
-			case ADD:
-				return "+";
-			case SUBTRACT:
-				return "-";
-			case MULTIPLY:
-				return "*";
-			case DIVIDE:
-				return "/";
-			case MODULUS:
-				return "%%";	
-			case INTDIV:
-				return "%/%";	
-			case POW:	
-				return "^";
-				
-			/* Relational */
-			case LESS_THAN:
-				return "<";
-			case LESS_THAN_OR_EQUALS:
-				return "<=";
-			case GREATER_THAN:
-				return ">";
-			case GREATER_THAN_OR_EQUALS:
-				return ">=";
-			case EQUALS:
-				return "==";
-			case NOT_EQUALS:
-				return "!=";
-			
-			/* Boolean */
-			case AND:
-				return "&&";
-			case OR:
-				return "||";
-			
-			/* Builtin Functions */
-			case LOG:
-				return "log";
-			case MIN:
-				return "min"; 
-			case MAX:
-				return "max"; 
-			
-			case PRINT:
-				return "print";
-				
-			case IQSIZE:
-				return "iqsize"; 
-				
-			default:
-				throw new UnsupportedOperationException("Instruction is not defined for BinaryScalar operator: " + op);
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.lops;
+
+
+ 
+import org.apache.sysml.lops.LopProperties.ExecLocation;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.lops.compile.JobType;
+import org.apache.sysml.parser.Expression.*;
+
+/**
+ * Lop to perform binary scalar operations. Both inputs must be scalars.
+ * Example i = j + k, i = i + 1. 
+ */
+
+public class BinaryScalar extends Lop 
+{	
+	
+	public enum OperationTypes {
+		ADD, SUBTRACT, SUBTRACTRIGHT, MULTIPLY, DIVIDE, MODULUS, INTDIV,
+		LESS_THAN, LESS_THAN_OR_EQUALS, GREATER_THAN, GREATER_THAN_OR_EQUALS, EQUALS, NOT_EQUALS,
+		AND, OR, 
+		LOG,POW,MAX,MIN,PRINT,
+		IQSIZE,
+		Over,
+	}
+	
+	OperationTypes operation;
+
+	/**
+	 * This overloaded constructor is used for setting exec type in case of spark backend
+	 */
+	public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType dt, ValueType vt, ExecType et) 
+	{
+		super(Lop.Type.BinaryCP, dt, vt);		
+		operation = op;		
+		this.addInput(input1);
+		this.addInput(input2);
+		input1.addOutput(this);
+		input2.addOutput(this);
+
+		boolean breaksAlignment = false; // this field does not carry any meaning for this lop
+		boolean aligner = false;
+		boolean definesMRJob = false;
+		lps.addCompatibility(JobType.INVALID);
+		this.lps.setProperties(inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+	}
+	
+	/**
+	 * Constructor to perform a scalar operation
+	 * @param input
+	 * @param op
+	 */
+
+	public BinaryScalar(Lop input1, Lop input2, OperationTypes op, DataType dt, ValueType vt) 
+	{
+		super(Lop.Type.BinaryCP, dt, vt);		
+		operation = op;		
+		this.addInput(input1);
+		this.addInput(input2);
+		input1.addOutput(this);
+		input2.addOutput(this);
+
+		boolean breaksAlignment = false; // this field does not carry any meaning for this lop
+		boolean aligner = false;
+		boolean definesMRJob = false;
+		lps.addCompatibility(JobType.INVALID);
+		this.lps.setProperties(inputs, ExecType.CP, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+	}
+
+	@Override
+	public String toString() {
+		return "Operation: " + operation;
+	}
+	
+	public OperationTypes getOperationType(){
+		return operation;
+	}
+
+	@Override
+	public String getInstructions(String input1, String input2, String output) throws LopsException
+	{
+		String opString = getOpcode( operation );
+		
+		
+		
+		StringBuilder sb = new StringBuilder();
+		
+		sb.append(getExecType());
+		sb.append(Lop.OPERAND_DELIMITOR);
+		
+		sb.append( opString );
+		sb.append( OPERAND_DELIMITOR );
+		
+		sb.append( getInputs().get(0).prepScalarInputOperand(getExecType()) );
+		sb.append( OPERAND_DELIMITOR );
+		
+		sb.append( getInputs().get(1).prepScalarInputOperand(getExecType()));
+		sb.append( OPERAND_DELIMITOR );
+		
+		sb.append( prepOutputOperand(output));
+
+		return sb.toString();
+	}
+	
+	@Override
+	public Lop.SimpleInstType getSimpleInstructionType()
+	{
+		switch (operation){
+ 
+		default:
+			return SimpleInstType.Scalar;
+		}
+	}
+	
+	/**
+	 * 
+	 * @param op
+	 * @return
+	 */
+	public static String getOpcode( OperationTypes op )
+	{
+		switch ( op ) 
+		{
+			/* Arithmetic */
+			case ADD:
+				return "+";
+			case SUBTRACT:
+				return "-";
+			case MULTIPLY:
+				return "*";
+			case DIVIDE:
+				return "/";
+			case MODULUS:
+				return "%%";	
+			case INTDIV:
+				return "%/%";	
+			case POW:	
+				return "^";
+				
+			/* Relational */
+			case LESS_THAN:
+				return "<";
+			case LESS_THAN_OR_EQUALS:
+				return "<=";
+			case GREATER_THAN:
+				return ">";
+			case GREATER_THAN_OR_EQUALS:
+				return ">=";
+			case EQUALS:
+				return "==";
+			case NOT_EQUALS:
+				return "!=";
+			
+			/* Boolean */
+			case AND:
+				return "&&";
+			case OR:
+				return "||";
+			
+			/* Builtin Functions */
+			case LOG:
+				return "log";
+			case MIN:
+				return "min"; 
+			case MAX:
+				return "max"; 
+			
+			case PRINT:
+				return "print";
+				
+			case IQSIZE:
+				return "iqsize"; 
+				
+			default:
+				throw new UnsupportedOperationException("Instruction is not defined for BinaryScalar operator: " + op);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/parser/dml/Dml.g4
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/dml/Dml.g4 b/src/main/java/org/apache/sysml/parser/dml/Dml.g4
index 9d07dc9..bada11e 100644
--- a/src/main/java/org/apache/sysml/parser/dml/Dml.g4
+++ b/src/main/java/org/apache/sysml/parser/dml/Dml.g4
@@ -1,201 +1,201 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-grammar Dml;
-
-@header
-{
-	// Commenting the package name and explicitly passing it in build.xml to maintain compatibility with maven plugin
-    // package org.apache.sysml.parser.dml;
-}
-
-// DML Program is a list of expression
-// For now, we only allow global function definitions (not nested or inside a while block)
-dmlprogram: (blocks+=statement | functionBlocks+=functionStatement)* EOF;
-
-statement returns [ StatementInfo info ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $info = new StatementInfo();
-} :
-    // ------------------------------------------
-    // ImportStatement
-    'source' '(' filePath = STRING ')'  'as' namespace=ID ';'*       # ImportStatement
-    | 'setwd'  '(' pathValue = STRING ')' ';'*                          # PathStatement
-    // ------------------------------------------
-    // Treat function call as AssignmentStatement or MultiAssignmentStatement
-    // For backward compatibility and also since the behavior of foo() * A + foo() ... where foo returns A
-    // Convert FunctionCallIdentifier(paramExprs, ..) -> source
-    | // TODO: Throw an informative error if user doesnot provide the optional assignment
-    ( targetList+=dataIdentifier ('='|'<-') )? name=ID '(' (paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* )? ')' ';'*  # FunctionCallAssignmentStatement
-    | '[' targetList+=dataIdentifier (',' targetList+=dataIdentifier)* ']' ('='|'<-') name=ID '(' (paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* )? ')' ';'*  # FunctionCallMultiAssignmentStatement
-    // {notifyErrorListeners("Too many parentheses");}
-    // ------------------------------------------
-    // AssignmentStatement
-    | targetList+=dataIdentifier op=('<-'|'=') 'ifdef' '(' commandLineParam=dataIdentifier ','  source=expression ')' ';'*   # IfdefAssignmentStatement
-    | targetList+=dataIdentifier op=('<-'|'=') source=expression ';'*   # AssignmentStatement
-    // ------------------------------------------
-    // We don't support block statement
-    // | '{' body+=expression ';'* ( body+=expression ';'* )*  '}' # BlockStatement
-    // ------------------------------------------
-    // IfStatement
-    | 'if' '(' predicate=expression ')' (ifBody+=statement ';'* | '{' (ifBody+=statement ';'*)*  '}')  ('else' (elseBody+=statement ';'* | '{' (elseBody+=statement ';'*)*  '}'))?  # IfStatement
-    // ------------------------------------------
-    // ForStatement & ParForStatement
-    | 'for' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' (body+=statement ';'* )*  '}')  # ForStatement
-    // Convert strictParameterizedExpression to HashMap<String, String> for parForParams
-    | 'parfor' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' (body+=statement ';'*)*  '}')  # ParForStatement
-    | 'while' '(' predicate=expression ')' (body+=statement ';'* | '{' (body+=statement ';'*)* '}')  # WhileStatement
-    // ------------------------------------------
-;
-
-iterablePredicate returns [ ExpressionInfo info ]
-  @init {
-         // This actions occurs regardless of how many alternatives in this rule
-         $info = new ExpressionInfo();
-  } :
-    from=expression ':' to=expression #IterablePredicateColonExpression
-    | ID '(' from=expression ',' to=expression ',' increment=expression ')' #IterablePredicateSeqExpression
-    ;
-
-functionStatement returns [ StatementInfo info ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $info = new StatementInfo();
-} :
-    // ------------------------------------------
-    // FunctionStatement & ExternalFunctionStatement
-    // small change: only allow typed arguments here ... instead of data identifier
-    name=ID ('<-'|'=') 'function' '(' ( inputParams+=typedArgNoAssign (',' inputParams+=typedArgNoAssign)* )? ')'  ( 'return' '(' ( outputParams+=typedArgNoAssign (',' outputParams+=typedArgNoAssign)* )? ')' )? '{' (body+=statement ';'*)* '}' # InternalFunctionDefExpression
-    | name=ID ('<-'|'=') 'externalFunction' '(' ( inputParams+=typedArgNoAssign (',' inputParams+=typedArgNoAssign)* )? ')'  ( 'return' '(' ( outputParams+=typedArgNoAssign (',' outputParams+=typedArgNoAssign)* )? ')' )?   'implemented' 'in' '(' ( otherParams+=strictParameterizedKeyValueString (',' otherParams+=strictParameterizedKeyValueString)* )? ')' ';'*    # ExternalFunctionDefExpression
-    // ------------------------------------------
-;
-
-
-// Other data identifiers are typedArgNoAssign, parameterizedExpression and strictParameterizedExpression
-dataIdentifier returns [ ExpressionInfo dataInfo ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $dataInfo = new ExpressionInfo();
-       // $dataInfo.expr = new org.apache.sysml.parser.DataIdentifier();
-} :
-    // ------------------------------------------
-    // IndexedIdentifier
-    name=ID '[' (rowLower=expression (':' rowUpper=expression)?)? ',' (colLower=expression (':' colUpper=expression)?)? ']' # IndexedExpression
-    // ------------------------------------------
-    | ID                                            # SimpleDataIdentifierExpression
-    | COMMANDLINE_NAMED_ID                          # CommandlineParamExpression
-    | COMMANDLINE_POSITION_ID                       # CommandlinePositionExpression
-;
-expression returns [ ExpressionInfo info ]
-@init {
-       // This actions occurs regardless of how many alternatives in this rule
-       $info = new ExpressionInfo();
-       // $info.expr = new org.apache.sysml.parser.BinaryExpression(org.apache.sysml.parser.Expression.BinaryOp.INVALID);
-} :
-    // ------------------------------------------
-    // BinaryExpression
-    // power
-    <assoc=right> left=expression op='^' right=expression  # PowerExpression
-    // unary plus and minus
-    | op=('-'|'+') left=expression                        # UnaryExpression
-    // sequence - since we are only using this into for
-    //| left=expression op=':' right=expression             # SequenceExpression
-    // matrix multiply
-    | left=expression op='%*%' right=expression           # MatrixMulExpression
-    // modulus and integer division
-    | left=expression op=('%/%' | '%%' ) right=expression # ModIntDivExpression
-    // arithmetic multiply and divide
-    | left=expression op=('*'|'/') right=expression       # MultDivExpression
-    // arithmetic addition and subtraction
-    | left=expression op=('+'|'-') right=expression       # AddSubExpression
-    // ------------------------------------------
-    // RelationalExpression
-    | left=expression op=('>'|'>='|'<'|'<='|'=='|'!=') right=expression # RelationalExpression
-    // ------------------------------------------
-    // BooleanExpression
-    // boolean not
-    | op='!' left=expression # BooleanNotExpression
-    // boolean and
-    | left=expression op=('&'|'&&') right=expression # BooleanAndExpression
-    // boolean or
-    | left=expression op=('|'|'||') right=expression # BooleanOrExpression
-
-    // ---------------------------------
-    // only applicable for builtin function expressions
-    | name=ID '(' (paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* )? ')' ';'*  # BuiltinFunctionExpression
-
-    // 4. Atomic
-    | '(' left=expression ')'                       # AtomicExpression
-
-    // Should you allow indexed expression here ?
-    // | '[' targetList+=expression (',' targetList+=expression)* ']'  # MultiIdExpression
-
-    // | BOOLEAN                                       # ConstBooleanIdExpression
-    | 'TRUE'                                        # ConstTrueExpression
-    | 'FALSE'                                       # ConstFalseExpression
-    | INT                                           # ConstIntIdExpression
-    | DOUBLE                                        # ConstDoubleIdExpression
-    | STRING                                        # ConstStringIdExpression
-    | dataIdentifier                                # DataIdExpression
-    // Special
-    // | 'NULL' | 'NA' | 'Inf' | 'NaN'
-;
-
-typedArgNoAssign : paramType=ml_type paramName=ID;
-parameterizedExpression : (paramName=ID '=')? paramVal=expression;
-strictParameterizedExpression : paramName=ID '=' paramVal=expression ;
-strictParameterizedKeyValueString : paramName=ID '=' paramVal=STRING ;
-ID : (ALPHABET (ALPHABET|DIGIT|'_')*  '::')? ALPHABET (ALPHABET|DIGIT|'_')*
-    // Special ID cases:
-   // | 'matrix' // --> This is a special case which causes lot of headache
-   | 'as.scalar' | 'as.matrix' | 'as.double' | 'as.integer' | 'as.logical' | 'index.return' | 'lower.tail'
-;
-// Unfortunately, we have datatype name clashing with builtin function name: matrix :(
-// Therefore, ugly work around for checking datatype
-ml_type :  valueType | dataType '[' valueType ']';
-// Note to reduce number of keywords, these are case-sensitive,
-// To allow case-insenstive,  'int' becomes: ('i' | 'I') ('n' | 'N') ('t' | 'T')
-valueType: 'int' | 'integer' | 'string' | 'boolean' | 'double'
-            | 'Int' | 'Integer' | 'String' | 'Boolean' | 'Double';
-dataType:
-        // 'scalar' # ScalarDataTypeDummyCheck
-        // |
-        ID # MatrixDataTypeCheck //{ if($ID.text.compareTo("matrix") != 0) { notifyErrorListeners("incorrect datatype"); } }
-        //|  'matrix' //---> See ID, this causes lot of headache
-        ;
-INT : DIGIT+  [Ll]?;
-// BOOLEAN : 'TRUE' | 'FALSE';
-DOUBLE: DIGIT+ '.' DIGIT* EXP? [Ll]?
-| DIGIT+ EXP? [Ll]?
-| '.' DIGIT+ EXP? [Ll]?
-;
-DIGIT: '0'..'9';
-ALPHABET : [a-zA-Z] ;
-fragment EXP : ('E' | 'e') ('+' | '-')? INT ;
-COMMANDLINE_NAMED_ID: '$' ALPHABET (ALPHABET|DIGIT|'_')*;
-COMMANDLINE_POSITION_ID: '$' DIGIT+;
-
-// supports single and double quoted string with escape characters
-STRING: '"' ( ESC | ~[\\"] )*? '"' | '\'' ( ESC | ~[\\'] )*? '\'';
-fragment ESC : '\\' [abtnfrv"'\\] ;
-// Comments, whitespaces and new line
-LINE_COMMENT : '#' .*? '\r'? '\n' -> skip ;
-MULTILINE_BLOCK_COMMENT : '/*' .*? '*/' -> skip ;
-WHITESPACE : (' ' | '\t' | '\r' | '\n')+ -> skip ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+grammar Dml;
+
+@header
+{
+	// Commenting the package name and explicitly passing it in build.xml to maintain compatibility with maven plugin
+    // package org.apache.sysml.parser.dml;
+}
+
+// DML Program is a list of expression
+// For now, we only allow global function definitions (not nested or inside a while block)
+dmlprogram: (blocks+=statement | functionBlocks+=functionStatement)* EOF;
+
+statement returns [ StatementInfo info ]
+@init {
+       // This actions occurs regardless of how many alternatives in this rule
+       $info = new StatementInfo();
+} :
+    // ------------------------------------------
+    // ImportStatement
+    'source' '(' filePath = STRING ')'  'as' namespace=ID ';'*       # ImportStatement
+    | 'setwd'  '(' pathValue = STRING ')' ';'*                          # PathStatement
+    // ------------------------------------------
+    // Treat function call as AssignmentStatement or MultiAssignmentStatement
+    // For backward compatibility and also since the behavior of foo() * A + foo() ... where foo returns A
+    // Convert FunctionCallIdentifier(paramExprs, ..) -> source
+    | // TODO: Throw an informative error if user doesnot provide the optional assignment
+    ( targetList+=dataIdentifier ('='|'<-') )? name=ID '(' (paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* )? ')' ';'*  # FunctionCallAssignmentStatement
+    | '[' targetList+=dataIdentifier (',' targetList+=dataIdentifier)* ']' ('='|'<-') name=ID '(' (paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* )? ')' ';'*  # FunctionCallMultiAssignmentStatement
+    // {notifyErrorListeners("Too many parentheses");}
+    // ------------------------------------------
+    // AssignmentStatement
+    | targetList+=dataIdentifier op=('<-'|'=') 'ifdef' '(' commandLineParam=dataIdentifier ','  source=expression ')' ';'*   # IfdefAssignmentStatement
+    | targetList+=dataIdentifier op=('<-'|'=') source=expression ';'*   # AssignmentStatement
+    // ------------------------------------------
+    // We don't support block statement
+    // | '{' body+=expression ';'* ( body+=expression ';'* )*  '}' # BlockStatement
+    // ------------------------------------------
+    // IfStatement
+    | 'if' '(' predicate=expression ')' (ifBody+=statement ';'* | '{' (ifBody+=statement ';'*)*  '}')  ('else' (elseBody+=statement ';'* | '{' (elseBody+=statement ';'*)*  '}'))?  # IfStatement
+    // ------------------------------------------
+    // ForStatement & ParForStatement
+    | 'for' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' (body+=statement ';'* )*  '}')  # ForStatement
+    // Convert strictParameterizedExpression to HashMap<String, String> for parForParams
+    | 'parfor' '(' iterVar=ID 'in' iterPred=iterablePredicate (',' parForParams+=strictParameterizedExpression)* ')' (body+=statement ';'* | '{' (body+=statement ';'*)*  '}')  # ParForStatement
+    | 'while' '(' predicate=expression ')' (body+=statement ';'* | '{' (body+=statement ';'*)* '}')  # WhileStatement
+    // ------------------------------------------
+;
+
+iterablePredicate returns [ ExpressionInfo info ]
+  @init {
+         // This actions occurs regardless of how many alternatives in this rule
+         $info = new ExpressionInfo();
+  } :
+    from=expression ':' to=expression #IterablePredicateColonExpression
+    | ID '(' from=expression ',' to=expression ',' increment=expression ')' #IterablePredicateSeqExpression
+    ;
+
+functionStatement returns [ StatementInfo info ]
+@init {
+       // This actions occurs regardless of how many alternatives in this rule
+       $info = new StatementInfo();
+} :
+    // ------------------------------------------
+    // FunctionStatement & ExternalFunctionStatement
+    // small change: only allow typed arguments here ... instead of data identifier
+    name=ID ('<-'|'=') 'function' '(' ( inputParams+=typedArgNoAssign (',' inputParams+=typedArgNoAssign)* )? ')'  ( 'return' '(' ( outputParams+=typedArgNoAssign (',' outputParams+=typedArgNoAssign)* )? ')' )? '{' (body+=statement ';'*)* '}' # InternalFunctionDefExpression
+    | name=ID ('<-'|'=') 'externalFunction' '(' ( inputParams+=typedArgNoAssign (',' inputParams+=typedArgNoAssign)* )? ')'  ( 'return' '(' ( outputParams+=typedArgNoAssign (',' outputParams+=typedArgNoAssign)* )? ')' )?   'implemented' 'in' '(' ( otherParams+=strictParameterizedKeyValueString (',' otherParams+=strictParameterizedKeyValueString)* )? ')' ';'*    # ExternalFunctionDefExpression
+    // ------------------------------------------
+;
+
+
+// Other data identifiers are typedArgNoAssign, parameterizedExpression and strictParameterizedExpression
+dataIdentifier returns [ ExpressionInfo dataInfo ]
+@init {
+       // This actions occurs regardless of how many alternatives in this rule
+       $dataInfo = new ExpressionInfo();
+       // $dataInfo.expr = new org.apache.sysml.parser.DataIdentifier();
+} :
+    // ------------------------------------------
+    // IndexedIdentifier
+    name=ID '[' (rowLower=expression (':' rowUpper=expression)?)? ',' (colLower=expression (':' colUpper=expression)?)? ']' # IndexedExpression
+    // ------------------------------------------
+    | ID                                            # SimpleDataIdentifierExpression
+    | COMMANDLINE_NAMED_ID                          # CommandlineParamExpression
+    | COMMANDLINE_POSITION_ID                       # CommandlinePositionExpression
+;
+expression returns [ ExpressionInfo info ]
+@init {
+       // This actions occurs regardless of how many alternatives in this rule
+       $info = new ExpressionInfo();
+       // $info.expr = new org.apache.sysml.parser.BinaryExpression(org.apache.sysml.parser.Expression.BinaryOp.INVALID);
+} :
+    // ------------------------------------------
+    // BinaryExpression
+    // power
+    <assoc=right> left=expression op='^' right=expression  # PowerExpression
+    // unary plus and minus
+    | op=('-'|'+') left=expression                        # UnaryExpression
+    // sequence - since we are only using this into for
+    //| left=expression op=':' right=expression             # SequenceExpression
+    // matrix multiply
+    | left=expression op='%*%' right=expression           # MatrixMulExpression
+    // modulus and integer division
+    | left=expression op=('%/%' | '%%' ) right=expression # ModIntDivExpression
+    // arithmetic multiply and divide
+    | left=expression op=('*'|'/') right=expression       # MultDivExpression
+    // arithmetic addition and subtraction
+    | left=expression op=('+'|'-') right=expression       # AddSubExpression
+    // ------------------------------------------
+    // RelationalExpression
+    | left=expression op=('>'|'>='|'<'|'<='|'=='|'!=') right=expression # RelationalExpression
+    // ------------------------------------------
+    // BooleanExpression
+    // boolean not
+    | op='!' left=expression # BooleanNotExpression
+    // boolean and
+    | left=expression op=('&'|'&&') right=expression # BooleanAndExpression
+    // boolean or
+    | left=expression op=('|'|'||') right=expression # BooleanOrExpression
+
+    // ---------------------------------
+    // only applicable for builtin function expressions
+    | name=ID '(' (paramExprs+=parameterizedExpression (',' paramExprs+=parameterizedExpression)* )? ')' ';'*  # BuiltinFunctionExpression
+
+    // 4. Atomic
+    | '(' left=expression ')'                       # AtomicExpression
+
+    // Should you allow indexed expression here ?
+    // | '[' targetList+=expression (',' targetList+=expression)* ']'  # MultiIdExpression
+
+    // | BOOLEAN                                       # ConstBooleanIdExpression
+    | 'TRUE'                                        # ConstTrueExpression
+    | 'FALSE'                                       # ConstFalseExpression
+    | INT                                           # ConstIntIdExpression
+    | DOUBLE                                        # ConstDoubleIdExpression
+    | STRING                                        # ConstStringIdExpression
+    | dataIdentifier                                # DataIdExpression
+    // Special
+    // | 'NULL' | 'NA' | 'Inf' | 'NaN'
+;
+
+typedArgNoAssign : paramType=ml_type paramName=ID;
+parameterizedExpression : (paramName=ID '=')? paramVal=expression;
+strictParameterizedExpression : paramName=ID '=' paramVal=expression ;
+strictParameterizedKeyValueString : paramName=ID '=' paramVal=STRING ;
+ID : (ALPHABET (ALPHABET|DIGIT|'_')*  '::')? ALPHABET (ALPHABET|DIGIT|'_')*
+    // Special ID cases:
+   // | 'matrix' // --> This is a special case which causes lot of headache
+   | 'as.scalar' | 'as.matrix' | 'as.double' | 'as.integer' | 'as.logical' | 'index.return' | 'lower.tail'
+;
+// Unfortunately, we have datatype name clashing with builtin function name: matrix :(
+// Therefore, ugly work around for checking datatype
+ml_type :  valueType | dataType '[' valueType ']';
+// Note to reduce number of keywords, these are case-sensitive,
+// To allow case-insenstive,  'int' becomes: ('i' | 'I') ('n' | 'N') ('t' | 'T')
+valueType: 'int' | 'integer' | 'string' | 'boolean' | 'double'
+            | 'Int' | 'Integer' | 'String' | 'Boolean' | 'Double';
+dataType:
+        // 'scalar' # ScalarDataTypeDummyCheck
+        // |
+        ID # MatrixDataTypeCheck //{ if($ID.text.compareTo("matrix") != 0) { notifyErrorListeners("incorrect datatype"); } }
+        //|  'matrix' //---> See ID, this causes lot of headache
+        ;
+INT : DIGIT+  [Ll]?;
+// BOOLEAN : 'TRUE' | 'FALSE';
+DOUBLE: DIGIT+ '.' DIGIT* EXP? [Ll]?
+| DIGIT+ EXP? [Ll]?
+| '.' DIGIT+ EXP? [Ll]?
+;
+DIGIT: '0'..'9';
+ALPHABET : [a-zA-Z] ;
+fragment EXP : ('E' | 'e') ('+' | '-')? INT ;
+COMMANDLINE_NAMED_ID: '$' ALPHABET (ALPHABET|DIGIT|'_')*;
+COMMANDLINE_POSITION_ID: '$' DIGIT+;
+
+// supports single and double quoted string with escape characters
+STRING: '"' ( ESC | ~[\\"] )*? '"' | '\'' ( ESC | ~[\\'] )*? '\'';
+fragment ESC : '\\' [abtnfrv"'\\] ;
+// Comments, whitespaces and new line
+LINE_COMMENT : '#' .*? '\r'? '\n' -> skip ;
+MULTILINE_BLOCK_COMMENT : '/*' .*? '*/' -> skip ;
+WHITESPACE : (' ' | '\t' | '\r' | '\n')+ -> skip ;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index b2d37f9..77d5282 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -1,292 +1,292 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.utils.Statistics;
-import org.apache.sysml.yarn.DMLAppMasterUtils;
-
-/**
- * MR job class for submitting parfor remote MR jobs, controlling its execution and obtaining results.
- * 
- *
- */
-public class RemoteDPParForMR
-{
-	
-	protected static final Log LOG = LogFactory.getLog(RemoteDPParForMR.class.getName());
-	
-	/**
-	 * 
-	 * @param pfid
-	 * @param program
-	 * @param taskFile
-	 * @param resultFile
-	 * @param enableCPCaching 
-	 * @param mode
-	 * @param numMappers
-	 * @param replication
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input, 
-			                                   PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
-			                                   boolean enableCPCaching, int numReducers, int replication, int max_retry)  //opt params
-		throws DMLRuntimeException
-	{
-		RemoteParForJobReturn ret = null;
-		String jobname = "ParFor-DPEMR";
-		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-		
-		JobConf job;
-		job = new JobConf( RemoteDPParForMR.class );
-		job.setJobName(jobname+pfid);
-		
-		//maintain dml script counters
-		Statistics.incrementNoOfCompiledMRJobs();
-	
-		try
-		{
-			/////
-			//configure the MR job
-		
-			//set arbitrary CP program blocks that will perform in the reducers
-			MRJobConfiguration.setProgramBlocks(job, program); 
-
-			//enable/disable caching
-			MRJobConfiguration.setParforCachingConfig(job, enableCPCaching);
-		
-			//setup input matrix
-			Path path = new Path( input.getFileName() );
-			long rlen = input.getNumRows();
-			long clen = input.getNumColumns();
-			int brlen = (int) input.getNumRowsPerBlock();
-			int bclen = (int) input.getNumColumnsPerBlock();
-			MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, InputInfo.BinaryBlockInputInfo, oi, dpf, 1, input.getFileName(), itervar, matrixvar, tSparseCol);
-			job.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass);
-			FileInputFormat.setInputPaths(job, path);
-			
-			//set mapper and reducers classes
-			job.setMapperClass(DataPartitionerRemoteMapper.class); 
-			job.setReducerClass(RemoteDPParWorkerReducer.class); 
-			
-		    //set output format
-		    job.setOutputFormat(SequenceFileOutputFormat.class);
-		    
-		    //set output path
-		    MapReduceTool.deleteFileIfExistOnHDFS(resultFile);
-		    FileOutputFormat.setOutputPath(job, new Path(resultFile));
-		    
-			//set the output key, value schema
-		    
-		    //parfor partitioning outputs (intermediates)
-		    job.setMapOutputKeyClass(LongWritable.class);
-		    if( oi == OutputInfo.BinaryBlockOutputInfo )
-		    	job.setMapOutputValueClass(PairWritableBlock.class); 
-		    else if( oi == OutputInfo.BinaryCellOutputInfo )
-		    	job.setMapOutputValueClass(PairWritableCell.class);
-		    else 
-		    	throw new DMLRuntimeException("Unsupported intermrediate output info: "+oi);
-		    //parfor exec output
-		    job.setOutputKeyClass(LongWritable.class);
-			job.setOutputValueClass(Text.class);
-			
-			//////
-			//set optimization parameters
-
-			//set the number of mappers and reducers 
-			job.setNumReduceTasks( numReducers );			
-			
-			//disable automatic tasks timeouts and speculative task exec
-			job.setInt("mapred.task.timeout", 0);			
-			job.setMapSpeculativeExecution(false);
-			
-			//set up preferred custom serialization framework for binary block format
-			if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-				MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-	
-			//set up map/reduce memory configurations (if in AM context)
-			DMLConfig config = ConfigurationManager.getConfig();
-			DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
-			
-			//disable JVM reuse
-			job.setNumTasksToExecutePerJvm( 1 ); //-1 for unlimited 
-			
-			//set the replication factor for the results
-			job.setInt("dfs.replication", replication);
-			
-			//set the max number of retries per map task
-			//note: currently disabled to use cluster config
-			//job.setInt("mapreduce.map.maxattempts", max_retry);
-			
-			//set unique working dir
-			MRJobConfiguration.setUniqueWorkingDir(job);
-			
-			/////
-			// execute the MR job			
-			RunningJob runjob = JobClient.runJob(job);
-			
-			// Process different counters 
-			Statistics.incrementNoOfExecutedMRJobs();
-			Group pgroup = runjob.getCounters().getGroup(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME);
-			int numTasks = (int)pgroup.getCounter( Stat.PARFOR_NUMTASKS.toString() );
-			int numIters = (int)pgroup.getCounter( Stat.PARFOR_NUMITERS.toString() );
-			if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode() ) {
-				Statistics.incrementJITCompileTime( pgroup.getCounter( Stat.PARFOR_JITCOMPILE.toString() ) );
-				Statistics.incrementJVMgcCount( pgroup.getCounter( Stat.PARFOR_JVMGC_COUNT.toString() ) );
-				Statistics.incrementJVMgcTime( pgroup.getCounter( Stat.PARFOR_JVMGC_TIME.toString() ) );
-				Group cgroup = runjob.getCounters().getGroup(CacheableData.CACHING_COUNTER_GROUP_NAME.toString());
-				CacheStatistics.incrementMemHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_MEM.toString() ));
-				CacheStatistics.incrementFSBuffHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString() ));
-				CacheStatistics.incrementFSHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FS.toString() ));
-				CacheStatistics.incrementHDFSHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_HDFS.toString() ));
-				CacheStatistics.incrementFSBuffWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString() ));
-				CacheStatistics.incrementFSWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FS.toString() ));
-				CacheStatistics.incrementHDFSWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_HDFS.toString() ));
-				CacheStatistics.incrementAcquireRTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_ACQR.toString() ));
-				CacheStatistics.incrementAcquireMTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_ACQM.toString() ));
-				CacheStatistics.incrementReleaseTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_RLS.toString() ));
-				CacheStatistics.incrementExportTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_EXP.toString() ));
-			}
-				
-			// read all files of result variables and prepare for return
-			LocalVariableMap[] results = readResultFile(job, resultFile); 
-
-			ret = new RemoteParForJobReturn(runjob.isSuccessful(), 
-					                        numTasks, numIters, 
-					                        results);  	
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-		finally
-		{
-			// remove created files 
-			try
-			{
-				MapReduceTool.deleteFileIfExistOnHDFS(new Path(resultFile), job);
-			}
-			catch(IOException ex)
-			{
-				throw new DMLRuntimeException(ex);
-			}
-		}
-		
-		if( DMLScript.STATISTICS ){
-			long t1 = System.nanoTime();
-			Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, t1-t0);
-		}
-		
-		return ret;
-	}
-	
-
-	/**
-	 * Result file contains hierarchy of workerID-resultvar(incl filename). We deduplicate
-	 * on the workerID. Without JVM reuse each task refers to a unique workerID, so we
-	 * will not find any duplicates. With JVM reuse, however, each slot refers to a workerID, 
-	 * and there are duplicate filenames due to partial aggregation and overwrite of fname 
-	 * (the RemoteParWorkerMapper ensures uniqueness of those files independent of the 
-	 * runtime implementation). 
-	 * 
-	 * @param job 
-	 * @param fname
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("deprecation")
-	public static LocalVariableMap [] readResultFile( JobConf job, String fname )
-		throws DMLRuntimeException, IOException
-	{
-		HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>();
-
-		FileSystem fs = FileSystem.get(job);
-		Path path = new Path(fname);
-		LongWritable key = new LongWritable(); //workerID
-		Text value = new Text();               //serialized var header (incl filename)
-		
-		int countAll = 0;
-		for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
-		{
-			SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(job),lpath,job);
-			try
-			{
-				while( reader.next(key, value) )
-				{
-					//System.out.println("key="+key.get()+", value="+value.toString());
-					if( !tmp.containsKey( key.get() ) )
-		        		tmp.put(key.get(), new LocalVariableMap ());	   
-					Object[] dat = ProgramConverter.parseDataObject( value.toString() );
-		        	tmp.get( key.get() ).put((String)dat[0], (Data)dat[1]);
-		        	countAll++;
-				}
-			}	
-			finally
-			{
-				if( reader != null )
-					reader.close();
-			}
-		}		
-
-		LOG.debug("Num remote worker results (before deduplication): "+countAll);
-		LOG.debug("Num remote worker results: "+tmp.size());
-
-		//create return array
-		return tmp.values().toArray(new LocalVariableMap[0]);	
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.utils.Statistics;
+import org.apache.sysml.yarn.DMLAppMasterUtils;
+
+/**
+ * MR job class for submitting parfor remote MR jobs, controlling its execution and obtaining results.
+ * 
+ *
+ */
+public class RemoteDPParForMR
+{
+	
+	protected static final Log LOG = LogFactory.getLog(RemoteDPParForMR.class.getName());
+	
+	/**
+	 * 
+	 * @param pfid
+	 * @param program
+	 * @param taskFile
+	 * @param resultFile
+	 * @param enableCPCaching 
+	 * @param mode
+	 * @param numMappers
+	 * @param replication
+	 * @return
+	 * @throws DMLRuntimeException
+	 */
+	public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input, 
+			                                   PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
+			                                   boolean enableCPCaching, int numReducers, int replication, int max_retry)  //opt params
+		throws DMLRuntimeException
+	{
+		RemoteParForJobReturn ret = null;
+		String jobname = "ParFor-DPEMR";
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+		
+		JobConf job;
+		job = new JobConf( RemoteDPParForMR.class );
+		job.setJobName(jobname+pfid);
+		
+		//maintain dml script counters
+		Statistics.incrementNoOfCompiledMRJobs();
+	
+		try
+		{
+			/////
+			//configure the MR job
+		
+			//set arbitrary CP program blocks that will perform in the reducers
+			MRJobConfiguration.setProgramBlocks(job, program); 
+
+			//enable/disable caching
+			MRJobConfiguration.setParforCachingConfig(job, enableCPCaching);
+		
+			//setup input matrix
+			Path path = new Path( input.getFileName() );
+			long rlen = input.getNumRows();
+			long clen = input.getNumColumns();
+			int brlen = (int) input.getNumRowsPerBlock();
+			int bclen = (int) input.getNumColumnsPerBlock();
+			MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, InputInfo.BinaryBlockInputInfo, oi, dpf, 1, input.getFileName(), itervar, matrixvar, tSparseCol);
+			job.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass);
+			FileInputFormat.setInputPaths(job, path);
+			
+			//set mapper and reducers classes
+			job.setMapperClass(DataPartitionerRemoteMapper.class); 
+			job.setReducerClass(RemoteDPParWorkerReducer.class); 
+			
+		    //set output format
+		    job.setOutputFormat(SequenceFileOutputFormat.class);
+		    
+		    //set output path
+		    MapReduceTool.deleteFileIfExistOnHDFS(resultFile);
+		    FileOutputFormat.setOutputPath(job, new Path(resultFile));
+		    
+			//set the output key, value schema
+		    
+		    //parfor partitioning outputs (intermediates)
+		    job.setMapOutputKeyClass(LongWritable.class);
+		    if( oi == OutputInfo.BinaryBlockOutputInfo )
+		    	job.setMapOutputValueClass(PairWritableBlock.class); 
+		    else if( oi == OutputInfo.BinaryCellOutputInfo )
+		    	job.setMapOutputValueClass(PairWritableCell.class);
+		    else 
+		    	throw new DMLRuntimeException("Unsupported intermrediate output info: "+oi);
+		    //parfor exec output
+		    job.setOutputKeyClass(LongWritable.class);
+			job.setOutputValueClass(Text.class);
+			
+			//////
+			//set optimization parameters
+
+			//set the number of mappers and reducers 
+			job.setNumReduceTasks( numReducers );			
+			
+			//disable automatic tasks timeouts and speculative task exec
+			job.setInt("mapred.task.timeout", 0);			
+			job.setMapSpeculativeExecution(false);
+			
+			//set up preferred custom serialization framework for binary block format
+			if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
+				MRJobConfiguration.addBinaryBlockSerializationFramework( job );
+	
+			//set up map/reduce memory configurations (if in AM context)
+			DMLConfig config = ConfigurationManager.getConfig();
+			DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
+			
+			//disable JVM reuse
+			job.setNumTasksToExecutePerJvm( 1 ); //-1 for unlimited 
+			
+			//set the replication factor for the results
+			job.setInt("dfs.replication", replication);
+			
+			//set the max number of retries per map task
+			//note: currently disabled to use cluster config
+			//job.setInt("mapreduce.map.maxattempts", max_retry);
+			
+			//set unique working dir
+			MRJobConfiguration.setUniqueWorkingDir(job);
+			
+			/////
+			// execute the MR job			
+			RunningJob runjob = JobClient.runJob(job);
+			
+			// Process different counters 
+			Statistics.incrementNoOfExecutedMRJobs();
+			Group pgroup = runjob.getCounters().getGroup(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME);
+			int numTasks = (int)pgroup.getCounter( Stat.PARFOR_NUMTASKS.toString() );
+			int numIters = (int)pgroup.getCounter( Stat.PARFOR_NUMITERS.toString() );
+			if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode() ) {
+				Statistics.incrementJITCompileTime( pgroup.getCounter( Stat.PARFOR_JITCOMPILE.toString() ) );
+				Statistics.incrementJVMgcCount( pgroup.getCounter( Stat.PARFOR_JVMGC_COUNT.toString() ) );
+				Statistics.incrementJVMgcTime( pgroup.getCounter( Stat.PARFOR_JVMGC_TIME.toString() ) );
+				Group cgroup = runjob.getCounters().getGroup(CacheableData.CACHING_COUNTER_GROUP_NAME.toString());
+				CacheStatistics.incrementMemHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_MEM.toString() ));
+				CacheStatistics.incrementFSBuffHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString() ));
+				CacheStatistics.incrementFSHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_FS.toString() ));
+				CacheStatistics.incrementHDFSHits((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_HITS_HDFS.toString() ));
+				CacheStatistics.incrementFSBuffWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString() ));
+				CacheStatistics.incrementFSWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_FS.toString() ));
+				CacheStatistics.incrementHDFSWrites((int)cgroup.getCounter( CacheStatistics.Stat.CACHE_WRITES_HDFS.toString() ));
+				CacheStatistics.incrementAcquireRTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_ACQR.toString() ));
+				CacheStatistics.incrementAcquireMTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_ACQM.toString() ));
+				CacheStatistics.incrementReleaseTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_RLS.toString() ));
+				CacheStatistics.incrementExportTime(cgroup.getCounter( CacheStatistics.Stat.CACHE_TIME_EXP.toString() ));
+			}
+				
+			// read all files of result variables and prepare for return
+			LocalVariableMap[] results = readResultFile(job, resultFile); 
+
+			ret = new RemoteParForJobReturn(runjob.isSuccessful(), 
+					                        numTasks, numIters, 
+					                        results);  	
+		}
+		catch(Exception ex)
+		{
+			throw new DMLRuntimeException(ex);
+		}
+		finally
+		{
+			// remove created files 
+			try
+			{
+				MapReduceTool.deleteFileIfExistOnHDFS(new Path(resultFile), job);
+			}
+			catch(IOException ex)
+			{
+				throw new DMLRuntimeException(ex);
+			}
+		}
+		
+		if( DMLScript.STATISTICS ){
+			long t1 = System.nanoTime();
+			Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, t1-t0);
+		}
+		
+		return ret;
+	}
+	
+
+	/**
+	 * Result file contains hierarchy of workerID-resultvar(incl filename). We deduplicate
+	 * on the workerID. Without JVM reuse each task refers to a unique workerID, so we
+	 * will not find any duplicates. With JVM reuse, however, each slot refers to a workerID, 
+	 * and there are duplicate filenames due to partial aggregation and overwrite of fname 
+	 * (the RemoteParWorkerMapper ensures uniqueness of those files independent of the 
+	 * runtime implementation). 
+	 * 
+	 * @param job 
+	 * @param fname
+	 * @return
+	 * @throws DMLRuntimeException
+	 */
+	@SuppressWarnings("deprecation")
+	public static LocalVariableMap [] readResultFile( JobConf job, String fname )
+		throws DMLRuntimeException, IOException
+	{
+		HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>();
+
+		FileSystem fs = FileSystem.get(job);
+		Path path = new Path(fname);
+		LongWritable key = new LongWritable(); //workerID
+		Text value = new Text();               //serialized var header (incl filename)
+		
+		int countAll = 0;
+		for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+		{
+			SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(job),lpath,job);
+			try
+			{
+				while( reader.next(key, value) )
+				{
+					//System.out.println("key="+key.get()+", value="+value.toString());
+					if( !tmp.containsKey( key.get() ) )
+		        		tmp.put(key.get(), new LocalVariableMap ());	   
+					Object[] dat = ProgramConverter.parseDataObject( value.toString() );
+		        	tmp.get( key.get() ).put((String)dat[0], (Data)dat[1]);
+		        	countAll++;
+				}
+			}	
+			finally
+			{
+				if( reader != null )
+					reader.close();
+			}
+		}		
+
+		LOG.debug("Num remote worker results (before deduplication): "+countAll);
+		LOG.debug("Num remote worker results: "+tmp.size());
+
+		//create return array
+		return tmp.values().toArray(new LocalVariableMap[0]);	
+	}
+}