You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/03/23 09:36:58 UTC

incubator-systemml git commit: [SYSTEMML-573] Frame IO Read

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2e032f707 -> c1f25ef6c


[SYSTEMML-573] Frame IO Read


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c1f25ef6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c1f25ef6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c1f25ef6

Branch: refs/heads/master
Commit: c1f25ef6c40c73475e20b2af2841a2c044a8901c
Parents: 2e032f7
Author: Arvind Surve <ac...@yahoo.com>
Authored: Wed Mar 23 01:36:36 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Wed Mar 23 01:36:36 2016 -0700

----------------------------------------------------------------------
 .../apache/sysml/runtime/io/FrameReader.java    |   4 +-
 .../runtime/io/FrameReaderBinaryBlock.java      |  87 +++++++-
 .../sysml/runtime/io/FrameReaderTextCSV.java    | 149 ++++++++++++++
 .../sysml/runtime/io/FrameReaderTextCell.java   | 198 ++++++++++++++++++-
 .../sysml/runtime/matrix/data/FrameBlock.java   |  40 ++++
 .../sysml/runtime/util/UtilFunctions.java       |  18 ++
 .../functions/frame/FrameCopyTest.java          | 181 +++++++++++++++++
 7 files changed, 673 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 023c0ab..a39d129 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -150,7 +150,7 @@ public abstract class FrameReader
 	 * @throws DMLRuntimeException 
 	 * @throws IOException 
 	 */
