You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/07/06 22:15:05 UTC

[1/2] systemml git commit: [MINOR] Fix consistency matrix/frame writers (crc files, part names)

Repository: systemml
Updated Branches:
  refs/heads/master f418c4460 -> 988366de0


[MINOR] Fix consistency matrix/frame writers (crc files, part names)

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f0cb8cc8
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f0cb8cc8
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f0cb8cc8

Branch: refs/heads/master
Commit: f0cb8cc86feae0d0b5825f01cf85b47337336fa7
Parents: f418c44
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Jul 5 23:46:11 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Jul 5 23:46:11 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/io/FrameWriterBinaryBlock.java |  3 ++-
 .../io/FrameWriterBinaryBlockParallel.java       | 14 +++++++++++---
 .../sysml/runtime/io/FrameWriterTextCSV.java     |  3 ++-
 .../runtime/io/FrameWriterTextCSVParallel.java   | 10 +++++++++-
 .../sysml/runtime/io/FrameWriterTextCell.java    |  5 +++--
 .../runtime/io/FrameWriterTextCellParallel.java  | 11 +++++++++--
 .../apache/sysml/runtime/io/IOUtilFunctions.java |  6 +++++-
 .../runtime/io/WriterBinaryBlockParallel.java    | 18 ++++++++----------
 .../runtime/io/WriterMatrixMarketParallel.java   | 18 ++++++++----------
 .../sysml/runtime/io/WriterTextCSVParallel.java  | 19 ++++++++-----------
 .../sysml/runtime/io/WriterTextCellParallel.java | 18 ++++++++----------
 11 files changed, 73 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
index 819b7d0..e208bbe 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
@@ -67,7 +67,8 @@ public class FrameWriterBinaryBlock extends FrameWriter
 		int blen = ConfigurationManager.getBlocksize();
 		
 		//sequential write to single file
-		writeBinaryBlockFrameToSequenceFile(path, job, fs, src, blen, 0, (int)rlen);		
+		writeBinaryBlockFrameToSequenceFile(path, job, fs, src, blen, 0, (int)rlen);
+		IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
index a25fe75..52f1ed0 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -45,7 +46,7 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock
 {	
-
+	@Override
 	protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen )
 		throws IOException, DMLRuntimeException
 	{
@@ -75,7 +76,7 @@ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock
 			ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>();
 			int blklen = (int)Math.ceil((double)rlen / blen / numThreads) * blen;
 			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
+				Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
 				tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, Math.min((i+1)*blklen, (int)rlen), blen));
 			}
 
@@ -86,10 +87,17 @@ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock
 			//check for exceptions 
 			for( Future<Object> task : rt )
 				task.get();
+			
+			// delete crc files if written to local file system
+			if (fs instanceof LocalFileSystem) {
+				for(int i=0; i<numThreads & i*blklen<rlen; i++) 
+					IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
+						new Path(path, IOUtilFunctions.getPartFileName(i)));
+			}
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel write of binary block input.", e);
-		}	
+		}
 	}
 
 	private class WriteFileTask implements Callable<Object> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
index 83f3861..7d15d9b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
@@ -77,7 +77,8 @@ public class FrameWriterTextCSV extends FrameWriter
 		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
 		
 		//sequential write to single text file
-		writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, csvprops);	
+		writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, csvprops);
+		IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
 	}
 
 	protected final void writeCSVFrameToFile( Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, CSVFileFormatProperties props )

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
index 10f0827..fe4fd39 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.DMLConfig;
@@ -77,7 +78,7 @@ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV
 			ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>();
 			int blklen = (int)Math.ceil((double)rlen / numThreads);
 			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
+				Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
 				tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops));
 			}
 
@@ -88,6 +89,13 @@ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV
 			//check for exceptions 
 			for( Future<Object> task : rt )
 				task.get();
+			
+			// delete crc files if written to local file system
+			if (fs instanceof LocalFileSystem) {
+				for(int i=0; i<numThreads & i*blklen<rlen; i++) 
+					IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
+						new Path(path, IOUtilFunctions.getPartFileName(i)));
+			}
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel write of csv output.", e);

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
index 7263e7a..134d961 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
@@ -66,8 +66,9 @@ public class FrameWriterTextCell extends FrameWriter
 		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
 		
 		//sequential write to single text file
