You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/09/29 19:25:36 UTC

incubator-systemml git commit: [SYSTEMML-964] Do not use FastBufferedDataIOStream for frame

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 24cc180f3 -> 91118def5


[SYSTEMML-964] Do not use FastBufferedDataIOStream for frame


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/91118def
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/91118def
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/91118def

Branch: refs/heads/master
Commit: 91118def5e19d34cc0202d60538044d76b2e26e8
Parents: 24cc180
Author: Arvind Surve <ac...@yahoo.com>
Authored: Wed Sep 28 13:43:40 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Thu Sep 29 12:24:41 2016 -0700

----------------------------------------------------------------------
 .../spark/data/PartitionedBlock.java            | 28 +++++++++++++++-----
 1 file changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91118def/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index fbd7a25..b18f32a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -305,13 +305,14 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 	{
 		DataInput dis = is;
 		
-		if( is instanceof ObjectInputStream ) {
+		int code = readHeader(dis);
+		if( is instanceof ObjectInputStream && code == 0) {	// Apply only for MatrixBlock at this point as a temporary workaround
+															// We will generalize this code by adding UTF functionality to support Frame
 			//fast deserialize of dense/sparse blocks
 			ObjectInputStream ois = (ObjectInputStream)is;
 			dis = new FastBufferedDataInputStream(ois);
 		}
-		
-		readHeaderAndPayload(dis);
+		readPayload(dis, code);
 	}
 	
 	/**
@@ -324,7 +325,9 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 	public void writeExternal(ObjectOutput os) 
 		throws IOException
 	{
-		if( os instanceof ObjectOutputStream ) {
+		if( os instanceof ObjectOutputStream 
+				&& CacheBlockFactory.getCode(_partBlocks[0]) == 0) {// Apply only for MatrixBlock at this point as a temporary workaround
+																	// We will generalize this code by adding UTF functionality to support Frame 
 			//fast serialize of dense/sparse blocks
 			ObjectOutputStream oos = (ObjectOutputStream)os;
 			FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos);
@@ -356,13 +359,13 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		for( CacheBlock block : _partBlocks )
 			block.write(dos);
 	}
-
+	
 	/**
 	 * 
 	 * @param din
 	 * @throws IOException 
 	 */
-	private void readHeaderAndPayload(DataInput dis) 
+	private int readHeader(DataInput dis) 
 		throws IOException
 	{
 		_rlen = dis.readLong();
@@ -374,6 +377,19 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		int code = dis.readByte();
 		
 		_partBlocks = new CacheBlock[len];
+		
+		return code;
+	}
+
+	/**
+	 * 
+	 * @param din
+	 * @throws IOException 
+	 */
+	private void readPayload(DataInput dis, int code) 
+		throws IOException
+	{
+		int len = _partBlocks.length;
 		for( int i=0; i<len; i++ ) {
 			_partBlocks[i] = CacheBlockFactory.newInstance(code);
 			_partBlocks[i].readFields(dis);