-	protected static FrameBlock createOutputFrameBlock(List<ValueType> schema, List<String> names)
+	protected static FrameBlock createOutputFrameBlock(List<ValueType> schema, List<String> names, long nrow)
 		throws IOException, DMLRuntimeException
 	{
 		//check schema and column names
@@ -159,7 +159,7 @@ public abstract class FrameReader
 		
 		//prepare result frame block
 		FrameBlock ret = new FrameBlock(schema, names);
-		
+		ret.ensureAllocatedColumns((int)nrow);
 		return ret;
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index 6769d21..62c7e6c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -22,9 +22,17 @@ package org.apache.sysml.runtime.io;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+
+
 
 public class FrameReaderBinaryBlock extends FrameReader
 {
@@ -45,9 +53,86 @@ public class FrameReaderBinaryBlock extends FrameReader
 			throws IOException, DMLRuntimeException 
 	{
 		//allocate output frame block
-		FrameBlock ret = null;
+		FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+		
+		//prepare file access
+		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	
+		FileSystem fs = FileSystem.get(job);
+		Path path = new Path( fname ); 
+		
+		//check existence and non-empty file
+		checkValidInputFile(fs, path); 
+	
+		//core read 
+		readBinaryBlockFrameFromHDFS(path, job, fs, ret, rlen, clen);
 		
 		return ret;
 	}
+	
+	/**
+	 * Note: For efficiency, we directly use SequenceFile.Reader instead of SequenceFileInputFormat-
+	 * InputSplits-RecordReader (SequenceFileRecordReader). First, this has no drawbacks since the
+	 * SequenceFileRecordReader internally uses SequenceFile.Reader as well. Second, it is 
+	 * advantageous if the actual sequence files are larger than the file splits created by   
+	 * informat.getSplits (which is usually aligned to the HDFS block size) because then there is 
+	 * overhead for finding the actual split between our 1k-1k blocks. This case happens
+	 * if the read frame was created by CP or when jobs directly write to large output files 
+	 * (e.g., parfor matrix partitioning).
+	 * 
+	 * @param path input path, resolved into 1..N sequence files via getSequenceFilePaths
+	 * @param job job configuration handed to each SequenceFile.Reader
+	 * @param fs file system used to resolve and open the sequence files
+	 * @param dest output frame that receives every block that is read
+	 * @param rlen number of rows of the overall frame, used for the per-block bounds check
+	 * @param clen number of columns of the overall frame, used for the per-block bounds check
+	 * 
+	 * @throws IOException if a block exceeds the overall frame range [1:rlen,1:clen]
+	 * @throws DMLRuntimeException declared by FrameBlock.copy, which places the blocks
+	 */
+	@SuppressWarnings("deprecation")
+	private static void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen )
+		throws IOException, DMLRuntimeException
+	{
+		MatrixIndexes key = new MatrixIndexes(); 
+		FrameBlock value = new FrameBlock();
+		
+		for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+		{
+			//directly read from sequence files (individual partfiles)
+			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
+			
+			try
+			{
+				//note: next(key, value) does not yet exploit the given serialization classes, record reader does but is generally slower.
+				while( reader.next(key, value) )
+				{	
+					int row_offset = (int)(key.getRowIndex()-1)*ConfigurationManager.getBlocksize();
+					int col_offset = (int)(key.getColumnIndex()-1)*ConfigurationManager.getBlocksize();
+					
+					int rows = value.getNumRows();
+					int cols = value.getNumColumns();
+					
+					//bound check per block
+					if( row_offset + rows < 0 || row_offset + rows > rlen || col_offset + cols<0 || col_offset + cols > clen )
+					{
+						throw new IOException("Frame block ["+(row_offset+1)+":"+(row_offset+rows)+","+(col_offset+1)+":"+(col_offset+cols)+"] " +
+								              "out of overall frame range [1:"+rlen+",1:"+clen+"].");
+					}
+					//copy the block into dest; the upper row/column indexes are passed as offset+size-1
+					{
+						dest.copy( row_offset, row_offset+rows-1, 
+								   col_offset, col_offset+cols-1,
+								   value, 
+								   dest.getNumRows(), dest.getNumColumns());
+					}
+				}
+			}
+			finally
+			{
+				IOUtilFunctions.closeSilently(reader);
+			}
+		}
+		
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index b364916..2086ea3 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -19,13 +19,24 @@
 
 package org.apache.sysml.runtime.io;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class FrameReaderTextCSV extends FrameReader
 {
@@ -56,8 +67,146 @@ public class FrameReaderTextCSV extends FrameReader
 	{
 		//allocate output frame block
 		FrameBlock ret = null;
+		if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file size for frame w/ unknown dimensions
+			ret = createOutputFrameBlock(schema, names, rlen);
+		
+		//prepare file access
+		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	
+		FileSystem fs = FileSystem.get(job);
+		Path path = new Path( fname );
+		
+		//check existence and non-empty file
+		checkValidInputFile(fs, path); 
+	
+		//core read 
+		ret = readCSVFrameFromHDFS(path, job, fs, ret, schema, names, rlen, clen,  
+				   _props.hasHeader(), _props.getDelim(), _props.isFill() );
 		
 		return ret;
 	}
 	
+	/**
+	 * 
+	 * @param path
+	 * @param job
+	 * @param fs
+	 * @param dest
+	 * @param rlen
+	 * @param clen
+	 * @param hasHeader
+	 * @param delim
+	 * @param fill
+	 * @return
+	 * @throws IOException
+	 */
+	@SuppressWarnings("unchecked")
+	private FrameBlock readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, 
+			List<ValueType> schema, List<String> names, long rlen, long clen, boolean hasHeader, String delim, boolean fill)
+		throws IOException
+	{
+		ArrayList<Path> files=new ArrayList<Path>();
+		if(fs.isDirectory(path)) {
+			for(FileStatus stat: fs.listStatus(path, CSVReblockMR.hiddenFileFilter))
+				files.add(stat.getPath());
+			Collections.sort(files);
+		}
+		else
+			files.add(path);
+		
+		if ( dest == null ) {
+			dest = computeCSVSize(files, fs, schema, names, hasHeader, delim);
+			clen = dest.getNumColumns();
+		}
+		
+		/////////////////////////////////////////
+		String value = null;
+		int row = 0;
+		int col = -1;
+		
+		for(int fileNo=0; fileNo<files.size(); fileNo++)
+		{
+			BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo))));
+			if(fileNo==0 && hasHeader ) 
+				br.readLine(); //ignore header
+			
+			// Read the data
+			boolean emptyValuesFound = false;
+			try
+			{
+				while( (value=br.readLine())!=null ) //foreach line
+				{
+					String cellStr = value.toString().trim();
+					emptyValuesFound = false;
+					String[] parts = IOUtilFunctions.split(cellStr, delim);
+					col = 0;
+					
+					for( String part : parts ) //foreach cell
+					{
+						part = part.trim();
+						if ( part.isEmpty() ) {
+							//TODO: Do we need to handle empty cell condition?
+							emptyValuesFound = true;
+						}
+						else {
+							dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), part));
+						}
+						col++;
+					}
+					
+					//sanity checks for empty values and number of columns
+					IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
+					IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen);
+					row++;
+				}
+			}
+			finally {
+				IOUtilFunctions.closeSilently(br);
+			}
+		}
+		
+		return dest;
+	}
+	
+	/**
+	 * Counts the data lines of all given files and allocates an output frame with that many rows.
+	 * @param files input files to scan
+	 * @param fs file system used to open the files
+	 * @param schema value types of the frame that is created
+	 * @param names column names of the frame that is created
+	 * @param hasHeader if true, the first line of the first file is not counted
+	 * @param delim cell delimiter (not used by this method)
+	 * @return new frame block for which ensureAllocatedColumns was called with the line count
+	 * @throws IOException if a file cannot be opened or read
+	 */
+	private FrameBlock computeCSVSize ( List<Path> files, FileSystem fs, List<ValueType> schema, List<String> names, boolean hasHeader, String delim) 
+		throws IOException 
+	{		
+		int nrow = 0;
+		for(int fileNo=0; fileNo<files.size(); fileNo++)
+		{
+			BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo))));	
+			try
+			{
+				// Read the header line, if there is one.
+				if(fileNo==0)
+				{
+					if ( hasHeader ) 
+						br.readLine(); //ignore header
+				}
+				
+				while ( br.readLine() != null ) {
+					nrow++;
+				}
+			}
+			finally {
+				IOUtilFunctions.closeSilently(br);
+			}
+		}
+		
+		//create new frame block
+		FrameBlock frameBlock = new FrameBlock(schema, names);
+		frameBlock.ensureAllocatedColumns(nrow);
+		return frameBlock;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
index 5488f94..91b7892 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
@@ -19,12 +19,28 @@
 
 package org.apache.sysml.runtime.io;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.FastStringTokenizer;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class FrameReaderTextCell extends FrameReader
 {
@@ -46,9 +62,189 @@ public class FrameReaderTextCell extends FrameReader
 			throws IOException, DMLRuntimeException
 	{
 		//allocate output frame block
-		FrameBlock ret = null;
+		FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+		
+		//prepare file access
+		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	
+		FileSystem fs = FileSystem.get(job);
+		Path path = new Path( fname );
+		
+		//check existence and non-empty file
+		checkValidInputFile(fs, path); 
+	
+		//core read 
+		if( fs.isDirectory(path) )
+			readTextCellFrameFromHDFS(path, job, ret, schema, names, rlen, clen);
+		else
+			readRawTextCellFrameFromHDFS(path, job, fs, ret, schema, names, rlen, clen);
+		
+		return ret;
+	}
+
+	/**
+	 * Allocates an output frame and fills it from an input stream in raw text cell format.
+	 * @param is input stream, handed to readRawTextCellFrameFromInputStream
+	 * @param schema value type per column of the output frame
+	 * @param names column names of the output frame
+	 * @param rlen number of rows, used to allocate the output frame
+	 * @param clen number of columns, passed through to the raw text cell read
+	 * @return the populated output frame
+	 * @throws DMLRuntimeException declared by createOutputFrameBlock
+	 * @throws IOException if the raw text cell read fails
+	 */
+	public FrameBlock readFrameFromInputStream(InputStream is, List<ValueType> schema, List<String> names, long rlen, long clen) 
+			throws IOException, DMLRuntimeException 
+	{
+		//allocate output frame block
+		FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+	
+		//core read 
+		readRawTextCellFrameFromInputStream(is, ret, schema, names, rlen, clen);
+		
+		return ret;
+	}
+	
+
+	/**
+	 * Reads text cell lines ("row col value", blank-separated, 1-based indexes) via TextInputFormat splits.
+	 * @param path input path, added to the job as input path
+	 * @param job job configuration used to create the splits and record readers
+	 * @param dest output frame that receives the parsed cells
+	 * @param schema value type per column, used to parse each cell value
+	 * @param names column names (not used by this method)
+	 * @param rlen number of rows, used for the post-mortem bounds check
+	 * @param clen number of columns, used for the post-mortem bounds check
+	 * 
+	 * @throws IOException if reading or parsing fails; the original exception is attached as cause
+	 */
+	private void readTextCellFrameFromHDFS( Path path, JobConf job, FrameBlock dest, 
+			List<ValueType> schema, List<String> names, long rlen, long clen)
+		throws IOException
+	{
+		FileInputFormat.addInputPath(job, path);
+		TextInputFormat informat = new TextInputFormat();
+		informat.configure(job);
+		InputSplit[] splits = informat.getSplits(job, 1);
+		
+		LongWritable key = new LongWritable();
+		Text value = new Text();
+		int row = -1;
+		int col = -1;
+		
+		try
+		{
+			FastStringTokenizer st = new FastStringTokenizer(' ');
+			
+			for(InputSplit split: splits)
+			{
+				RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
+			
+				try
+				{
+					while( reader.next(key, value) )
+					{
+						st.reset( value.toString() ); //reinit tokenizer
+						row = st.nextInt()-1;
+						col = st.nextInt()-1;
+						dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), st.nextToken()));
+					}
+				}
+				finally
+				{
+					if( reader != null )
+						reader.close();
+				}
+			}
+		}
+		catch(Exception ex)
+		{
+			//post-mortem error handling and bounds checking (the cause is preserved in both cases)
+			if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > clen )
+			{
+				throw new IOException("Frame cell ["+(row+1)+","+(col+1)+"] " +
+									  "out of overall frame range [1:"+rlen+",1:"+clen+"].", ex);
+			}
+			else
+			{
+				throw new IOException( "Unable to read frame in text cell format.", ex );
+			}
+		}
+	}
+
+	
+	/**
+	 * Opens a single file and delegates to the stream-based raw text cell read.
+	 * @param path input file that is opened via the given file system
+	 * @param job job configuration (not used by this method)
+	 * @param fs file system used to open the file
+	 * @param dest output frame that receives the parsed cells
+	 * @param schema value type per column, passed through to the stream-based read
+	 * @param names column names, passed through to the stream-based read
+	 * @param rlen number of rows, passed through to the stream-based read
+	 * @param clen number of columns, passed through to the stream-based read
+	 * 
+	 * @throws IOException if the file cannot be opened or the read fails
+	 */
+	private void readRawTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, 
+			List<ValueType> schema, List<String> names, long rlen, long clen)
+		throws IOException
+	{
+		//create input stream for path
+		InputStream inputStream = fs.open(path);
+		
+		//actual read
+		readRawTextCellFrameFromInputStream(inputStream, dest, schema, names, rlen, clen);
+	}
+	
+	/**
+	 * Parses "row col value" lines (blank-separated, 1-based indexes) from a stream into the output frame.
+	 * @param is input stream; its wrapping reader is handed to IOUtilFunctions.closeSilently when done
+	 * @param dest output frame that receives the parsed cells
+	 * @param schema value type per column, used to parse each cell value
+	 * @param names column names (not used by this method)
+	 * @param rlen number of rows, used for the post-mortem bounds check
+	 * @param clen number of columns, used for the post-mortem bounds check
+	 * 
+	 * @throws IOException if reading or parsing fails; the original exception is attached as cause
+	 */
+	private void readRawTextCellFrameFromInputStream( InputStream is, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen)
+			throws IOException
+	{
+		BufferedReader br = new BufferedReader(new InputStreamReader( is ));	
+		
+		String value = null;
+		int row = -1;
+		int col = -1;
+		
+		try
+		{			
+			FastStringTokenizer st = new FastStringTokenizer(' ');
+			
+			while( (value=br.readLine())!=null )
+			{
+				st.reset( value ); //reinit tokenizer
+				row = st.nextInt()-1;
+				col = st.nextInt()-1;	
+				dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), st.nextToken()));
+			}
+		}
+		catch(Exception ex)
+		{
+			//post-mortem error handling and bounds checking
+			if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > clen ) 
+			{
+				throw new IOException("Frame cell ["+(row+1)+","+(col+1)+"] " +
+									  "out of overall frame range [1:"+rlen+",1:"+clen+"].", ex);
+			}
+			else
+			{
+				throw new IOException( "Unable to read frame in raw text cell format.", ex );
+			}
+		}
+		finally
+		{
+			IOUtilFunctions.closeSilently(br);
+		}
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index bd786d7..4dbd398 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -65,6 +65,11 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		_coldata = new ArrayList<Array>();
 	}
 	