-		writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen);	
-	}	
+		writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen);
+		IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
+	}
 	
 	/**
 	 * Internal primitive to write a row range of a frame to a single text file, 

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
index 8eed53c..f42ca41 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.DMLConfig;
@@ -43,7 +44,6 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class FrameWriterTextCellParallel extends FrameWriterTextCell
 {
-
 	@Override
 	protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen )
 		throws IOException
@@ -73,7 +73,7 @@ public class FrameWriterTextCellParallel extends FrameWriterTextCell
 			ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>();
 			int blklen = (int)Math.ceil((double)rlen / numThreads);
 			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
+				Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
 				tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen)));
 			}
 
@@ -84,6 +84,13 @@ public class FrameWriterTextCellParallel extends FrameWriterTextCell
 			//check for exceptions 
 			for( Future<Object> task : rt )
 				task.get();
+			
+			// delete crc files if written to local file system
+			if (fs instanceof LocalFileSystem) {
+				for(int i=0; i<numThreads & i*blklen<rlen; i++) 
+					IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
+						new Path(path, IOUtilFunctions.getPartFileName(i)));
+			}
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel write of text output.", e);

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 040da01..ecbf7e4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -56,7 +56,7 @@ public class IOUtilFunctions
 	private static final Log LOG = LogFactory.getLog(UtilFunctions.class.getName());
 
 	private static final char CSV_QUOTE_CHAR = '"';
-
+	
 	public static FileSystem getFileSystem(String fname) throws IOException {
 		return getFileSystem(new Path(fname),
 			ConfigurationManager.getCachedJobConf());
@@ -88,6 +88,10 @@ public class IOUtilFunctions
 		return scheme.startsWith("s3") || scheme.startsWith("swift");
 	}
 	
+	public static String getPartFileName(int pos) {
+		return String.format("0-m-%05d", pos);
+	}
+	
 	public static void closeSilently( Closeable io ) {
 		try {
 			if( io != null )

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
index 6f33011..b5aec06 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
@@ -73,7 +73,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
 			ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>();
 			int blklen = (int)Math.ceil((double)rlen / brlen / numThreads) * brlen;
 			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
+				Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
 				tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, Math.min((i+1)*blklen, rlen), brlen, bclen));
 			}
 
@@ -84,19 +84,17 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
 			//check for exceptions 
 			for( Future<Object> task : rt )
 				task.get();
+			
+			// delete crc files if written to local file system
+			if (fs instanceof LocalFileSystem) {
+				for(int i=0; i<numThreads & i*blklen<rlen; i++) 
+					IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
+						new Path(path, IOUtilFunctions.getPartFileName(i)));
+			}
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel write of binary block input.", e);
 		}
-
-		// delete crc files if written to local file system
-		if (fs instanceof LocalFileSystem) {
-			int blklen = (int)Math.ceil((double)rlen / numThreads);
-			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
-				IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath);
-			}
-		}
 	}
 
 	private class WriteFileTask implements Callable<Object> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
index 1fed377..afc1bdb 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
@@ -71,7 +71,7 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket
 			ArrayList<WriteMMTask> tasks = new ArrayList<WriteMMTask>();
 			int blklen = (int)Math.ceil((double)rlen / numThreads);
 			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
+				Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
 				tasks.add(new WriteMMTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen)));
 			}
 
@@ -82,19 +82,17 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket
 			//check for exceptions 
 			for( Future<Object> task : rt )
 				task.get();
+			
+			// delete crc files if written to local file system
+			if (fs instanceof LocalFileSystem) {
+				for(int i=0; i<numThreads & i*blklen<rlen; i++) 
+					IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
+						new Path(path, IOUtilFunctions.getPartFileName(i)));
+			}
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel write of text output.", e);
 		}
-
-		// delete crc files if written to local file system
-		if (fs instanceof LocalFileSystem) {
-			int blklen = (int)Math.ceil((double)rlen / numThreads);
-			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
-				IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath);
-			}
-		}
 	}
 
 	private class WriteMMTask implements Callable<Object> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
index 581225f..173f602 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
@@ -74,7 +74,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
 			int rlen = src.getNumRows();
 			int blklen = (int)Math.ceil((double)rlen / numThreads);
 			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
+				Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
 				tasks.add(new WriteCSVTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops));
 			}
 
@@ -85,20 +85,17 @@ public class WriterTextCSVParallel extends WriterTextCSV
 			//check for exceptions 
 			for( Future<Object> task : rt )
 				task.get();
+			
+			// delete crc files if written to local file system
+			if (fs instanceof LocalFileSystem) {
+				for(int i=0; i<numThreads & i*blklen<rlen; i++) 
+					IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
+						new Path(path, IOUtilFunctions.getPartFileName(i)));
+			}
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel write of csv output.", e);
 		}
-
-		// delete crc files if written to local file system
-		if (fs instanceof LocalFileSystem) {
-			int rlen = src.getNumRows();
-			int blklen = (int)Math.ceil((double)rlen / numThreads);
-			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
-				IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath);
-			}
-		}
 	}
 
 	private class WriteCSVTask implements Callable<Object> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
index 1c08459..ec4b042 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
@@ -70,7 +70,7 @@ public class WriterTextCellParallel extends WriterTextCell
 			ArrayList<WriteTextTask> tasks = new ArrayList<WriteTextTask>();
 			int blklen = (int)Math.ceil((double)rlen / numThreads);
 			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
+				Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i));
 				tasks.add(new WriteTextTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen)));
 			}
 
@@ -81,19 +81,17 @@ public class WriterTextCellParallel extends WriterTextCell
 			//check for exceptions 
 			for( Future<Object> task : rt )
 				task.get();
+			
+			// delete crc files if written to local file system
+			if (fs instanceof LocalFileSystem) {
+				for(int i=0; i<numThreads & i*blklen<rlen; i++) 
+					IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
+						new Path(path, IOUtilFunctions.getPartFileName(i)));
+			}
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel write of text output.", e);
 		}
-
-		// delete crc files if written to local file system
-		if (fs instanceof LocalFileSystem) {
-			int blklen = (int)Math.ceil((double)rlen / numThreads);
-			for(int i=0; i<numThreads & i*blklen<rlen; i++) {
-				Path newPath = new Path(path, String.format("0-m-%05d",i));
-				IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath);
-			}
-		}
 	}
 
 	private class WriteTextTask implements Callable<Object> 


[2/2] systemml git commit: [SYSTEMML-1747] New rewrite and performance binary outer operations

Posted by mb...@apache.org.
[SYSTEMML-1747] New rewrite and performance binary outer operations

This patch adds a new rewrite to avoid unnecessary column replication
for subsequent binary matrix-vector operations, which can be directly
expressed via outer: ((a %*% ones) == b) -> outer(a, b, "=="). As a
result of this rewrite, there are fewer intermediates and smaller memory
requirements (2*matrix + vector -> matrix + 2*vector).

Furthermore, this patch also makes a couple of performance improvements
in the related outer operations, specifically (1) shallow instead of
deep dense vector copies, and (2) avoiding the binary search for outer
comparison operations with a sorted rhs vector if the number of columns
is very small.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/988366de
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/988366de
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/988366de

Branch: refs/heads/master
Commit: 988366de0300c43ec5b01e129576278c59fa49be
Parents: f0cb8cc
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Jul 6 01:55:57 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu Jul 6 01:55:57 2017 -0700

----------------------------------------------------------------------
 .../sysml/hops/rewrite/HopRewriteUtils.java     |  19 ++-
 .../RewriteAlgebraicSimplificationDynamic.java  |  29 +++--
 .../runtime/compress/utils/ConverterUtils.java  |   8 +-
 .../sysml/runtime/matrix/data/LibMatrixAgg.java |   2 +-
 .../runtime/matrix/data/LibMatrixBincell.java   |  70 +++++-----
 .../matrix/mapred/DistributedCacheInput.java    |   4 +-
 .../sysml/runtime/util/DataConverter.java       |  17 ++-
 .../sysml/runtime/util/MapReduceTool.java       |   2 +-
 .../apache/sysml/runtime/util/SortUtils.java    |  26 ++--
 .../codegen/CPlanVectorPrimitivesTest.java      |   8 +-
 .../misc/RewriteBinaryMV2OuterTest.java         | 130 +++++++++++++++++++
 .../DataFrameVectorFrameConversionTest.java     |   2 +-
 .../mlcontext/DataFrameVectorScriptTest.java    |   2 +-
 .../functions/misc/RewriteBinaryMV2Outer.R      |  33 +++++
 .../functions/misc/RewriteBinaryMV2Outer.dml    |  35 +++++
 .../functions/misc/ZPackageSuite.java           |   1 +
 16 files changed, 302 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
index 582809e..24698f5 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
@@ -456,6 +456,12 @@ public class HopRewriteUtils
 		return datagen;
 	}
 	
+	public static boolean isDataGenOpWithConstantValue(Hop hop, double value) {
+		return hop instanceof DataGenOp
+			&& ((DataGenOp)hop).getOp()==DataGenMethod.RAND
+			&& ((DataGenOp)hop).hasConstantValue(value);
+	}
+	
 	public static ReorgOp createTranspose(Hop input) {
 		return createReorg(input, ReOrgOp.TRANSPOSE);
 	}
@@ -493,8 +499,11 @@ public class HopRewriteUtils
 		return createBinary(new LiteralOp(0), input, OpOp2.MINUS);
 	}
 	
-	public static BinaryOp createBinary(Hop input1, Hop input2, OpOp2 op)
-	{
+	public static BinaryOp createBinary(Hop input1, Hop input2, OpOp2 op) {
+		return createBinary(input1, input2, op, false);
+	}
+	
+	public static BinaryOp createBinary(Hop input1, Hop input2, OpOp2 op, boolean outer) {
 		Hop mainInput = input1.getDataType().isMatrix() ? input1 : 
 			input2.getDataType().isMatrix() ? input2 : input1;
 		BinaryOp bop = new BinaryOp(mainInput.getName(), mainInput.getDataType(), 
@@ -502,6 +511,7 @@ public class HopRewriteUtils
 		//cleanup value type for relational operations
 		if( bop.isPPredOperation() && bop.getDataType().isScalar() )
 			bop.setValueType(ValueType.BOOLEAN);
+		bop.setOuterVectorOperation(outer);
 		bop.setOutputBlocksizes(mainInput.getRowsInBlock(), mainInput.getColsInBlock());
 		copyLineNumbers(mainInput, bop);
 		bop.refreshSizeInformation();	
@@ -716,6 +726,11 @@ public class HopRewriteUtils
 			&& hop.getInput().get(1).getDim1() < hop.getInput().get(1).getDim2();
 	}
 	
+	public static boolean isValidOuterBinaryOp( OpOp2 op ) {
+		String opcode = Hop.getBinaryOpCode(op);
+		return (Hop.getOpOp2ForOuterVectorOperation(opcode) == op);
+	}
+	
 	public static boolean isSparse( Hop hop ) {
 		return hop.dimsKnown(true) //dims and nnz known
 			&& MatrixBlock.evalSparseFormatInMemory(hop.getDim1(), hop.getDim2(), hop.getNnz());

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationDynamic.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationDynamic.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationDynamic.java
index 9681e44..8cd71f4 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationDynamic.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationDynamic.java
@@ -411,14 +411,13 @@ public class RewriteAlgebraicSimplificationDynamic extends HopRewriteRule
 	{
 		if( hi instanceof BinaryOp  ) //binary cell operation 
 		{
+			OpOp2 bop = ((BinaryOp)hi).getOp();
+			Hop left = hi.getInput().get(0);
 			Hop right = hi.getInput().get(1);
 			
-			//check for column replication
+			//check for matrix-vector column replication: (A + b %*% ones) -> (A + b)
 			if(    HopRewriteUtils.isMatrixMultiply(right) //matrix mult with datagen
-				&& right.getInput().get(1) instanceof DataGenOp 
-				&& ((DataGenOp)right.getInput().get(1)).getOp()==DataGenMethod.RAND
-				&& ((DataGenOp)right.getInput().get(1)).hasConstantValue(1d)
-				&& right.getInput().get(1).getDim1() == 1 //row vector for replication
+				&& HopRewriteUtils.isDataGenOpWithConstantValue(right.getInput().get(1), 1)
 				&& right.getInput().get(0).getDim2() == 1 ) //column vector for mv binary
 			{
 				//remove unnecessary outer product
@@ -427,11 +426,9 @@ public class RewriteAlgebraicSimplificationDynamic extends HopRewriteRule
 				
 				LOG.debug("Applied removeUnnecessaryOuterProduct1 (line "+right.getBeginLine()+")");
 			}
-			//check for row replication
+			//check for matrix-vector row replication: (A + ones %*% b) -> (A + b)
 			else if( HopRewriteUtils.isMatrixMultiply(right) //matrix mult with datagen
-				&& right.getInput().get(0) instanceof DataGenOp 
-				&& ((DataGenOp)right.getInput().get(0)).hasConstantValue(1d)
-				&& right.getInput().get(0).getDim2() == 1 //colunm vector for replication
+				&& HopRewriteUtils.isDataGenOpWithConstantValue(right.getInput().get(0), 1)
 				&& right.getInput().get(1).getDim1() == 1 ) //row vector for mv binary
 			{
 				//remove unnecessary outer product
@@ -440,6 +437,20 @@ public class RewriteAlgebraicSimplificationDynamic extends HopRewriteRule
 				
 				LOG.debug("Applied removeUnnecessaryOuterProduct2 (line "+right.getBeginLine()+")");
 			}
+			//check for vector-vector column replication: (a %*% ones) == b) -> outer(a, b, "==")
+			else if(HopRewriteUtils.isValidOuterBinaryOp(bop) 
+				&& HopRewriteUtils.isMatrixMultiply(left)
+				&& HopRewriteUtils.isDataGenOpWithConstantValue(left.getInput().get(1), 1)
+				&& left.getInput().get(0).getDim2() == 1 //column vector 
+				&& left.getDim1() != 1 && right.getDim1() == 1 ) //outer vector product 
+			{
+				Hop hnew = HopRewriteUtils.createBinary(left.getInput().get(0), right, bop, true);
+				HopRewriteUtils.replaceChildReference(parent, hi, hnew, pos);
+				HopRewriteUtils.cleanupUnreferenced(hi);
+				
+				hi = hnew;
+				LOG.debug("Applied removeUnnecessaryOuterProduct3 (line "+right.getBeginLine()+")");
+			}
 		}
 		
 		return hi;

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
index f4d9f1c..6a2753b 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
@@ -64,12 +64,8 @@ public class ConverterUtils
 		return ret;
 	}
 
-	public static double[] getDenseVector( MatrixBlock vector )
-	{
-		if( vector.isInSparseFormat() )
-			return DataConverter.convertToDoubleVector(vector);
-		else 
-			return vector.getDenseBlock();
+	public static double[] getDenseVector( MatrixBlock vector ) {
+		return DataConverter.convertToDoubleVector(vector, false);
 	}
 
 	public static MatrixBlock getUncompressedColBlock( ColGroup group )

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 404f440..094bc93 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -367,7 +367,7 @@ public class LibMatrixAgg
 			ArrayList<CumAggTask> tasks2 = new ArrayList<CumAggTask>();
 			for( int i=0; i<k & i*blklen<m; i++ ) {
 				double[] agg = (i==0)? null : 
-					DataConverter.convertToDoubleVector(tmp2.sliceOperations(i-1, i-1, 0, n2-1, new MatrixBlock()));
+					DataConverter.convertToDoubleVector(tmp2.sliceOperations(i-1, i-1, 0, n2-1, new MatrixBlock()), false);
 				tasks2.add( new CumAggTask(in, agg, out, aggtype, uop, i*blklen, Math.min((i+1)*blklen, m)) );
 			}
 			List<Future<Long>> taskret2 = pool.invokeAll(tasks2);	

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
index 4542882..e188b4e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
@@ -720,9 +720,11 @@ public class LibMatrixBincell
 		if( ret.sparse )
 			ret.allocateSparseRowsBlock();
 		
-		if(LibMatrixOuterAgg.isCompareOperator(op) && SortUtils.isSorted(0, m2.getNumColumns(), DataConverter.convertToDoubleVector(m2))) {
+		if(LibMatrixOuterAgg.isCompareOperator(op) 
+			&& m2.getNumColumns()>16 && SortUtils.isSorted(m2) ) {
 			performBinOuterOperation(m1, m2, ret, op);
-		} else {
+		}
+		else {
 			for(int r=0; r<rlen; r++) {
 				double v1 = m1.quickGetValue(r, 0);		
 				for(int c=0; c<clen; c++)
@@ -751,58 +753,51 @@ public class LibMatrixBincell
 			throws DMLRuntimeException
 	{
 		int rlen = mbLeft.rlen;
-		double bv[] = DataConverter.convertToDoubleVector(mbRight); 
-		
+		int clen = mbOut.clen;
+		double b[] = DataConverter.convertToDoubleVector(mbRight);
 		if(!mbOut.isAllocated())
 			mbOut.allocateDenseBlock();
+		double c[] = mbOut.getDenseBlock();
 		
-		long lNNZ = 0;
-		for(int r=0; r<rlen; r++) {
+		long lnnz = 0;
+		for(int r=0, off=0; r<rlen; r++, off+=clen) {
 			double value = mbLeft.quickGetValue(r, 0);		
-			int ixPos1 = Arrays.binarySearch(bv, value);
+			int ixPos1 = Arrays.binarySearch(b, value);
 			int ixPos2 = ixPos1;
 
 			if( ixPos1 >= 0 ){ //match, scan to next val
 				if(bOp.fn instanceof LessThan || bOp.fn instanceof GreaterThanEquals 
 						|| bOp.fn instanceof Equals || bOp.fn instanceof NotEquals)
-					while( ixPos1<bv.length && value==bv[ixPos1]  ) ixPos1++;
+					while( ixPos1<b.length && value==b[ixPos1]  ) ixPos1++;
 				if(bOp.fn instanceof GreaterThan || bOp.fn instanceof LessThanEquals 
 						|| bOp.fn instanceof Equals || bOp.fn instanceof NotEquals)
-					while(  ixPos2 > 0 && value==bv[ixPos2-1]) --ixPos2;
+					while(  ixPos2 > 0 && value==b[ixPos2-1]) --ixPos2;
 			} else {
 				ixPos2 = ixPos1 = Math.abs(ixPos1) - 1;
 			}
 
-			int iStartPos = 0, iEndPos = bv.length;
-
-			if(bOp.fn instanceof LessThan)
-				iStartPos = ixPos1;
-			else  if(bOp.fn instanceof LessThanEquals)
-				iStartPos = ixPos2;  
-			else if(bOp.fn instanceof GreaterThan)
-				iEndPos = ixPos2;
-			else if(bOp.fn instanceof GreaterThanEquals)
-				iEndPos = ixPos1;
+			int start = 0, end = clen;
+			if(bOp.fn instanceof LessThan || bOp.fn instanceof LessThanEquals)
+				start = (bOp.fn instanceof LessThan) ? ixPos1 : ixPos2;
+			else if(bOp.fn instanceof GreaterThan || bOp.fn instanceof GreaterThanEquals)
+				end = (bOp.fn instanceof GreaterThan) ? ixPos2 : ixPos1;
 			else if(bOp.fn instanceof Equals || bOp.fn instanceof NotEquals) {
-				iStartPos = ixPos2;
-				iEndPos = ixPos1;
+				start = ixPos2;
+				end = ixPos1;
 			}
-			if(iStartPos < iEndPos || bOp.fn instanceof NotEquals) {
-				int iOffSet = r*mbRight.getNumColumns();
-				if(bOp.fn instanceof LessThan || bOp.fn instanceof GreaterThanEquals 
-						|| bOp.fn instanceof GreaterThan || bOp.fn instanceof LessThanEquals 
-						|| bOp.fn instanceof Equals)	{
-					Arrays.fill(mbOut.getDenseBlock(), iOffSet+iStartPos, iOffSet+iEndPos, 1.0);
-					lNNZ += (iEndPos-iStartPos);
+			if(start < end || bOp.fn instanceof NotEquals) {
+				if (bOp.fn instanceof NotEquals) {
+					Arrays.fill(c, off, off+start, 1.0);
+					Arrays.fill(c, off+end, off+clen, 1.0);
+					lnnz += (start+(clen-end));
 				}
-				else if (bOp.fn instanceof NotEquals) {
-					Arrays.fill(mbOut.getDenseBlock(), iOffSet, iOffSet+iStartPos, 1.0);
-					Arrays.fill(mbOut.getDenseBlock(), iOffSet+iEndPos, iOffSet+bv.length, 1.0);
-					lNNZ += (iStartPos+(bv.length-iEndPos));
+				else {
+					Arrays.fill(c, off+start, off+end, 1.0);
+					lnnz += (end-start);
 				}
 			}
 		}
-		mbOut.setNonZeros(lNNZ);		
+		mbOut.setNonZeros(lnnz);
 		mbOut.examSparsity();
 	}
 
@@ -843,12 +838,11 @@ public class LibMatrixBincell
 		{
 			int clen2 = m2.clen; 
 			
-			//TODO performance improvement for relational operations like ">"
-			//sort rhs by val, compute cutoff and memset 1/0 for halfs
-	
-			if(LibMatrixOuterAgg.isCompareOperator(op) && SortUtils.isSorted(0, m2.getNumColumns(), DataConverter.convertToDoubleVector(m2))) {
+			if(LibMatrixOuterAgg.isCompareOperator(op) 
+				&& m2.getNumColumns()>16 && SortUtils.isSorted(m2)) {
 				performBinOuterOperation(m1, m2, ret, op);
-			} else {
+			} 
+			else {
 				for(int r=0; r<rlen; r++) {
 					double v1 = m1.quickGetValue(r, 0);		
 					for(int c=0; c<clen2; c++)

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/runtime/matrix/mapred/DistributedCacheInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DistributedCacheInput.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DistributedCacheInput.java
index dc1b5bb..82269f7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/DistributedCacheInput.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/DistributedCacheInput.java
@@ -106,7 +106,7 @@ public class DistributedCacheInput
 		
 		for( int j=0; j<_clen; j+=_bclen ) {
 			MatrixBlock mb = (MatrixBlock) getDataBlock(1, (int)Math.ceil((double)(j+1)/_bclen)).getValue(); 
-			double[] mbtmp = DataConverter.convertToDoubleVector(mb);
+			double[] mbtmp = DataConverter.convertToDoubleVector(mb, false);
 			System.arraycopy(mbtmp, 0, ret, j, mbtmp.length);
 		}
 		
@@ -120,7 +120,7 @@ public class DistributedCacheInput
 		
 		for( int j=0; j<_rlen; j+=_brlen ) {
 			MatrixBlock mb = (MatrixBlock) getDataBlock((int)Math.ceil((double)(j+1)/_brlen),1).getValue(); 
-			double[] mbtmp = DataConverter.convertToDoubleVector(mb);
+			double[] mbtmp = DataConverter.convertToDoubleVector(mb, false);
 			System.arraycopy(mbtmp, 0, ret, j, mbtmp.length);
 		}
 		

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index b016fdf..ac1e80e 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -307,24 +307,27 @@ public class DataConverter
 		return ret;
 	}
 
-	public static double[] convertToDoubleVector( MatrixBlock mb )
+	public static double[] convertToDoubleVector( MatrixBlock mb ) {
+		return convertToDoubleVector(mb, true); //deep=true: the result is always a newly allocated array
+	}
+	
+	public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep )
 	{
 		int rows = mb.getNumRows();
 		int cols = mb.getNumColumns();
-		double[] ret = new double[rows*cols]; //0-initialized 
+		double[] ret = (!mb.isInSparseFormat() && mb.isAllocated() && !deep) ? 
+			mb.getDenseBlock() : new double[rows*cols]; //0-initialized
 		
-		if( mb.getNonZeros() > 0 )
+		if( !mb.isEmptyBlock(false) )
 		{
-			if( mb.isInSparseFormat() )
-			{
+			if( mb.isInSparseFormat() ) {
 				Iterator<IJV> iter = mb.getSparseBlockIterator();
 				while( iter.hasNext() ) {
 					IJV cell = iter.next();
 					ret[cell.getI()*cols+cell.getJ()] = cell.getV();
 				}
 			}
-			else
-			{
+			else if( deep ) {
 				//memcopy row major representation if at least 1 non-zero
 				System.arraycopy(mb.getDenseBlock(), 0, ret, 0, rows*cols);
 			}

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index be6e177..e8f206a 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -523,7 +523,7 @@ public class MapReduceTool
 		MatrixReader reader = MatrixReaderFactory.createMatrixReader(inputinfo);
 		long estnnz = (rlen <= 0 || clen <= 0) ? -1 : rlen * clen;
 		MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, brlen, bclen, estnnz);
-		return DataConverter.convertToDoubleVector(mb);
+		return DataConverter.convertToDoubleVector(mb, false);
 	}
 	
 	public static double median(String dir, NumItemsByEachReducerMetaData metadata) throws IOException {

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/main/java/org/apache/sysml/runtime/util/SortUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/SortUtils.java b/src/main/java/org/apache/sysml/runtime/util/SortUtils.java
index 5c75a81..c41f3ac 100644
--- a/src/main/java/org/apache/sysml/runtime/util/SortUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/util/SortUtils.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 /**
  * Utilities for sorting, primarily used for SparseRows.
@@ -31,28 +32,25 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 public class SortUtils 
 {
 
-	public static boolean isSorted(int start, int end, int[] indexes)
-	{
+	public static boolean isSorted(int start, int end, int[] indexes) { //checks indexes[start,end) for ascending order
 		boolean ret = true;
-		for( int i=start+1; i<end; i++ )
-    		if( indexes[i]<indexes[i-1] ){
-    			ret = false;
-    			break;
-    		}
+		for( int i=start+1; i<end && ret; i++ ) //'&& ret' stops at the first violation
+    		ret &= (indexes[i]>=indexes[i-1]); //ascending, ties allowed (same result as the removed loop)
 		return ret;
 	}
 
-	public static boolean isSorted(int iStart, int iEnd, double[] dVals)
-	{
+	public static boolean isSorted(int start, int end, double[] values) { //checks values[start,end) for ascending order
 		boolean ret = true;
-		for( int i=iStart+1; i<iEnd; i++ )
-    		if( dVals[i]<dVals[i-1] ){
-    			ret = false;
-    			break;
-    		}
+		for( int i=start+1; i<end && ret; i++ ) //'&& ret' stops at the first violation
+    		ret &= (values[i]>=values[i-1]); //ascending, ties allowed; any NaN compares false and yields 'not sorted'
 		return ret;
 	}
 	
+	public static boolean isSorted(MatrixBlock in) { //sparse: always false; dense but unallocated: true
+		return !in.isInSparseFormat() && (!in.isAllocated() ||
+			isSorted(0, in.getNumRows()*in.getNumColumns(), in.getDenseBlock()));
+	}
+	
 	/**
 	 * In-place sort of two arrays, only indexes is used for comparison and values
 	 * of same position are sorted accordingly. 

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/test/java/org/apache/sysml/test/integration/functions/codegen/CPlanVectorPrimitivesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/codegen/CPlanVectorPrimitivesTest.java b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CPlanVectorPrimitivesTest.java
index 15d4eb0..7cc100f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/codegen/CPlanVectorPrimitivesTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CPlanVectorPrimitivesTest.java
@@ -619,7 +619,7 @@ public class CPlanVectorPrimitivesTest extends AutomatedTestBase
 				UnaryOperator uop = new UnaryOperator(Builtin.getBuiltinFnObject(opcode));
 				double[] ret2 = DataConverter.convertToDoubleVector(((MatrixBlock)in
 					.sliceOperations(i, i, 0, n-1, new MatrixBlock())
-					.unaryOperations(uop, new MatrixBlock())));
+					.unaryOperations(uop, new MatrixBlock())), false);
 				
 				//compare results
 				TestUtils.compareMatrices(ret1, ret2, eps);
@@ -683,18 +683,18 @@ public class CPlanVectorPrimitivesTest extends AutomatedTestBase
 					ScalarOperator bop = InstructionUtils.parseScalarBinaryOperator(opcode, true);
 					bop.setConstant(inA.max());
 					ret2 = DataConverter.convertToDoubleVector((MatrixBlock)
-						in2.scalarOperations(bop, new MatrixBlock()));
+						in2.scalarOperations(bop, new MatrixBlock()), false);
 				}
 				else if( type2 == InputType.SCALAR ) {
 					ScalarOperator bop = InstructionUtils.parseScalarBinaryOperator(opcode, false);
 					bop.setConstant(inB.max());
 					ret2 = DataConverter.convertToDoubleVector((MatrixBlock)
-						in1.scalarOperations(bop, new MatrixBlock()));
+						in1.scalarOperations(bop, new MatrixBlock()), false);
 				}
 				else { //vector-vector
 					BinaryOperator bop = InstructionUtils.parseBinaryOperator(opcode);
 					ret2 = DataConverter.convertToDoubleVector((MatrixBlock)
-						in1.binaryOperations(bop, in2, new MatrixBlock()));
+						in1.binaryOperations(bop, in2, new MatrixBlock()), false);
 				}
 				
 				//compare results

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/test/java/org/apache/sysml/test/integration/functions/misc/RewriteBinaryMV2OuterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/misc/RewriteBinaryMV2OuterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/misc/RewriteBinaryMV2OuterTest.java
new file mode 100644
index 0000000..61a7b85
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/misc/RewriteBinaryMV2OuterTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.misc;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class RewriteBinaryMV2OuterTest extends AutomatedTestBase 
+{
+	private static final String TEST_NAME = "RewriteBinaryMV2Outer";
+	//TEST_DIR locates the .dml/.R scripts; TEST_CLASS_DIR is handed to the TestConfiguration
+	private static final String TEST_DIR = "functions/misc/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + RewriteBinaryMV2OuterTest.class.getSimpleName() + "/";
+	//tolerance passed to TestUtils.compareMatrices
+	private static final double eps = Math.pow(10, -10);
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration( TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { "R" }) );
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterEquals() {
+		testRewriteBinaryMV2Outer("==", false);
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterNotEquals() {
+		testRewriteBinaryMV2Outer("!=", false);
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterMinus() {
+		testRewriteBinaryMV2Outer("-", false);
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterPlus() {
+		testRewriteBinaryMV2Outer("+", false);
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterEqualsRewrites() {
+		testRewriteBinaryMV2Outer("==", true);
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterNotEqualsRewrites() {
+		testRewriteBinaryMV2Outer("!=", true);
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterMinusRewrites() {
+		testRewriteBinaryMV2Outer("-", true);
+	}
+	
+	@Test
+	public void testRewriteBinaryMV2OuterPlusRewrites() {
+		testRewriteBinaryMV2Outer("+", true);
+	}
+	
+	private void testRewriteBinaryMV2Outer(String opcode, boolean rewrites) //runs the DML and R scripts for one operator and compares R
+	{
+		boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+		//the optimizer flag is static, hence restored in the finally block
+		try
+		{
+			TestConfiguration config = getTestConfiguration(TEST_NAME);
+			loadTestConfiguration(config);
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{ "-stats","-args", 
+				input("A"), input("B"), opcode, output("R") };
+			
+			fullRScriptName = HOME + TEST_NAME + ".R";
+			rCmd = getRCmd(inputDir(), opcode, expectedDir());
+
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = rewrites;
+			
+			//generate actual dataset: 128x1 column vector A, 1x256 row vector B
+			double[][] A = getRandomMatrix(128, 1, -5, 5, 0.9, 123); 
+			double[][] B = getRandomMatrix(1, 256, -5, 5, 0.9, 456); 
+			writeInputMatrixWithMTD("A", A, true);
+			writeInputMatrixWithMTD("B", B, true);
+			
+			//run test
+			runTest(true, false, null, -1); 
+			runRScript(true); 
+			
+			//compare matrices 
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("R");
+			TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
+			
+			//check for applied rewrites (no "ba+*" entry among the heavy hitters)
+			if( rewrites )
+				Assert.assertFalse(heavyHittersContainsSubString("ba+*"));
+		}
+		finally {
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlag;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
index e68eee9..989c4bd 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
@@ -310,7 +310,7 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
 				}
 				else {
 					double[] tmp = DataConverter.convertToDoubleVector(
-							mb.sliceOperations(i, i, j, j+colsVector-1, new MatrixBlock()));
+							mb.sliceOperations(i, i, j, j+colsVector-1, new MatrixBlock()), false);
 					row[j2+off] = new DenseVector(tmp);
 					j += colsVector-1;
 				}

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
index 4ae22cd..65aee8e 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
@@ -312,7 +312,7 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase
 				}
 				else {
 					double[] tmp = DataConverter.convertToDoubleVector(
-							mb.sliceOperations(i, i, j, j+colsVector-1, new MatrixBlock()));
+							mb.sliceOperations(i, i, j, j+colsVector-1, new MatrixBlock()), false);
 					row[j2+off] = new DenseVector(tmp);
 					j += colsVector-1;
 				}

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.R b/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.R
new file mode 100644
index 0000000..c1f2fd5
--- /dev/null
+++ b/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE) # args[1]: input path prefix, args[2]: binary operator, args[3]: output path prefix
+options(digits=22)
+library("Matrix")
+library("matrixStats")
+
+A = as.vector(readMM(paste(args[1], "A.mtx", sep="")))
+B = as.vector(readMM(paste(args[1], "B.mtx", sep="")))
+
+R = outer(A, B, args[2]); # applies the operator to every pair (A[i], B[j])
+
+writeMM(as(R, "CsparseMatrix"), paste(args[3], "R", sep=""));

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.dml b/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.dml
new file mode 100644
index 0000000..9179f97
--- /dev/null
+++ b/src/test/scripts/functions/misc/RewriteBinaryMV2Outer.dml
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+A = read($1); # 128 x 1 in the accompanying Java test
+B = read($2); # 1 x 256 in the accompanying Java test
+
+if( $3 == "==" ) # $3 selects the operator; A %*% matrix(1,1,ncol(B)) replicates A across ncol(B) columns
+   R = (A%*%matrix(1,1,ncol(B))) == B;
+else if( $3 == "!=" )
+   R = (A%*%matrix(1,1,ncol(B))) != B;
+else if( $3 == "-" )
+   R = (A%*%matrix(1,1,ncol(B))) - B;
+else if( $3 == "+" )
+   R = (A%*%matrix(1,1,ncol(B))) + B;
+
+write(R, $4); # $4: output location
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/systemml/blob/988366de/src/test_suites/java/org/apache/sysml/test/integration/functions/misc/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/misc/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/misc/ZPackageSuite.java
index 860cdbe..95ed276 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/misc/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/misc/ZPackageSuite.java
@@ -48,6 +48,7 @@ import org.junit.runners.Suite;
 	PrintExpressionTest.class,
 	PrintMatrixTest.class,
 	ReadAfterWriteTest.class,
+	RewriteBinaryMV2OuterTest.class,
 	RewriteCSETransposeScalarTest.class,
 	RewriteCTableToRExpandTest.class,
 	RewriteElementwiseMultChainOptimizationTest.class,