You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/29 17:54:54 UTC

[1/3] incubator-systemml git commit: [SYSTEMML-958] Improved multilogreg algorithm script (known size of Y)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master cf122042f -> 24cc180f3


[SYSTEMML-958] Improved multilogreg algorithm script (known size of Y)

The size of the modified label vector Y (an indicator matrix with one
column per class label) is data dependent. Despite worst-case estimates
and dynamic recompilation, this leads to unnecessary distributed
operations for the table operation that creates the indicator matrix.
This script change makes a slight modification to this preprocessing,
which allows dynamic recompilation to exactly infer the output
dimensions. In a scenario with a 100M x 10 input, 2 classes, and a 15GB
driver, this led to an improvement from 360s to 230s end-to-end.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/10dff5c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/10dff5c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/10dff5c9

Branch: refs/heads/master
Commit: 10dff5c9e3eb737a965846246d8187fcb0b03689
Parents: cf12204
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 28 16:24:24 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 29 10:54:18 2016 -0700

----------------------------------------------------------------------
 scripts/algorithms/MultiLogReg.dml | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/10dff5c9/scripts/algorithms/MultiLogReg.dml
----------------------------------------------------------------------
diff --git a/scripts/algorithms/MultiLogReg.dml b/scripts/algorithms/MultiLogReg.dml
index 8dc5a76..eaef4a1 100644
--- a/scripts/algorithms/MultiLogReg.dml
+++ b/scripts/algorithms/MultiLogReg.dml
@@ -138,13 +138,14 @@ if (intercept_status == 2)  # scale-&-shift X columns to mean 0, variance 1
 # tssX_A = t(SHIFT/SCALE TRANSFORM) %*% A   --- is rewritten as:
 # tssX_A = diag (scale_X) %*% A + shift_X %*% A [D, ];
 
-# Convert "Y_vec" into indicator matrice:
+# Convert "Y_vec" into indicator matrix:
+max_y = max (Y_vec);
 if (min (Y_vec) <= 0) { 
     # Category labels "0", "-1" etc. are converted into the largest label
-    max_y = max (Y_vec);
     Y_vec  = Y_vec  + (- Y_vec  + max_y + 1) * (Y_vec <= 0);
+    max_y = max_y + 1;
 }
-Y = table (seq (1, N, 1), Y_vec);
+Y = table (seq (1, N, 1), Y_vec, N, max_y);
 K = ncol (Y) - 1;   # The number of  non-baseline categories
 
 lambda = (scale_lambda %*% matrix (1, rows = 1, cols = K)) * regularization;


[3/3] incubator-systemml git commit: [SYSTEMML-968] Fix buffer pool integration of frames (evict, serialize)

Posted by mb...@apache.org.
[SYSTEMML-968] Fix buffer pool integration of frames (evict, serialize)

There was a basic integration of frames into the buffer pool. However,
the in-memory and serialized size was always reported as 1, which
effectively avoided evictions for frames. Accordingly, with sufficiently
large frame intermediates, out-of-memory errors occurred.

This patch now fully integrates frames into the buffer pool, which
covers the following changes:

(1) Implementation of frame block getInMemorySize and
getExactSerializedSize as well as the decision on shallow serialization.

(2) Implementation of writeUTF in our CacheDataOutput and
FastBufferedDataOutputStream, which avoids redundant copies compared to the
default approach of new DataOutputStream(new BufferedOutputStream(out)).

(3) Handling of deserialization and reading of frame blocks on restore,
which previously handled only matrix blocks.

(4) A variety of test cases covering serialization/deserialization, write/read,
dense/sparse, and a number of different schemas.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/24cc180f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/24cc180f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/24cc180f

Branch: refs/heads/master
Commit: 24cc180f3f50e19fba95bd6277d507eac089e87e
Parents: 48a3bf7
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 28 23:16:16 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 29 10:54:26 2016 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/ByteBuffer.java      |  21 +-
 .../controlprogram/caching/CacheDataOutput.java |  74 +++---
 .../controlprogram/caching/LazyWriteBuffer.java |  29 +++
 .../sysml/runtime/io/IOUtilFunctions.java       |  23 ++
 .../sysml/runtime/matrix/data/FrameBlock.java   |  89 ++++++-
 .../util/FastBufferedDataOutputStream.java      |  57 ++++-
 .../sysml/runtime/util/LocalFileUtils.java      |  12 +-
 .../functions/frame/FrameEvictionTest.java      | 234 +++++++++++++++++++
 .../org/apache/sysml/test/utils/TestUtils.java  |   7 +-
 9 files changed, 494 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java