+	public FrameBlock(FrameBlock that) { //copy constructor
+		this(that.getSchema()); //NOTE(review): only the schema is passed on, not the column names - confirm intended
+		copy(that); //copies the column data of 'that' into this frame
+	}
+	
 	public FrameBlock(int ncols, ValueType vt) {
 		this();
 		_schema.addAll(Collections.nCopies(ncols, vt));
@@ -533,6 +538,28 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		return ret;
 	}
 	
+	public void copy(FrameBlock src)
+	{
+		//allocate (ensureAllocatedColumns is given the number of rows of src)
+		ensureAllocatedColumns(src.getNumRows());
+		
+		//actual copy: per column of src, rows 0..nrow-1 (assumes both frames have matching column types - TODO confirm)
+		for( int i=0; i<src.getNumColumns(); i++ )
+			_coldata.get(i).set(0, src.getNumRows()-1, src._coldata.get(i));
+	}
+
+	
+	/** Copies src into rows [rl,ru] and columns [cl,cu] (0-based, inclusive) of this frame; rlen is passed to ensureAllocatedColumns, clen is unused. */
+	public void copy(int rl, int ru, int cl, int cu, FrameBlock src, int rlen, int clen) 
+		throws DMLRuntimeException
+	{
+		ensureAllocatedColumns(rlen);
+		//target column i takes source column i-cl; Array.set(rl, ru, ..) expects the inclusive upper row index
+		for( int i=cl; i<=cu; i++ )
+			_coldata.get(i).set(rl, ru, src._coldata.get(i-cl));
+	}
+		
+
 	
 	///////
 	// row iterators (over strings and boxed objects)
