You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/03/27 00:12:14 UTC
incubator-systemml git commit: [SYSTEMML-573] Frame write
Repository: incubator-systemml
Updated Branches:
refs/heads/master d40081527 -> 02aadb80a
[SYSTEMML-573] Frame write
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/02aadb80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/02aadb80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/02aadb80
Branch: refs/heads/master
Commit: 02aadb80a9f7cba346d69c08e4ca2782d5fccadf
Parents: d400815
Author: Arvind Surve <ac...@yahoo.com>
Authored: Sat Mar 26 16:11:51 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Sat Mar 26 16:11:51 2016 -0700
----------------------------------------------------------------------
.../apache/sysml/runtime/io/FrameWriter.java | 11 +-
.../runtime/io/FrameWriterBinaryBlock.java | 104 ++++++++++++++++++-
.../sysml/runtime/io/FrameWriterTextCSV.java | 99 +++++++++++++++++-
.../sysml/runtime/io/FrameWriterTextCell.java | 88 +++++++++++++++-
.../sysml/runtime/util/UtilFunctions.java | 2 +-
5 files changed, 296 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
index 0dc528a..7f15c25 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
@@ -28,7 +28,7 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
/**
- * Base class for all format-specific matrix writers. Every writer is required to implement the basic
+ * Base class for all format-specific frame writers. Every writer is required to implement the basic
* write functionality but might provide additional custom functionality. Any non-default parameters
* (e.g., CSV read properties) should be passed into custom constructors. There is also a factory
* for creating format-specific writers.
@@ -41,12 +41,14 @@ public abstract class FrameWriter
*
* @param src
* @param fname
+ * @param rlen
+ * @param clen
* @return
* @throws IOException
* @throws DMLUnsupportedOperationException
* @throws DMLRuntimeException
*/
- public abstract void writeFrameToHDFS( FrameBlock src, String fname )
+ public abstract void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen )
throws IOException, DMLRuntimeException, DMLUnsupportedOperationException;
/**
@@ -56,12 +58,13 @@ public abstract class FrameWriter
* @return
* @throws DMLRuntimeException
*/
- public static FrameBlock[] createFrameBlocksForReuse( List<ValueType> schema, List<String> names )
+ public static FrameBlock[] createFrameBlocksForReuse( List<ValueType> schema, List<String> names, long rlen )
throws DMLRuntimeException
{
FrameBlock frameBlock[] = new FrameBlock[1];
frameBlock[0] = new FrameBlock(schema, names);
+ frameBlock[0].ensureAllocatedColumns((int)rlen);
return frameBlock;
}
@@ -70,7 +73,7 @@ public abstract class FrameWriter
* @param blocks
* @return
*/
- public static FrameBlock getMatrixBlockForReuse( FrameBlock[] blocks) //TODO do we need this function?
+ public static FrameBlock getFrameBlockForReuse( FrameBlock[] blocks) //TODO do we need this function?
{
return blocks[ 0 ];
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
index 46e12fa..88dc8ca 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
@@ -21,24 +21,126 @@ package org.apache.sysml.runtime.io;
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.util.MapReduceTool;
+
+
+/*
+ * This writer uses fixed-size blocks with block-encoded keys.
+ *
+ *
+ */
public class FrameWriterBinaryBlock extends FrameWriter
{
/**
* @param src
* @param fname
+ * @param rlen
+ * @param clen
* @return
* @throws IOException
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
@Override
- public void writeFrameToHDFS( FrameBlock src, String fname )
+ public void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen )
throws IOException, DMLRuntimeException, DMLUnsupportedOperationException
{
+ //prepare file access
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ Path path = new Path( fname );
+
+ //if the file already exists on HDFS, remove it.
+ MapReduceTool.deleteFileIfExistOnHDFS( fname );
+
+ //core write
+ writeBinaryBlockFrameToHDFS(path, job, src, rlen, clen);
}
+	/**
+	 * Writes src to a sequence file at path: as one block with index (1,1) if rlen and clen
+	 * fit into the configured block size, otherwise as a grid of sub-blocks with 1-based indexes.
+	 * @param path target sequence file
+	 * @param job job configuration used to obtain the file system and create the writer
+	 * @param src frame block to write
+	 * @param rlen overall number of rows; src must not have more rows
+	 * @param clen overall number of columns; src must not have more columns
+	 * @throws IOException if src exceeds rlen x clen or the write fails
+	 * @throws DMLUnsupportedOperationException
+	 * @throws DMLRuntimeException
+	 */
+	@SuppressWarnings("deprecation")
+	protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen )
+		throws IOException, DMLRuntimeException, DMLUnsupportedOperationException
+	{
+		FileSystem fs = FileSystem.get(job);
+		int brlen = ConfigurationManager.getBlocksize();
+		int bclen = ConfigurationManager.getBlocksize();
+
+		// 1) create sequence file writer; the value class must be the class of the appended
+		// blocks (NOTE(review): presumes FrameBlock is Hadoop-serializable, e.g. a Writable - confirm)
+		SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, FrameBlock.class);
+
+		try
+		{
+			// 2) bound check for src block
+			if( src.getNumRows() > rlen || src.getNumColumns() > clen )
+			{
+				throw new IOException("Frame block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
+					"out of overall frame range [1:"+rlen+",1:"+clen+"].");
+			}
+
+			//3) reblock and write
+			MatrixIndexes indexes = new MatrixIndexes();
+
+			if( rlen <= brlen && clen <= bclen ) //opt for single block
+			{
+				//directly write single block
+				indexes.setIndexes(1, 1);
+				writer.append(indexes, src);
+			}
+			else //general case
+			{
+				//initialize the single block that is reused for every sub-block
+				FrameBlock[] blocks = createFrameBlocksForReuse(src.getSchema(), src.getColumnNames(), rlen);
+
+				//create and write subblocks of frame
+				for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
+					for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
+					{
+						int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
+						int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
+
+						int row_offset = blockRow*brlen;
+						int col_offset = blockCol*bclen;
+
+						//get reuse frame block
+						FrameBlock block = getFrameBlockForReuse(blocks);
+
+						//copy subpart to block
+						src.sliceOperations( row_offset, row_offset+maxRow-1,
+							col_offset, col_offset+maxCol-1, block );
+
+						//append block to sequence file
+						indexes.setIndexes(blockRow+1, blockCol+1);
+						writer.append(indexes, block);
+					}
+			}
+
+		}
+		finally
+		{
+			IOUtilFunctions.closeSilently(writer);
+		}
+	}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
index f118b43..650778d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
@@ -19,12 +19,19 @@
package org.apache.sysml.runtime.io;
+import java.io.BufferedWriter;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.MapReduceTool;
/**
*
@@ -44,15 +51,105 @@ public class FrameWriterTextCSV extends FrameWriter
/**
* @param src
* @param fname
+ * @param rlen
+ * @param clen
* @return
* @throws IOException
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
@Override
- public void writeFrameToHDFS( FrameBlock src, String fname )
+ public void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen)
throws IOException, DMLRuntimeException, DMLUnsupportedOperationException
{
+ //validity check frame dimensions
+ if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
+ throw new IOException("Frame dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
+ }
+
+ //prepare file access
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ Path path = new Path( fname );
+
+ //if the file already exists on HDFS, remove it.
+ MapReduceTool.deleteFileIfExistOnHDFS( fname );
+
+ //core write
+ writeCSVFrameToHDFS(path, job, src, rlen, clen, _props);
+ }
+
+	/**
+	 * Writes the frame as delimited text, one line per row, overwriting the file at path.
+	 * An optional header line names the columns C1..Cn; null cells are written as empty fields.
+	 * @param path target file
+	 * @param job job configuration used to obtain the file system
+	 * @param src frame block to write
+	 * @param rlen number of rows to write
+	 * @param clen number of columns to write
+	 * @param props CSV properties (delimiter, header flag); defaults are used if null
+	 * @throws IOException if the file cannot be created or written
+	 */
+	protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties props )
+		throws IOException
+	{
+		FileSystem fs = FileSystem.get(job);
+		BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
+
+		try
+		{
+			//for obj reuse and preventing repeated buffer re-allocations
+			StringBuilder sb = new StringBuilder();
+
+			props = (props==null)? new CSVFileFormatProperties() : props;
+			String delim = props.getDelim();
+
+			// Write header line, if needed
+			if( props.hasHeader() )
+			{
+				//write row chunk-wise to prevent OOM on large number of columns
+				for( int bj=0; bj<clen; bj+=BLOCKSIZE_J )
+				{
+					for( int j=bj; j < Math.min(clen,bj+BLOCKSIZE_J); j++)
+					{
+						sb.append('C').append(j+1);
+						if ( j < clen-1 )
+							sb.append(delim);
+					}
+					br.write( sb.toString() );
+					sb.setLength(0);
+				}
+				sb.append('\n');
+				br.write( sb.toString() );
+				sb.setLength(0);
+			}
+
+			// Write data lines
+			for( int i=0; i<rlen; i++ )
+			{
+				//write row chunk-wise to prevent OOM on large number of columns
+				for( int bj=0; bj<clen; bj+=BLOCKSIZE_J )
+				{
+					for( int j=bj; j<Math.min(clen,bj+BLOCKSIZE_J); j++ )
+					{
+						Object cell = src.get(i, j); //single lookup per cell
+						if( cell != null )
+							sb.append(cell.toString());
+						if( j != clen-1 )
+							sb.append(delim);
+					}
+					br.write( sb.toString() );
+					sb.setLength(0);
+				}
+
+				sb.append('\n');
+				br.write( sb.toString() ); //same as append
+				sb.setLength(0);
+			}
+		}
+		finally
+		{
+			IOUtilFunctions.closeSilently(br);
+		}
 	}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
