You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/03/27 00:12:14 UTC

incubator-systemml git commit: [SYSTEMML-573] Frame write

Repository: incubator-systemml
Updated Branches:
  refs/heads/master d40081527 -> 02aadb80a


[SYSTEMML-573] Frame write


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/02aadb80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/02aadb80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/02aadb80

Branch: refs/heads/master
Commit: 02aadb80a9f7cba346d69c08e4ca2782d5fccadf
Parents: d400815
Author: Arvind Surve <ac...@yahoo.com>
Authored: Sat Mar 26 16:11:51 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Sat Mar 26 16:11:51 2016 -0700

----------------------------------------------------------------------
 .../apache/sysml/runtime/io/FrameWriter.java    |  11 +-
 .../runtime/io/FrameWriterBinaryBlock.java      | 104 ++++++++++++++++++-
 .../sysml/runtime/io/FrameWriterTextCSV.java    |  99 +++++++++++++++++-
 .../sysml/runtime/io/FrameWriterTextCell.java   |  88 +++++++++++++++-
 .../sysml/runtime/util/UtilFunctions.java       |   2 +-
 5 files changed, 296 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
index 0dc528a..7f15c25 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriter.java
@@ -28,7 +28,7 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 
 /**
- * Base class for all format-specific matrix writers. Every writer is required to implement the basic 
+ * Base class for all format-specific frame writers. Every writer is required to implement the basic 
  * write functionality but might provide additional custom functionality. Any non-default parameters
  * (e.g., CSV read properties) should be passed into custom constructors. There is also a factory
  * for creating format-specific writers. 
@@ -41,12 +41,14 @@ public abstract class FrameWriter
 	 * 
 	 * @param src
 	 * @param fname
+	 * @param rlen
+	 * @param clen
 	 * @return
 	 * @throws IOException
 	 * @throws DMLUnsupportedOperationException 
 	 * @throws DMLRuntimeException 
 	 */
-	public abstract void writeFrameToHDFS( FrameBlock src, String fname )
+	public abstract void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen )
 		throws IOException, DMLRuntimeException, DMLUnsupportedOperationException;
 	
 	/**
@@ -56,12 +58,13 @@ public abstract class FrameWriter
 	 * @return
 	 * @throws DMLRuntimeException 
 	 */
-	public static FrameBlock[] createFrameBlocksForReuse( List<ValueType> schema, List<String> names ) 
+	public static FrameBlock[] createFrameBlocksForReuse( List<ValueType> schema, List<String> names, long rlen ) 
 		throws DMLRuntimeException
 	{
 		FrameBlock frameBlock[] = new FrameBlock[1];
 		frameBlock[0] = new FrameBlock(schema, names);
 
+		frameBlock[0].ensureAllocatedColumns((int)rlen);
 		return frameBlock;
 	}
 	
@@ -70,7 +73,7 @@ public abstract class FrameWriter
 	 * @param blocks
 	 * @return
 	 */
-	public static FrameBlock getMatrixBlockForReuse( FrameBlock[] blocks) //TODO do we need this function?
+	public static FrameBlock getFrameBlockForReuse( FrameBlock[] blocks) //TODO do we need this function?
 	{
 		return blocks[ 0 ];
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
index 46e12fa..88dc8ca 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
@@ -21,24 +21,126 @@ package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.util.MapReduceTool;
+
+
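+/*
+ * This writer uses fixed-size blocks with block-encoded keys.
+ * Each block is appended to a Hadoop sequence file under its
+ * 1-based (block row, block column) index.
+ */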
 
 public class FrameWriterBinaryBlock extends FrameWriter
 {
 	/**
 	 * @param src
 	 * @param fname
+	 * @param rlen
+	 * @param clen
 	 * @return
 	 * @throws IOException 
 	 * @throws DMLRuntimeException 
 	 * @throws DMLUnsupportedOperationException 
 	 */
 	@Override
-	public void writeFrameToHDFS( FrameBlock src, String fname )
+	public void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen )
 		throws IOException, DMLRuntimeException, DMLUnsupportedOperationException 
 	{
+		//prepare file access
+		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+		Path path = new Path( fname );
+
+		//if the file already exists on HDFS, remove it.
+		MapReduceTool.deleteFileIfExistOnHDFS( fname );
+			
+		//core write
+		writeBinaryBlockFrameToHDFS(path, job, src, rlen, clen);
 	}
 
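+	/**
+	 * Writes the frame to a sequence file, either as one block or as a grid of blocksize-bounded sub-blocks.
+	 * 
+	 * @param path target path of the sequence file
+	 * @param job job configuration used to obtain the file system and to create the writer
+	 * @param src frame block to write
+	 * @param rlen overall number of rows; src must not have more rows than this
+	 * @param clen overall number of columns; src must not have more columns than this
+	 * @throws IOException if src exceeds the rlen x clen range or the write fails
+	 * @throws DMLUnsupportedOperationException 
+	 * @throws DMLRuntimeException 
+	 */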
+	@SuppressWarnings("deprecation")
+	protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen )
+		throws IOException, DMLRuntimeException, DMLUnsupportedOperationException
+	{
+		FileSystem fs = FileSystem.get(job);
+		int brlen = ConfigurationManager.getBlocksize();
+		int bclen = ConfigurationManager.getBlocksize();
+
+		// 1) create sequence file writer 
+		SequenceFile.Writer writer = null;
+		writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
+		
+		try
+		{
+			// 2) bound check for src block
+			if( src.getNumRows() > rlen || src.getNumColumns() > clen )
+			{
+				throw new IOException("Frame block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
+						              "out of overall frame range [1:"+rlen+",1:"+clen+"].");
+			}
+		
+			//3) reblock and write
+			MatrixIndexes indexes = new MatrixIndexes();
+
+			if( rlen <= brlen && clen <= bclen ) //opt for single block
+			{
+				//directly write single block
+				indexes.setIndexes(1, 1);
+				writer.append(indexes, src);
+			}
+			else //general case
+			{
+				//initialize block for reuse (createFrameBlocksForReuse allocates a single block)
+				FrameBlock[] blocks = createFrameBlocksForReuse(src.getSchema(), src.getColumnNames(), rlen);  
+				
+				//create and write subblocks of frame
+				for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
+					for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
+					{
+						int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
+						int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
+				
+						int row_offset = blockRow*brlen;
+						int col_offset = blockCol*bclen;
+						
+						//get reuse frame block
+						FrameBlock block = getFrameBlockForReuse(blocks);
+	
+						//copy subpart to block
+						src.sliceOperations( row_offset, row_offset+maxRow-1, 
+								             col_offset, col_offset+maxCol-1, block );
+						
+						//append block to sequence file
+						indexes.setIndexes(blockRow+1, blockCol+1);
+						writer.append(indexes, block);
+					}
+			}
+		
+		}
+		finally
+		{
+			IOUtilFunctions.closeSilently(writer);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
index f118b43..650778d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
@@ -19,12 +19,19 @@
 
 package org.apache.sysml.runtime.io;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
  * 
@@ -44,15 +51,105 @@ public class FrameWriterTextCSV extends FrameWriter
 	/**
 	 * @param src
 	 * @param fname
+	 * @param rlen
+	 * @param clen
 	 * @return
 	 * @throws IOException 
 	 * @throws DMLRuntimeException 
 	 * @throws DMLUnsupportedOperationException 
 	 */
 	@Override
-	public void writeFrameToHDFS( FrameBlock src, String fname )
+	public void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen) 
 		throws IOException, DMLRuntimeException, DMLUnsupportedOperationException 
 	{
+		//validity check frame dimensions
+		if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
+			throw new IOException("Frame dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
+		}
+		
+		//prepare file access
+		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+		Path path = new Path( fname );
+
+		//if the file already exists on HDFS, remove it.
+		MapReduceTool.deleteFileIfExistOnHDFS( fname );
+			
+		//core write
+		writeCSVFrameToHDFS(path, job, src, rlen, clen, _props);
+	}
+
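+	/**
+	 * Writes the frame as delimited text: an optional header line (C1, C2, ...) followed by one line per row.
+	 * 
+	 * @param path target path of the text file (overwritten if present)
+	 * @param job job configuration used to obtain the file system
+	 * @param src frame block to write; null cells are written as empty fields
+	 * @param rlen number of rows to write
+	 * @param clen number of columns to write
+	 * @param props CSV properties (delimiter, header flag); if null, default properties are used
+	 * @throws IOException
+	 */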
+	protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties props )
+		throws IOException
+	{
+		FileSystem fs = FileSystem.get(job);
+        BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));		
+		
+		try
+		{
+			//for obj reuse and preventing repeated buffer re-allocations
+			StringBuilder sb = new StringBuilder();
+			
+			props = (props==null)? new CSVFileFormatProperties() : props;
+			String delim = props.getDelim();
+			
+			// Write header line, if needed
+			if( props.hasHeader() ) 
+			{
+				//write header chunk-wise to prevent OOM on large number of columns
+				for( int bj=0; bj<clen; bj+=BLOCKSIZE_J )
+				{
+					for( int j=bj; j < Math.min(clen,bj+BLOCKSIZE_J); j++) 
+					{
+						sb.append("C"+ (j+1));
+						if ( j < clen-1 )
+							sb.append(delim);
+					}
+					br.write( sb.toString() );
+		            sb.setLength(0);	
+				}
+				sb.append('\n');
+				br.write( sb.toString() );
+	            sb.setLength(0);
+			}
+			
+			// Write data lines
+			for( int i=0; i<rlen; i++ ) 
+			{
+				//write row chunk-wise to prevent OOM on large number of columns
+				for( int bj=0; bj<clen; bj+=BLOCKSIZE_J )
+				{
+					for( int j=bj; j<Math.min(clen,bj+BLOCKSIZE_J); j++ )
+					{
+						if(src.get(i, j) != null)
+							sb.append(src.get(i, j).toString());
+						
+						if( j != clen-1 )
+							sb.append(delim);
+					}
+					br.write( sb.toString() );
+		            sb.setLength(0);
+				}
+				
+				sb.append('\n');
+				br.write( sb.toString() ); //same as append
+				sb.setLength(0); 
+			}
+		}
+		finally
+		{
+			IOUtilFunctions.closeSilently(br);
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
index 065a11b..2af9a60 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
@@ -19,11 +19,18 @@
 
 package org.apache.sysml.runtime.io;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class FrameWriterTextCell extends FrameWriter
 {
@@ -36,9 +43,88 @@ public class FrameWriterTextCell extends FrameWriter
 	 * @throws DMLUnsupportedOperationException 
 	 */
 	@Override
-	public void writeFrameToHDFS( FrameBlock src, String fname )
+	public void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen )
 		throws IOException, DMLRuntimeException, DMLUnsupportedOperationException 
 	{
+		//validity check frame dimensions
+		if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
+			throw new IOException("Frame dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
+		}
+				
+		//prepare file access
+		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+		Path path = new Path( fname );
+
+		//if the file already exists on HDFS, remove it.
+		MapReduceTool.deleteFileIfExistOnHDFS( fname );
+			
+		//core write
+		writeTextCellFrameToHDFS(path, job, src, src.getNumRows(), src.getNumColumns());
 	}
 
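+	/**
+	 * Writes every non-null cell of the frame as one text line "row col value" with 1-based indexes.
+	 * If no cell is written at all, the single line "1 1 0" is written instead.
+	 * 
+	 * @param path target path of the text file (overwritten if present)
+	 * @param job job configuration used to obtain the file system
+	 * @param src frame block to write
+	 * @param rlen overall number of rows; src must not have more rows than this
+	 * @param clen overall number of columns; src must not have more columns than this
+	 * @throws IOException if src exceeds the rlen x clen range or the write fails
+	 */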
+	protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen )
+		throws IOException
+	{
+		boolean entriesWritten = false;
+		FileSystem fs = FileSystem.get(job);
+        BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));		
+		
+    	int rows = src.getNumRows();
+		int cols = src.getNumColumns();
+
+		//bound check per block
+		if( rows > rlen || cols > clen )
+		{
+			throw new IOException("Frame block [1:"+rows+",1:"+cols+"] " +
+					              "out of overall frame range [1:"+rlen+",1:"+clen+"].");
+		}
+		
+		try
+		{
+			//for obj reuse and preventing repeated buffer re-allocations
+			StringBuilder sb = new StringBuilder();
+			
+			for( int i=0; i<rows; i++ )
+			{
+				String rowIndex = Integer.toString(i+1);					
+				for( int j=0; j<cols; j++ )
+				{
+					if(src.get(i, j) != null)
+					{
+						String lvalue = src.get(i, j).toString();
+						sb.append(rowIndex);
+						sb.append(' ');
+						sb.append( j+1 );
+						sb.append(' ');
+						sb.append( lvalue );
+						sb.append('\n');
+						br.write( sb.toString() ); //same as append
+						sb.setLength(0); 
+						entriesWritten = true;
+					}
+				}
+			}
+	
+			//handle empty result
+			if ( !entriesWritten ) {
+				br.write("1 1 0\n");
+			}
+		}
+		finally
+		{
+			IOUtilFunctions.closeSilently(br);
+		}
+	}	
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/02aadb80/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index cad788e..fe66e55 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -323,7 +323,7 @@ public class UtilFunctions
 		switch( vt ) {
 			case STRING:  return in;
 			case BOOLEAN: return Boolean.parseBoolean(in);
-			case INT:     return Integer.parseInt(in);
+			case INT:     return Long.parseLong(in);
 			case DOUBLE:  return Double.parseDouble(in);
 			default: throw new RuntimeException("Unsupported value type: "+vt);
 		}