@@ -616,6 +643,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public abstract T get(int index);
 		public abstract void set(int index, T value);
 		public abstract void set(int rl, int ru, Array value);
+		public abstract void set(int rl, int ru, Array value, int rlSrc);
 		public abstract void append(String value);
 		public abstract void append(T value);
 		public abstract Array clone();
@@ -641,6 +669,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value) {
 			System.arraycopy(((StringArray)value)._data, 0, _data, rl, ru-rl+1);
 		}
+		public void set(int rl, int ru, Array value, int rlSrc) { //copies ru-rl+1 entries of value, starting at rlSrc, to positions rl..ru
+			System.arraycopy(((StringArray)value)._data, rlSrc, _data, rl, ru-rl+1);
+		}
 		public void append(String value) {
 			if( _data.length <= _size )
 				_data = Arrays.copyOf(_data, newSize());
@@ -684,6 +715,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value) {
 			System.arraycopy(((BooleanArray)value)._data, 0, _data, rl, ru-rl+1);
 		}
+		public void set(int rl, int ru, Array value, int rlSrc) { //copies ru-rl+1 entries of value, starting at rlSrc, to positions rl..ru
+			System.arraycopy(((BooleanArray)value)._data, rlSrc, _data, rl, ru-rl+1);
+		}
 		public void append(String value) {
 			append(Boolean.parseBoolean(value));
 		}