index 065a11b..2af9a60 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
@@ -19,11 +19,18 @@
package org.apache.sysml.runtime.io;
+import java.io.BufferedWriter;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.MapReduceTool;
public class FrameWriterTextCell extends FrameWriter
{
@@ -36,9 +43,88 @@ public class FrameWriterTextCell extends FrameWriter
* @throws DMLUnsupportedOperationException
*/
@Override
- public void writeFrameToHDFS( FrameBlock src, String fname )
+ public void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen )
throws IOException, DMLRuntimeException, DMLUnsupportedOperationException
{
+ //validity check frame dimensions
+ if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
+ throw new IOException("Frame dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
+ }
+
+ //prepare file access
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ Path path = new Path( fname );
+
+ //if the file already exists on HDFS, remove it.
+ MapReduceTool.deleteFileIfExistOnHDFS( fname );
+
+ //core write
+ writeTextCellFrameToHDFS(path, job, src, src.getNumRows(), src.getNumColumns());
}
+	/**
+	 * Writes every non-null cell of src as a "row col value" line (1-based indexes);
+	 * if no cell is written, the placeholder line "1 1 0" is emitted instead.
+	 *
+	 * @param path target file, overwritten if it exists
+	 * @param job job configuration used to obtain the file system
+	 * @param src frame block to write
+	 * @param rlen overall number of rows; src must not have more rows
+	 * @param clen overall number of columns; src must not have more columns
+	 * @throws IOException if src exceeds rlen x clen or the write fails
+	 */
+	protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen )
+		throws IOException
+	{
+		int rows = src.getNumRows();
+		int cols = src.getNumColumns();
+
+		//bound check before the file is created, so a failed check leaks no open stream
+		if( rows > rlen || cols > clen )
+		{
+			throw new IOException("Frame block [1:"+rows+",1:"+cols+"] " +
+				"out of overall frame range [1:"+rlen+",1:"+clen+"].");
+		}
+
+		boolean entriesWritten = false;
+		FileSystem fs = FileSystem.get(job);
+		BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
+
+		try
+		{
+			//for obj reuse and preventing repeated buffer re-allocations
+			StringBuilder sb = new StringBuilder();
+
+			for( int i=0; i<rows; i++ )
+			{
+				String rowIndex = Integer.toString(i+1);
+				for( int j=0; j<cols; j++ )
+				{
+					Object cell = src.get(i, j); //single lookup per cell
+					if( cell != null )
+					{
+						sb.append(rowIndex);
+						sb.append(' ');
+						sb.append( j+1 );
+						sb.append(' ');
+						sb.append( cell.toString() );
+						sb.append('\n');
+						br.write( sb.toString() ); //same as append
+						sb.setLength(0);
+						entriesWritten = true;
+					}
+				}
+			}
+
+			//handle empty result
+			if ( !entriesWritten ) {
+				br.write("1 1 0\n");
+			}
+		}
+		finally
+		{
+			IOUtilFunctions.closeSilently(br);
+		}
+	}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index cad788e..fe66e55 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -323,7 +323,7 @@ public class UtilFunctions
switch( vt ) {
case STRING: return in;
case BOOLEAN: return Boolean.parseBoolean(in);
- case INT: return Integer.parseInt(in);
+ case INT: return Long.parseLong(in);
case DOUBLE: return Double.parseDouble(in);
default: throw new RuntimeException("Unsupported value type: "+vt);
}