You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/09/29 19:25:36 UTC
incubator-systemml git commit: [SYSTEMML-964] Do not use FastBufferedDataIOStream for frame
Repository: incubator-systemml
Updated Branches:
refs/heads/master 24cc180f3 -> 91118def5
[SYSTEMML-964] Do not use FastBufferedDataIOStream for frame
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/91118def
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/91118def
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/91118def
Branch: refs/heads/master
Commit: 91118def5e19d34cc0202d60538044d76b2e26e8
Parents: 24cc180
Author: Arvind Surve <ac...@yahoo.com>
Authored: Wed Sep 28 13:43:40 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Thu Sep 29 12:24:41 2016 -0700
----------------------------------------------------------------------
.../spark/data/PartitionedBlock.java | 28 +++++++++++++++-----
1 file changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91118def/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index fbd7a25..b18f32a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -305,13 +305,14 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
{
DataInput dis = is;
- if( is instanceof ObjectInputStream ) {
+ int code = readHeader(dis);
+ if( is instanceof ObjectInputStream && code == 0) { // Apply only for MatrixBlock at this point as a temporary workaround
+ // We will generalize this code by adding UTF functionality to support Frame
//fast deserialize of dense/sparse blocks
ObjectInputStream ois = (ObjectInputStream)is;
dis = new FastBufferedDataInputStream(ois);
}
-
- readHeaderAndPayload(dis);
+ readPayload(dis, code);
}
/**
@@ -324,7 +325,9 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
public void writeExternal(ObjectOutput os)
throws IOException
{
- if( os instanceof ObjectOutputStream ) {
+ if( os instanceof ObjectOutputStream
+ && CacheBlockFactory.getCode(_partBlocks[0]) == 0) {// Apply only for MatrixBlock at this point as a temporary workaround
+ // We will generalize this code by adding UTF functionality to support Frame
//fast serialize of dense/sparse blocks
ObjectOutputStream oos = (ObjectOutputStream)os;
FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos);
@@ -356,13 +359,13 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
for( CacheBlock block : _partBlocks )
block.write(dos);
}
-
+
/**
*
* @param din
* @throws IOException
*/
- private void readHeaderAndPayload(DataInput dis)
+ private int readHeader(DataInput dis)
throws IOException
{
_rlen = dis.readLong();
@@ -374,6 +377,19 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
int code = dis.readByte();
_partBlocks = new CacheBlock[len];
+
+ return code;
+ }
+
+ /**
+ *
+ * @param din
+ * @throws IOException
+ */
+ private void readPayload(DataInput dis, int code)
+ throws IOException
+ {
+ int len = _partBlocks.length;
for( int i=0; i<len; i++ ) {
_partBlocks[i] = CacheBlockFactory.newInstance(code);
_partBlocks[i].readFields(dis);