@@ -728,6 +762,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value) {
 			System.arraycopy(((LongArray)value)._data, 0, _data, rl, ru-rl+1);
 		}
+		public void set(int rl, int ru, Array value, int rlSrc) { //copies ru-rl+1 entries of value, starting at rlSrc, to positions rl..ru
+			System.arraycopy(((LongArray)value)._data, rlSrc, _data, rl, ru-rl+1);
+		}
 		public void append(String value) {
 			append((value!=null)?Long.parseLong(value):null);
 		}
@@ -772,6 +809,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value) {
 			System.arraycopy(((DoubleArray)value)._data, 0, _data, rl, ru-rl+1);
 		}
+		public void set(int rl, int ru, Array value, int rlSrc) { //copies ru-rl+1 entries of value, starting at rlSrc, to positions rl..ru
+			System.arraycopy(((DoubleArray)value)._data, rlSrc, _data, rl, ru-rl+1);
+		}
 		public void append(String value) {
 			append((value!=null)?Double.parseDouble(value):null);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 853915c..6801f3b 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -346,6 +346,24 @@ public class UtilFunctions
 		}
 	}
 	
+	/**
+	 * Compares two objects by casting both to the Java type that corresponds to the given value type.
+	 * @param vt value type selecting the cast: STRING, BOOLEAN, INT (as Long) or DOUBLE
+	 * @param in1 first object; compareTo is invoked on it, so it must not be null
+	 * @param in2 second object, cast to the same type as in1
+	 * @return result of in1.compareTo(in2); a RuntimeException is thrown for any other value type
+	 */
+	public static int compareTo(ValueType vt, Object in1, Object in2) {
+		switch( vt ) {
+		
+			case STRING:  return ((String)in1).compareTo((String)in2);
+			case BOOLEAN: return ((Boolean)in1).compareTo((Boolean)in2);
+			case INT:     return ((Long)in1).compareTo((Long)in2);
+			case DOUBLE:  return ((Double)in1).compareTo((Double)in2);
+			default: throw new RuntimeException("Unsupported value type: "+vt);
+		}
+	}
+	
 	public static boolean isIntegerNumber( String str )
 	{
 		byte[] c = str.getBytes();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
new file mode 100644
index 0000000..f475026
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.instructions.cp.AppendCPInstruction.AppendType;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FrameCopyTest extends AutomatedTestBase
+{
+	private final static int rows = 1593;
+	private final static ValueType[] schemaStrings = new ValueType[]{ValueType.STRING, ValueType.STRING, ValueType.STRING};	
+	private final static ValueType[] schemaMixed = new ValueType[]{ValueType.STRING, ValueType.DOUBLE, ValueType.INT, ValueType.BOOLEAN};	
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+	}
+
+	@Test
+	public void testFrameStringsStringsCBind()  {
+		runFrameCopyTest(schemaStrings, schemaStrings, AppendType.CBIND);
+	}
+	
+	@Test
+	public void testFrameStringsStringsRBind()  { //note: ncol(A)=ncol(B)
+		runFrameCopyTest(schemaStrings, schemaStrings, AppendType.RBIND);
+	}
+	
+	@Test
+	public void testFrameMixedStringsCBind()  {
+		runFrameCopyTest(schemaMixed, schemaStrings, AppendType.CBIND);
+	}
+	
+	@Test
+	public void testFrameStringsMixedCBind()  {
+		runFrameCopyTest(schemaStrings, schemaMixed, AppendType.CBIND);
+	}
+	
+	@Test
+	public void testFrameMixedMixedCBind()  {
+		runFrameCopyTest(schemaMixed, schemaMixed, AppendType.CBIND);
+	}
+	
+	@Test
+	public void testFrameMixedMixedRBind()  { //note: ncol(A)=ncol(B)
+		runFrameCopyTest(schemaMixed, schemaMixed, AppendType.RBIND);
+	}
+
+	
+	/**
+	 * Fills two frames with generated data, copies each via FrameBlock.copy and verifies the copies.
+	 * @param schema1 schema of the first frame
+	 * @param schema2 schema of the second frame
+	 * @param atype append type passed by the tests (not used by this method)
+	 */
+	private void runFrameCopyTest( ValueType[] schema1, ValueType[] schema2, AppendType atype)
+	{
+		try
+		{
+			//data generation
+			double[][] A = getRandomMatrix(rows, schema1.length, -10, 10, 0.9, 2373); 
+			double[][] B = getRandomMatrix(rows, schema2.length, -10, 10, 0.9, 129); 
+			
+			//init data frame 1
+			List<ValueType> lschema1 = Arrays.asList(schema1);
+			FrameBlock frame1 = new FrameBlock(lschema1);
+			Object[] row1 = new Object[lschema1.size()];
+			for( int i=0; i<rows; i++ ) {
+				for( int j=0; j<lschema1.size(); j++ )
+					A[i][j] = UtilFunctions.objectToDouble(lschema1.get(j), 
+							row1[j] = UtilFunctions.doubleToObject(lschema1.get(j), A[i][j]));
+				frame1.appendRow(row1);
+			}
+			
+			//init data frame 2
+			List<ValueType> lschema2 = Arrays.asList(schema2);
+			FrameBlock frame2 = new FrameBlock(lschema2);
+			Object[] row2 = new Object[lschema2.size()];
+			for( int i=0; i<rows; i++ ) {
+				for( int j=0; j<lschema2.size(); j++ )
+					B[i][j] = UtilFunctions.objectToDouble(lschema2.get(j), 
+							row2[j] = UtilFunctions.doubleToObject(lschema2.get(j), B[i][j]));
+				frame2.appendRow(row2);
+			}
+			
+			//copy from one frame to another.
+			FrameBlock frame1Backup = new FrameBlock(frame1.getSchema(), frame1.getColumnNames());
+			frame1Backup.copy(frame1);
+			
+			FrameBlock frame2Backup = new FrameBlock(frame2.getSchema(), frame2.getColumnNames());
+			frame2Backup.copy(frame2);
+			
+			// Verify copied data.
+			List<ValueType> lschema = frame1.getSchema();
+			for ( int i=0; i<frame1.getNumRows(); ++i )
+				for( int j=0; j<lschema.size(); j++ )	{
+					if( UtilFunctions.compareTo(lschema.get(j), frame1.get(i, j), frame1Backup.get(i, j)) != 0)
+						Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame1.get(i,  j) + 
+								", is not same as original value " + frame1Backup.get(i, j));
+				}		
+			
+			lschema = frame2.getSchema();
+			for ( int i=0; i<frame2.getNumRows(); ++i )
+				for( int j=0; j<lschema.size(); j++ )	{
+					if( UtilFunctions.compareTo(lschema.get(j), frame2.get(i, j), frame2Backup.get(i, j)) != 0)
+						Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame2.get(i,  j) + 
+								", is not same as original value " + frame2Backup.get(i, j));
+				}		
+
+			// update one row (rows/2) in the original frames only; the backups stay untouched
+			int updateRow = rows/2;
+			lschema = frame1.getSchema();
+			for( int j=0; j<lschema.size(); j++ )	{
+				switch( lschema.get(j) ) {
+					case STRING:  frame1.set(updateRow,  j,  "String:"+ frame1.get(updateRow, j)); break;
+					case BOOLEAN: frame1.set(updateRow,  j, ((Boolean)frame1.get(updateRow, j))?(new Boolean(false)):(new Boolean(true))); break;
+					case INT:     frame1.set(updateRow,  j, (Long)frame1.get(updateRow, j) * 2 + 5); break;
+					case DOUBLE:  frame1.set(updateRow,  j, (Double)frame1.get(updateRow, j) * 2 + 7); break;
+					default: throw new RuntimeException("Unsupported value type: "+lschema.get(j));
+				}
+			}		
+			
+			lschema = frame2.getSchema();
+			for( int j=0; j<lschema.size(); j++ )	{
+				switch( lschema.get(j) ) {
+					case STRING:  frame2.set(updateRow,  j,  "String:"+ frame2.get(updateRow, j)); break;
+					case BOOLEAN: frame2.set(updateRow,  j, ((Boolean)frame2.get(updateRow, j))?(new Boolean(false)):(new Boolean(true))); break;
+					case INT:     frame2.set(updateRow,  j, (Long)frame2.get(updateRow, j) * 2 + 4); break;
+					case DOUBLE:  frame2.set(updateRow,  j, (Double)frame2.get(updateRow, j) * 2 + 6); break;
+					default: throw new RuntimeException("Unsupported value type: "+lschema.get(j));
+				}
+			}		
+
+			// Verify that the updated row of each original frame now differs from its backup copy
+			lschema = frame1.getSchema();
+			for( int j=0; j<lschema.size(); j++ )	{
+				if( UtilFunctions.compareTo(lschema.get(j), frame1.get(updateRow, j), frame1Backup.get(updateRow, j)) == 0)
+					Assert.fail("Updated value for cell ("+ updateRow + "," + j + ") is " + frame1.get(updateRow,  j) + 
+							", same as original value "+frame1Backup.get(updateRow, j));
+			}		
+			
+			lschema = frame2.getSchema();
+			for( int j=0; j<lschema.size(); j++ )	{
+				if( UtilFunctions.compareTo(lschema.get(j), frame2.get(updateRow, j), frame2Backup.get(updateRow, j)) == 0)
+					Assert.fail("Updated value for cell ("+ updateRow + "," + j + ") is " + frame2.get(updateRow,  j) + 
+							", same as original value "+frame2Backup.get(updateRow, j));
+			}		
+			
+		}
+		catch(Exception ex) {
+			ex.printStackTrace();
+			throw new RuntimeException(ex);
+		}
+	}
+}