index 9402392..73d3068 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/ByteBuffer.java
@@ -19,9 +19,13 @@
 
 package org.apache.sysml.runtime.controlprogram.caching;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 
@@ -34,13 +38,14 @@ public class ByteBuffer
 {
 	private boolean _serialized;	
 	private boolean _shallow;
-	private long _size;
+	private boolean _matrix;
+	private int _size;
 	
 	protected byte[]     _bdata = null; //sparse matrix
 	protected CacheBlock _cdata = null; //dense matrix/frame
 	
 	public ByteBuffer( long size ) {
-		_size = size;
+		_size = (int)size;
 		_serialized = false;
 	}
 	
@@ -53,6 +58,7 @@ public class ByteBuffer
 		throws IOException
 	{	
 		_shallow = cb.isShallowSerialize();
+		_matrix = (cb instanceof MatrixBlock);
 		
 		try
 		{
@@ -60,9 +66,9 @@ public class ByteBuffer
 			{
 				//deep serialize (for compression)
 				if( CacheableData.CACHING_BUFFER_PAGECACHE )
-					_bdata = PageCache.getPage((int)_size);
+					_bdata = PageCache.getPage(_size);
 				if( _bdata==null )
-					_bdata = new byte[(int)_size];
+					_bdata = new byte[_size];
 				DataOutput dout = new CacheDataOutput(_bdata);
 				cb.write(dout);
 			}
@@ -89,9 +95,10 @@ public class ByteBuffer
 	{
 		CacheBlock ret = null;
 		
-		if( !_shallow ) { //sparse matrix 
-			CacheDataInput din = new CacheDataInput(_bdata);
-			ret = new MatrixBlock();
+		if( !_shallow ) { //sparse matrix / string frame
+			DataInput din = _matrix ? new CacheDataInput(_bdata) :
+				new DataInputStream(new ByteArrayInputStream(_bdata));
+			ret = _matrix ? new MatrixBlock() : new FrameBlock();
 			ret.readFields(din);
 		}
 		else { //dense matrix/frame

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
index 5fd0b4b..c74516c 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataOutput.java
@@ -21,72 +21,59 @@ package org.apache.sysml.runtime.controlprogram.caching;
 
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.UTFDataFormatException;
 
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.MatrixBlockDataOutput;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 
 /**
- * Customer DataOutput to serialize directly into the given byte array.
- * 
+ * Custom DataOutput to serialize directly into the given byte array.
  * 
  */
 public class CacheDataOutput implements DataOutput, MatrixBlockDataOutput 
 {
-	
 	protected byte[] _buff;
 	protected int _bufflen;
 	protected int _count;
 
-	public CacheDataOutput( byte[] mem ) 
-	{		
+	public CacheDataOutput( byte[] mem ) {		
 		_buff = mem;
 		_bufflen = _buff.length;
 		_count = 0;
 	}
 	
 	@Override
-	public void write(int b) 
-    	throws IOException 
-    {
+	public void write(int b) throws IOException {
 		_buff[_count++] = (byte)b;
     }
 
     @Override
-	public void write(byte[] b) 
-		throws IOException 
-	{
+	public void write(byte[] b) throws IOException {
 		System.arraycopy(b, 0, _buff, _count, b.length);
 		_count += b.length;
 	}
     
     @Override
-	public void write(byte[] b, int off, int len) 
-    	throws IOException 
-    {
+	public void write(byte[] b, int off, int len) throws IOException {
 		System.arraycopy(b, off, _buff, _count, len);
 		_count += len;
     }
 	
 	@Override
-	public void writeBoolean(boolean v) 
-		throws IOException 
-	{
+	public void writeBoolean(boolean v) throws IOException {
 		_buff[_count++] = (byte)( v ? 1 : 0 );
 	}
 
 
 	@Override
-	public void writeInt(int v) 
-		throws IOException 
-	{
+	public void writeInt(int v) throws IOException {
 		intToBa(v, _buff, _count);
 		_count += 4;
 	}
 	
 	@Override
-	public void writeDouble(double v) 
-		throws IOException 
-	{
+	public void writeDouble(double v) throws IOException {
 		long tmp = Double.doubleToRawLongBits(v);		
 		longToBa(tmp, _buff, _count);
 		_count += 8;
@@ -125,15 +112,37 @@ public class CacheDataOutput implements DataOutput, MatrixBlockDataOutput
 
 	@Override
 	public void writeShort(int v) throws IOException {
-		throw new IOException("Not supported.");
+		shortToBa(v, _buff, _count);
+		_count += 2;
 	}
 
 	@Override
 	public void writeUTF(String s) throws IOException {
-		throw new IOException("Not supported.");
+		int slen = s.length();
+		int utflen = IOUtilFunctions.getUTFSize(s) - 2;
+		if (utflen > 65535) //payload length must fit the unsigned 2-byte header
+			throw new UTFDataFormatException("encoded string too long: "+utflen);
+		
+		//write utf len (2 bytes) 
+		writeShort(utflen);
+		
+		//write utf payload
+		for( int i=0; i<slen; i++ ) {
+			char c = s.charAt(i);
+			if( c>= 0x0001 && c<=0x007F ) //1 byte range
+				writeByte(c);
+			else if( c>=0x0800 ) { //3 byte range
+				_buff[_count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+				_buff[_count++] = (byte) (0x80 | ((c >>  6) & 0x3F));
+				_buff[_count++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+			}
+			else { //2 byte range and null
+				_buff[_count++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
+				_buff[_count++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+			}
+		}
 	}
 
-
     ///////////////////////////////////////////////
     // Implementation of MatrixBlockDSMDataOutput
     ///////////////////////////////////////////////	
@@ -197,6 +206,19 @@ public class CacheDataOutput implements DataOutput, MatrixBlockDataOutput
 	 * @param ba
 	 * @param off
 	 */
+	private static void shortToBa( final int val, byte[] ba, final int off )
+	{
+		//shift and mask out 2 bytes
+		ba[ off+0 ] = (byte)((val >>>  8) & 0xFF);
+		ba[ off+1 ] = (byte)((val >>>  0) & 0xFF);
+	}
+	
+	/**
+	 * 
+	 * @param val
+	 * @param ba
+	 * @param off
+	 */
 	private static void intToBa( final int val, byte[] ba, final int off )
 	{
 		//shift and mask out 4 bytes

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
index 61742bb..0aad717 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -227,6 +227,8 @@ public class LazyWriteBuffer
 	}
 	
 	/**
+	 * Print current status of buffer pool, including all entries.
+	 * NOTE: use only for debugging or testing.  
 	 * 
 	 */
 	public static void printStatus( String position )
@@ -252,6 +254,33 @@ public class LazyWriteBuffer
 	}
 	
 	/**
+	 * Evicts all buffer pool entries. 
+	 * NOTE: use only for debugging or testing.  
+	 * @throws IOException 
+	 * 
+	 */
+	public static void forceEviction() 
+		throws IOException 
+	{
+		//evict all matrices and frames
+		while( !_mQueue.isEmpty() )
+		{
+			//remove first entry from eviction queue
+			Entry<String, ByteBuffer> entry = _mQueue.removeFirst();
+			ByteBuffer tmp = entry.getValue();
+			
+			if( tmp != null ) {
+				//wait for pending serialization
+				tmp.checkSerialized();
+				
+				//evict matrix
+				tmp.evictBuffer(entry.getKey());
+				tmp.freeMemory();
+			}
+		}
+	}
+	
+	/**
 	 * Extended LinkedHashMap with convenience methods for adding and removing 
 	 * last/first entries.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index b615a59..7327796 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -285,6 +285,29 @@ public class IOUtilFunctions
 	}
 	
 	/**
+	 * Returns the serialized size in bytes of the given string value,
+	 * following the modified UTF-8 specification as used by Java's
+	 * DataInput/DataOutput.
+	 * 
+	 * see java docs: docs/api/java/io/DataInput.html#modified-utf-8
+	 * 
+	 * @param value
+	 * @return
+	 */
+	public static int getUTFSize(String value) {
+		if( value == null )
+			return 2;
+		//size in modified UTF-8 as used by DataInput/DataOutput
+		int size = 2; //length in bytes
+		for (int i = 0; i < value.length(); i++) {
+            char c = value.charAt(i);
+            size += ( c>=0x0001 && c<=0x007F) ? 1 :
+            	(c >= 0x0800) ? 3 : 2;
+        }
+		return size;
+	}
+	
+	/**
 	 * 
 	 * @param input
 	 * @return

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 67674fe..8d74dba 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -629,20 +629,86 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	////////
 	// CacheBlock implementation
 	
+	@Override
 	public long getInMemorySize() {
-		return 1;
+		//frame block header
+		long size = 16 + 4; //object, num rows
+		
+		//schema array (overhead and int entries)
+		int clen = getNumColumns();
+		size += 8 + 32 + clen * 4;
+		
+		//colname array (overhead and string entries)
+		size += 8 + 32;
+		for( int j=0; j<clen; j++ )
+			size += getInMemoryStringSize(_colnames[j]);
+		
+		//meta data array (overhead and entries)
+		size += 8 + 32;
+		for( int j=0; j<clen; j++ ) {
+			size += 16 + 8 + 8 //object, long num distinct, ref mv 
+				+ getInMemoryStringSize(_colmeta[j].getMvValue());
+		}
+		
+		//data array (overhead and entries)
+		size += 8 + 32 + clen * (16+4+8+32);
+		for( int j=0; j<clen; j++ ) {
+			switch( _schema[j] ) {
+				case BOOLEAN: size += _numRows; break;
+				case INT:
+				case DOUBLE: size += 8*_numRows; break;
+				case STRING: 
+					StringArray arr = (StringArray)_coldata[j];
+					for( int i=0; i<_numRows; i++ )
+						size += getInMemoryStringSize(arr.get(i));
+					break;
+				default: //not applicable	
+			}
+		}
+		
+		return size;
 	}
 	
 	@Override
 	public long getExactSerializedSize() {
-		//TODO implement getExactSizeOnDisk();
-		return 1;
+		//header: 2xint, boolean
+		long size = 9;
+		
+		//column sizes
+		boolean isDefaultMeta = isColNamesDefault()
+				&& isColumnMetadataDefault();
+		for( int j=0; j<getNumColumns(); j++ ) {
+			size += 1; //column schema
+			if( !isDefaultMeta ) {
+				size += IOUtilFunctions.getUTFSize(_colnames[j]);
+				size += 8;
+				size += IOUtilFunctions.getUTFSize(_colmeta[j].getMvValue());
+			}
+			switch( _schema[j] ) {
+				case BOOLEAN: size += _numRows; break;
+				case INT:
+				case DOUBLE: size += 8*_numRows; break;
+				case STRING: 
+					StringArray arr = (StringArray)_coldata[j];
+					for( int i=0; i<_numRows; i++ )
+						size += IOUtilFunctions.getUTFSize(arr.get(i));
+					break;
+				default: //not applicable	
+			}
+		}
+		
+		return size;
 	}
 	
 	@Override
 	public boolean isShallowSerialize() {
-		//shallow serialize since frames always dense
-		return true;
+		//shallow serialize if non-string schema because a frame block
+		//is always dense but strings have large array overhead per cell
+		boolean ret = true;
+		for( int j=0; j<_schema.length && ret; j++ )
+			ret &= (_schema[j] != ValueType.STRING);
+		
+		return ret;
 	}
 	
 	@Override
@@ -650,6 +716,19 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		//do nothing
 	}
 	
+	/**
+	 * Returns the in-memory size in bytes of the given string value. 
+	 * 
+	 * @param value
+	 * @return
+	 */
+	private long getInMemoryStringSize(String value) {
+		if( value == null )
+			return 0;
+		return 16 + 4 + 8 //object, hash, array ref
+			+ 32 + value.length();     //char array 
+	}
+	
 	///////
 	// indexing and append operations
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
index b059a06..b19eee2 100644
--- a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
+++ b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataOutputStream.java
@@ -23,7 +23,9 @@ import java.io.DataOutput;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UTFDataFormatException;
 
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.MatrixBlockDataOutput;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 
@@ -175,6 +177,15 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements
 	}
 
 	@Override
+	public void writeShort(int v) throws IOException {
+		if (_count+2 > _bufflen) {
+		    flushBuffer();
+		}
+		shortToBa(v, _buff, _count);
+		_count += 2;	
+	}
+
+	@Override
 	public void writeBytes(String s) throws IOException {
 		throw new IOException("Not supported.");
 	}
@@ -195,17 +206,34 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements
 	}
 
 	@Override
-	public void writeShort(int v) throws IOException {
-		throw new IOException("Not supported.");
-	}
-
-	@Override
 	public void writeUTF(String s) throws IOException {
-		byte[] strBytes = s.getBytes("UTF-8");
-		write(strBytes, 0, strBytes.length);
+		int slen = s.length();
+		int utflen = IOUtilFunctions.getUTFSize(s) - 2;
+		if (utflen > 65535) //payload length must fit the unsigned 2-byte header
+			throw new UTFDataFormatException("encoded string too long: "+utflen);
+		
+		//write utf len (2 bytes) 
+		writeShort(utflen);
+		
+		//write utf payload
+		for( int i=0; i<slen; i++ ) {
+			if (_count+3 > _bufflen)
+			    flushBuffer();
+			char c = s.charAt(i);
+			if( c>= 0x0001 && c<=0x007F ) //1 byte range
+				_buff[_count++] = (byte) c;
+			else if( c>=0x0800 ) { //3 byte range
+				_buff[_count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+				_buff[_count++] = (byte) (0x80 | ((c >>  6) & 0x3F));
+				_buff[_count++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+			}
+			else { //2 byte range and null
+				_buff[_count++] = (byte) (0xC0 | ((c >>  6) & 0x1F));
+				_buff[_count++] = (byte) (0x80 | ((c >>  0) & 0x3F));
+			}
+		}
 	}
 
-
     ///////////////////////////////////////////////
     // Implementation of MatrixBlockDSMDataOutput
     ///////////////////////////////////////////////	
@@ -298,6 +326,19 @@ public class FastBufferedDataOutputStream extends FilterOutputStream implements
 	 * @param ba
 	 * @param off
 	 */
+	private static void shortToBa( final int val, byte[] ba, final int off )
+	{
+		//shift and mask out 2 bytes
+		ba[ off+0 ] = (byte)((val >>>  8) & 0xFF);
+		ba[ off+1 ] = (byte)((val >>>  0) & 0xFF);
+	}
+	
+	/**
+	 * 
+	 * @param val
+	 * @param ba
+	 * @param off
+	 */
 	private static void intToBa( final int val, byte[] ba, final int off )
 	{
 		//shift and mask out 4 bytes

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
index c121844..f024601 100644
--- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
@@ -19,10 +19,14 @@
 
 package org.apache.sysml.runtime.util;
 
+import java.io.BufferedInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
@@ -101,13 +105,15 @@ public class LocalFileUtils
 		throws IOException
 	{
 		FileInputStream fis = new FileInputStream( filePathAndName );
-		FastBufferedDataInputStream in = new FastBufferedDataInputStream(fis, BUFFER_SIZE);
-		
+		DataInput in  = !(ret instanceof MatrixBlock) ? 
+				new DataInputStream(new BufferedInputStream(fis, BUFFER_SIZE)) :
+				new FastBufferedDataInputStream(fis, BUFFER_SIZE);		
 		try {
 			ret.readFields(in);
 		}
 		finally {
-			IOUtilFunctions.closeSilently(in);
+			IOUtilFunctions.closeSilently(
+					(InputStream)in);
 		}
 			
 		return ret;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameEvictionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameEvictionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameEvictionTest.java
new file mode 100644
index 0000000..7d0abfa
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameEvictionTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.lang.reflect.Method;
+
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
+public class FrameEvictionTest extends AutomatedTestBase
+{
+	private final static int rows = 1593;
+	private final static double sparsity1 = 0.9;
+	private final static double sparsity2 = 0.1;		
+	
+	private final static ValueType[] schemaDoubles = new ValueType[]{ValueType.DOUBLE, ValueType.DOUBLE, ValueType.DOUBLE};	
+	private final static ValueType[] schemaStrings = new ValueType[]{ValueType.STRING, ValueType.STRING, ValueType.STRING};	
+	private final static ValueType[] schemaMixed = new ValueType[]{ValueType.STRING, ValueType.DOUBLE, ValueType.INT, ValueType.BOOLEAN};	
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+	}
+
+	@Test
+	public void testFrameEvictionDoublesDenseDefault()  {
+		runFrameEvictionTest(schemaDoubles, false, true, false);
+	}
+	
+	@Test
+	public void testFrameEvictionDoublesDenseCustom()  {
+		runFrameEvictionTest(schemaDoubles, false, false, false);
+	}
+	
+	@Test
+	public void testFrameEvictionDoublesSparseDefault()  {
+		runFrameEvictionTest(schemaDoubles, true, true, false);
+	}
+	
+	@Test
+	public void testFrameEvictionDoublesSparseCustom()  {
+		runFrameEvictionTest(schemaDoubles, true, false, false);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsDenseDefault()  {
+		runFrameEvictionTest(schemaStrings, false, true, false);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsDenseCustom()  {
+		runFrameEvictionTest(schemaStrings, false, false, false);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsSparseDefault()  {
+		runFrameEvictionTest(schemaStrings, true, true, false);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsSparseCustom()  {
+		runFrameEvictionTest(schemaStrings, true, false, false);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedDenseDefault()  {
+		runFrameEvictionTest(schemaMixed, false, true, false);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedDenseCustom()  {
+		runFrameEvictionTest(schemaMixed, false, false, false);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedSparseDefault()  {
+		runFrameEvictionTest(schemaMixed, true, true, false);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedSparseCustom()  {
+		runFrameEvictionTest(schemaMixed, true, false, false);
+	}
+
+	@Test
+	public void testFrameEvictionDoublesDenseDefaultForce()  {
+		runFrameEvictionTest(schemaDoubles, false, true, true);
+	}
+	
+	@Test
+	public void testFrameEvictionDoublesDenseCustomForce()  {
+		runFrameEvictionTest(schemaDoubles, false, false, true);
+	}
+	
+	@Test
+	public void testFrameEvictionDoublesSparseDefaultForce()  {
+		runFrameEvictionTest(schemaDoubles, true, true, true);
+	}
+	
+	@Test
+	public void testFrameEvictionDoublesSparseCustomForce()  {
+		runFrameEvictionTest(schemaDoubles, true, false, true);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsDenseDefaultForce()  {
+		runFrameEvictionTest(schemaStrings, false, true, true);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsDenseCustomForce()  {
+		runFrameEvictionTest(schemaStrings, false, false, true);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsSparseDefaultForce()  {
+		runFrameEvictionTest(schemaStrings, true, true, true);
+	}
+	
+	@Test
+	public void testFrameEvictionStringsSparseCustomForce()  {
+		runFrameEvictionTest(schemaStrings, true, false, true);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedDenseDefaultForce()  {
+		runFrameEvictionTest(schemaMixed, false, true, true);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedDenseCustomForce()  {
+		runFrameEvictionTest(schemaMixed, false, false, true);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedSparseDefaultForce()  {
+		runFrameEvictionTest(schemaMixed, true, true, true);
+	}
+	
+	@Test
+	public void testFrameEvictionMixedSparseCustomForce()  {
+		runFrameEvictionTest(schemaMixed, true, false, true);
+	}
+
+	
+	/**
+	 * 
+	 * @param schema
+	 * @param sparse
+	 * @param defaultMeta
+	 * @param force
+	 */
+	private void runFrameEvictionTest( ValueType[] schema, boolean sparse, boolean defaultMeta, boolean force)
+	{
+		try
+		{
+			//data generation
+			double sparsity = sparse ? sparsity2 : sparsity1;
+			double[][] A = getRandomMatrix(rows, schema.length, -10, 10, sparsity, 765); 
+			MatrixBlock mA = DataConverter.convertToMatrixBlock(A);
+			FrameBlock fA = DataConverter.convertToFrameBlock(mA, schema);
+			
+			//create non-default column names
+			if( !defaultMeta ) {
+				String[] colnames = new String[schema.length];
+				for( int i=0; i<schema.length; i++ )
+					colnames[i] = "Custom_name_"+i;
+				fA.setColumnNames(colnames);
+			}
+		
+			//setup caching
+			CacheableData.initCaching("tmp_frame_eviction_test");
+			
+			//create frame object
+			MatrixCharacteristics mc = new MatrixCharacteristics(rows, schema.length, -1, -1, -1);
+			MatrixFormatMetaData meta = new MatrixFormatMetaData (mc, 
+					OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo);
+			FrameObject fo = new FrameObject("fA",  meta, schema);
+			fo.acquireModify(fA);
+			fo.release();
+			
+			//evict frame and clear in-memory reference
+			if( force )
+				LazyWriteBuffer.forceEviction();
+			Method clearfo = CacheableData.class
+					.getDeclaredMethod("clearCache", new Class[]{});
+			clearfo.setAccessible(true); //make method public
+			clearfo.invoke(fo, new Object[]{});
+			
+			//read frame through buffer pool (if forced, this is a read from disk
+			//otherwise deserialization or simple reference depending on schema)
+			FrameBlock fA2 = fo.acquireRead();
+			fo.release();
+			
+			//compare frames
+			String[][] sA = DataConverter.convertToStringFrame(fA);
+			String[][] sA2 = DataConverter.convertToStringFrame(fA2);
+			TestUtils.compareFrames(sA, sA2, rows, schema.length);
+		}
+		catch(Exception ex) {
+			ex.printStackTrace();
+			throw new RuntimeException(ex);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/24cc180f/src/test/java/org/apache/sysml/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/utils/TestUtils.java b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
index 2c387d2..3b85f7b 100644
--- a/src/test/java/org/apache/sysml/test/utils/TestUtils.java
+++ b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
@@ -754,12 +754,13 @@ public class TestUtils
 	 * @param cols
 	 * @param epsilon
 	 */
-	public static void compareFrames(String[][] expectedMatrix, String[][] actualMatrix, int rows, int cols ) {
+	public static void compareFrames(String[][] expectedFrame, String[][] actualFrame, int rows, int cols ) {
 		int countErrors = 0;
 		for (int i = 0; i < rows; i++) {
 			for (int j = 0; j < cols; j++) {
-				if( !(expectedMatrix[i][j].equals(actualMatrix[i][j]) || (expectedMatrix[i][j]+".0").equals(actualMatrix[i][j])) ) {
-					System.out.println(expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
+				if( !( (expectedFrame[i][j]==null && actualFrame[i][j]==null) ||
+					expectedFrame[i][j].equals(actualFrame[i][j]) || (expectedFrame[i][j]+".0").equals(actualFrame[i][j])) ) {
+					System.out.println(expectedFrame[i][j] +" vs actual: "+actualFrame[i][j]+" at "+i+" "+j);
 					countErrors++;
 				}
 			}


[2/3] incubator-systemml git commit: [SYSTEMML-970] Fix indexing vectorization rewrite (awareness of frames)

Posted by mb...@apache.org.
[SYSTEMML-970] Fix indexing vectorization rewrite (awareness of frames)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/48a3bf72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/48a3bf72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/48a3bf72

Branch: refs/heads/master
Commit: 48a3bf72a9e734be40c2c242e6fd40a50f166e7d
Parents: 10dff5c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 28 16:58:36 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Sep 29 10:54:22 2016 -0700

----------------------------------------------------------------------
 .../hops/rewrite/RewriteIndexingVectorization.java    | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/48a3bf72/src/main/java/org/apache/sysml/hops/rewrite/RewriteIndexingVectorization.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteIndexingVectorization.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteIndexingVectorization.java
index 26aeb1b..97205e4 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteIndexingVectorization.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteIndexingVectorization.java
@@ -28,8 +28,6 @@ import org.apache.sysml.hops.HopsException;
 import org.apache.sysml.hops.IndexingOp;
 import org.apache.sysml.hops.LeftIndexingOp;
 import org.apache.sysml.hops.LiteralOp;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
 
 /**
  * Rule: Indexing vectorization. This rewrite rule set simplifies
@@ -138,7 +136,7 @@ public class RewriteIndexingVectorization extends HopRewriteRule
 				//apply rewrite if found candidates
 				if( ihops.size() > 1 ){
 					//new row indexing operator
-					IndexingOp newRix = new IndexingOp("tmp", DataType.MATRIX, ValueType.DOUBLE, input, 
+					IndexingOp newRix = new IndexingOp("tmp", input.getDataType(), input.getValueType(), input, 
 							            ihop0.getInput().get(1), ihop0.getInput().get(1), new LiteralOp(1), 
 							            HopRewriteUtils.createValueHop(input, false), true, false); 
 					HopRewriteUtils.setOutputParameters(newRix, -1, -1, input.getRowsInBlock(), input.getColsInBlock(), -1);
@@ -177,7 +175,7 @@ public class RewriteIndexingVectorization extends HopRewriteRule
 				//apply rewrite if found candidates
 				if( ihops.size() > 1 ){
 					//new row indexing operator
-					IndexingOp newRix = new IndexingOp("tmp", DataType.MATRIX, ValueType.DOUBLE, input, 
+					IndexingOp newRix = new IndexingOp("tmp", input.getDataType(), input.getValueType(), input, 
 							         new LiteralOp(1), HopRewriteUtils.createValueHop(input, true),
 				                    ihop0.getInput().get(3), ihop0.getInput().get(3), false, true); 
 					HopRewriteUtils.setOutputParameters(newRix, -1, -1, input.getRowsInBlock(), input.getColsInBlock(), -1);
@@ -240,7 +238,7 @@ public class RewriteIndexingVectorization extends HopRewriteRule
 					Hop rowExpr = ihop0.getInput().get(2); //keep before reset
 					
 					//new row indexing operator
-					IndexingOp newRix = new IndexingOp("tmp1", DataType.MATRIX, ValueType.DOUBLE, input, 
+					IndexingOp newRix = new IndexingOp("tmp1", input.getDataType(), input.getValueType(), input, 
 							            rowExpr, rowExpr, new LiteralOp(1), 
 							            HopRewriteUtils.createValueHop(input, false), true, false); 
 					HopRewriteUtils.setOutputParameters(newRix, -1, -1, input.getRowsInBlock(), input.getColsInBlock(), -1);
@@ -271,7 +269,7 @@ public class RewriteIndexingVectorization extends HopRewriteRule
 						ihop0parentsPos.add(posp);
 					}
 					
-					LeftIndexingOp newLix = new LeftIndexingOp("tmp2", DataType.MATRIX, ValueType.DOUBLE, input, ihop0, 
+					LeftIndexingOp newLix = new LeftIndexingOp("tmp2", input.getDataType(), input.getValueType(), input, ihop0, 
 													rowExpr, rowExpr, new LiteralOp(1), 
 													HopRewriteUtils.createValueHop(input, false), true, false); 
 					HopRewriteUtils.setOutputParameters(newLix, -1, -1, input.getRowsInBlock(), input.getColsInBlock(), -1);
@@ -314,7 +312,7 @@ public class RewriteIndexingVectorization extends HopRewriteRule
 					Hop colExpr = ihop0.getInput().get(4); //keep before reset
 					
 					//new row indexing operator
-					IndexingOp newRix = new IndexingOp("tmp1", DataType.MATRIX, ValueType.DOUBLE, input, 
+					IndexingOp newRix = new IndexingOp("tmp1", input.getDataType(), input.getValueType(), input, 
 							        new LiteralOp(1), HopRewriteUtils.createValueHop(input, true),            
 									colExpr, colExpr, false, true); 
 					HopRewriteUtils.setOutputParameters(newRix, -1, -1, input.getRowsInBlock(), input.getColsInBlock(), -1);
@@ -345,7 +343,7 @@ public class RewriteIndexingVectorization extends HopRewriteRule
 						ihop0parentsPos.add(posp);
 					}
 					
-					LeftIndexingOp newLix = new LeftIndexingOp("tmp2", DataType.MATRIX, ValueType.DOUBLE, input, ihop0, 
+					LeftIndexingOp newLix = new LeftIndexingOp("tmp2", input.getDataType(), input.getValueType(), input, ihop0, 
 							                        new LiteralOp(1), HopRewriteUtils.createValueHop(input, true), 
 													colExpr, colExpr, false, true); 
 					HopRewriteUtils.setOutputParameters(newLix, -1, -1, input.getRowsInBlock(), input.getColsInBlock(), -1);