You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/03/23 09:36:58 UTC
incubator-systemml git commit: [SYSTEMML-573] Frame IO Read
Repository: incubator-systemml
Updated Branches:
refs/heads/master 2e032f707 -> c1f25ef6c
[SYSTEMML-573] Frame IO Read
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c1f25ef6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c1f25ef6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c1f25ef6
Branch: refs/heads/master
Commit: c1f25ef6c40c73475e20b2af2841a2c044a8901c
Parents: 2e032f7
Author: Arvind Surve <ac...@yahoo.com>
Authored: Wed Mar 23 01:36:36 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Wed Mar 23 01:36:36 2016 -0700
----------------------------------------------------------------------
.../apache/sysml/runtime/io/FrameReader.java | 4 +-
.../runtime/io/FrameReaderBinaryBlock.java | 87 +++++++-
.../sysml/runtime/io/FrameReaderTextCSV.java | 149 ++++++++++++++
.../sysml/runtime/io/FrameReaderTextCell.java | 198 ++++++++++++++++++-
.../sysml/runtime/matrix/data/FrameBlock.java | 40 ++++
.../sysml/runtime/util/UtilFunctions.java | 18 ++
.../functions/frame/FrameCopyTest.java | 181 +++++++++++++++++
7 files changed, 673 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 023c0ab..a39d129 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -150,7 +150,7 @@ public abstract class FrameReader
* @throws DMLRuntimeException
* @throws IOException
*/
- protected static FrameBlock createOutputFrameBlock(List<ValueType> schema, List<String> names)
+ protected static FrameBlock createOutputFrameBlock(List<ValueType> schema, List<String> names, long nrow)
throws IOException, DMLRuntimeException
{
//check schema and column names
@@ -159,7 +159,7 @@ public abstract class FrameReader
//prepare result frame block
FrameBlock ret = new FrameBlock(schema, names);
-
+ //ensureAllocatedColumns takes an int row count; reject row counts that the
+ //narrowing cast below would otherwise silently truncate
+ if( nrow > Integer.MAX_VALUE )
+ throw new DMLRuntimeException("Number of frame rows ("+nrow+") exceeds the maximum of "+Integer.MAX_VALUE+".");
+ ret.ensureAllocatedColumns((int)nrow);
return ret;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index 6769d21..62c7e6c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -22,9 +22,17 @@ package org.apache.sysml.runtime.io;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+
+
public class FrameReaderBinaryBlock extends FrameReader
{
@@ -45,9 +53,86 @@ public class FrameReaderBinaryBlock extends FrameReader
throws IOException, DMLRuntimeException
{
//allocate output frame block
- FrameBlock ret = null;
+ FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+
+ //prepare file access
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ FileSystem fs = FileSystem.get(job);
+ Path path = new Path( fname );
+
+ //check existence and non-empty file
+ checkValidInputFile(fs, path);
+
+ //core read
+ readBinaryBlockFrameFromHDFS(path, job, fs, ret, rlen, clen);
return ret;
}
+
+ /**
+ * Reads all binary-block sequence files under the given path and copies every
+ * deserialized frame block into its position within the pre-allocated output frame.
+ *
+ * Note: For efficiency, we directly use SequenceFile.Reader instead of SequenceFileInputFormat-
+ * InputSplits-RecordReader (SequenceFileRecordReader). First, this has no drawbacks since the
+ * SequenceFileRecordReader internally uses SequenceFile.Reader as well. Second, it is
+ * advantageous if the actual sequence files are larger than the file splits created by
+ * informat.getSplits (which is usually aligned to the HDFS block size) because then there is
+ * overhead for finding the actual split between our 1k-1k blocks. This case happens
+ * if the read frame was created by CP or when jobs directly write to large output files
+ * (e.g., parfor matrix partitioning).
+ *
+ * @param path input file or directory of part files
+ * @param job job configuration used to open the sequence files
+ * @param fs file system of the input path
+ * @param dest pre-allocated output frame block
+ * @param rlen number of rows of the overall frame
+ * @param clen number of columns of the overall frame
+ *
+ * @throws IOException if a file cannot be read or a block lies outside the frame bounds
+ * @throws DMLRuntimeException if a block cannot be copied into the output frame
+ */
+ @SuppressWarnings("deprecation")
+ private static void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen )
+ throws IOException, DMLRuntimeException
+ {
+ MatrixIndexes key = new MatrixIndexes();
+ FrameBlock value = new FrameBlock();
+ long blen = ConfigurationManager.getBlocksize();
+
+ for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files
+ {
+ //directly read from sequence files (individual partfiles)
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
+
+ try
+ {
+ //note: next(key, value) does not yet exploit the given serialization classes, record reader does but is generally slower.
+ while( reader.next(key, value) )
+ {
+ //compute block offsets in long arithmetic, so the bound check below
+ //sees the exact offsets instead of int-overflowed values
+ long row_offset = (key.getRowIndex()-1) * blen;
+ long col_offset = (key.getColumnIndex()-1) * blen;
+
+ int rows = value.getNumRows();
+ int cols = value.getNumColumns();
+
+ //bound check per block (lower and upper bounds)
+ if( row_offset < 0 || row_offset + rows > rlen || col_offset < 0 || col_offset + cols > clen )
+ {
+ throw new IOException("Frame block ["+(row_offset+1)+":"+(row_offset+rows)+","+(col_offset+1)+":"+(col_offset+cols)+"] " +
+ "out of overall frame range [1:"+rlen+",1:"+clen+"].");
+ }
+
+ //offsets are within [0, rlen] x [0, clen] here, so the int casts are safe
+ //for any frame whose rows fit into the int-indexed output block
+ dest.copy( (int)row_offset, (int)(row_offset+rows-1),
+ (int)col_offset, (int)(col_offset+cols-1),
+ value,
+ dest.getNumRows(), dest.getNumColumns());
+ }
+ }
+ finally
+ {
+ IOUtilFunctions.closeSilently(reader);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index b364916..2086ea3 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -19,13 +19,24 @@
package org.apache.sysml.runtime.io;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.UtilFunctions;
public class FrameReaderTextCSV extends FrameReader
{
@@ -56,8 +67,146 @@ public class FrameReaderTextCSV extends FrameReader
{
//allocate output frame block
FrameBlock ret = null;
+ if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file size for frame w/ unknown dimensions
+ ret = createOutputFrameBlock(schema, names, rlen);
+
+ //prepare file access
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ FileSystem fs = FileSystem.get(job);
+ Path path = new Path( fname );
+
+ //check existence and non-empty file
+ checkValidInputFile(fs, path);
+
+ //core read
+ ret = readCSVFrameFromHDFS(path, job, fs, ret, schema, names, rlen, clen,
+ _props.hasHeader(), _props.getDelim(), _props.isFill() );
return ret;
}
+ /**
+ * Reads one or more CSV files into a frame block. If no output frame block is
+ * given (unknown dimensions), the number of rows is first determined by a scan
+ * over all files via computeCSVSize.
+ *
+ * @param path input file or directory of part files
+ * @param job job configuration
+ * @param fs file system of the input path
+ * @param dest pre-allocated output frame block, or null if the dimensions are unknown
+ * @param schema value types of the frame columns, used to parse the cells
+ * @param names column names of the frame
+ * @param rlen number of rows, if known
+ * @param clen number of columns, if known (taken from dest when dest is null)
+ * @param hasHeader true if the first line of the first file is a header line
+ * @param delim column delimiter
+ * @param fill passed to the empty-field check for every line
+ * @return the populated frame block
+ * @throws IOException if a file cannot be read or a line is malformed
+ */
+ @SuppressWarnings("unchecked")
+ private FrameBlock readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
+ List<ValueType> schema, List<String> names, long rlen, long clen, boolean hasHeader, String delim, boolean fill)
+ throws IOException
+ {
+ ArrayList<Path> files=new ArrayList<Path>();
+ if(fs.isDirectory(path)) {
+ for(FileStatus stat: fs.listStatus(path, CSVReblockMR.hiddenFileFilter))
+ files.add(stat.getPath());
+ Collections.sort(files);
+ }
+ else
+ files.add(path);
+
+ if ( dest == null ) {
+ dest = computeCSVSize(files, fs, schema, names, hasHeader, delim);
+ clen = dest.getNumColumns();
+ }
+
+ /////////////////////////////////////////
+ int row = 0;
+
+ for(int fileNo=0; fileNo<files.size(); fileNo++)
+ {
+ Path file = files.get(fileNo);
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file)));
+ try
+ {
+ //ignore header (inside try, so the reader is closed even if this read fails)
+ if(fileNo==0 && hasHeader )
+ br.readLine();
+
+ // Read the data
+ String value = null;
+ while( (value=br.readLine())!=null ) //foreach line
+ {
+ String cellStr = value.trim();
+ String[] parts = IOUtilFunctions.split(cellStr, delim);
+
+ //check the number of columns before parsing, so that a line with too many
+ //cells raises a descriptive error instead of an index error on the schema
+ IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(file.toString(), cellStr, parts, clen);
+
+ boolean emptyValuesFound = false;
+ for( int col=0; col<parts.length; col++ ) //foreach cell
+ {
+ String part = parts[col].trim();
+ if ( part.isEmpty() ) {
+ //TODO: Do we need to handle empty cell condition?
+ emptyValuesFound = true;
+ }
+ else {
+ dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), part));
+ }
+ }
+
+ //sanity check for empty values
+ IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound);
+ row++;
+ }
+ }
+ finally {
+ IOUtilFunctions.closeSilently(br);
+ }
+ }
+
+ return dest;
+ }
+
+ /**
+ * Counts the lines of all given CSV files, excluding the header line of the
+ * first file if present, and creates a frame block allocated for that many rows.
+ *
+ * @param files CSV files to scan, in read order
+ * @param fs file system of the files
+ * @param schema value types of the frame columns
+ * @param names column names of the frame
+ * @param hasHeader true if the first file starts with a header line
+ * @param delim column delimiter (not needed for counting lines)
+ * @return a new frame block with the given schema and names, allocated for the counted rows
+ * @throws IOException if a file cannot be read
+ */
+ private FrameBlock computeCSVSize ( List<Path> files, FileSystem fs, List<ValueType> schema, List<String> names, boolean hasHeader, String delim)
+ throws IOException
+ {
+ int numLines = 0;
+ boolean skipHeader = hasHeader; //only the first file may carry a header line
+ for( Path file : files )
+ {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+ try
+ {
+ if( skipHeader ) {
+ reader.readLine();
+ skipHeader = false;
+ }
+ for( String line = reader.readLine(); line != null; line = reader.readLine() )
+ numLines++;
+ }
+ finally {
+ IOUtilFunctions.closeSilently(reader);
+ }
+ }
+
+ //allocate the result frame for the counted number of rows
+ FrameBlock result = new FrameBlock(schema, names);
+ result.ensureAllocatedColumns(numLines);
+ return result;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
index 5488f94..91b7892 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
@@ -19,12 +19,28 @@
package org.apache.sysml.runtime.io;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.FastStringTokenizer;
+import org.apache.sysml.runtime.util.UtilFunctions;
public class FrameReaderTextCell extends FrameReader
{
@@ -46,9 +62,189 @@ public class FrameReaderTextCell extends FrameReader
throws IOException, DMLRuntimeException
{
//allocate output frame block
- FrameBlock ret = null;
+ FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+
+ //prepare file access
+ JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+ FileSystem fs = FileSystem.get(job);
+ Path path = new Path( fname );
+
+ //check existence and non-empty file
+ checkValidInputFile(fs, path);
+
+ //core read
+ if( fs.isDirectory(path) )
+ readTextCellFrameFromHDFS(path, job, ret, schema, names, rlen, clen);
+ else
+ readRawTextCellFrameFromHDFS(path, job, fs, ret, schema, names, rlen, clen);
+
+ return ret;
+ }
+
+ /**
+ * Reads a frame in text cell format from the given input stream. Each line is
+ * expected to hold a 1-based row index, a 1-based column index and a value,
+ * separated by a single space; values are parsed according to the schema.
+ * The stream is closed by the raw text cell reader once reading completes or fails.
+ *
+ * @param is input stream of text cell lines
+ * @param schema value types of the frame columns
+ * @param names column names of the frame
+ * @param rlen number of rows of the frame (used for allocation and bounds checks)
+ * @param clen number of columns of the frame (used for bounds checks)
+ * @return the populated frame block
+ * @throws DMLRuntimeException propagated from the creation of the output frame block
+ * @throws IOException if reading fails or a cell lies outside the frame bounds
+ */
+ public FrameBlock readFrameFromInputStream(InputStream is, List<ValueType> schema, List<String> names, long rlen, long clen)
+ throws IOException, DMLRuntimeException
+ {
+ //allocate output frame block
+ FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+
+ //core read
+ readRawTextCellFrameFromInputStream(is, ret, schema, names, rlen, clen);
return ret;
}
+
+
+ /**
+ * Reads a directory of text cell files via the Hadoop TextInputFormat. Each line
+ * holds a 1-based row index, a 1-based column index and a value, separated by a
+ * single space; values are parsed according to the schema.
+ *
+ * @param path input directory
+ * @param job job configuration (the input path is added to it)
+ * @param dest pre-allocated output frame block
+ * @param schema value types of the frame columns
+ * @param names column names of the frame (not used here)
+ * @param rlen number of rows of the frame, used for bounds checking
+ * @param clen number of columns of the frame, used for bounds checking
+ * @throws IOException if reading fails or a cell lies outside the frame bounds
+ */
+ private void readTextCellFrameFromHDFS( Path path, JobConf job, FrameBlock dest,
+ List<ValueType> schema, List<String> names, long rlen, long clen)
+ throws IOException
+ {
+ FileInputFormat.addInputPath(job, path);
+ TextInputFormat informat = new TextInputFormat();
+ informat.configure(job);
+ InputSplit[] splits = informat.getSplits(job, 1);
+
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ int row = -1;
+ int col = -1;
+
+ try
+ {
+ FastStringTokenizer st = new FastStringTokenizer(' ');
+
+ for(InputSplit split: splits)
+ {
+ RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
+
+ try
+ {
+ while( reader.next(key, value) )
+ {
+ st.reset( value.toString() ); //reinit tokenizer
+ row = st.nextInt()-1;
+ col = st.nextInt()-1;
+ dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), st.nextToken()));
+ }
+ }
+ finally
+ {
+ reader.close();
+ }
+ }
+ }
+ catch(Exception ex)
+ {
+ //post-mortem error handling and bounds checking (cause is always preserved)
+ if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > clen )
+ {
+ throw new IOException("Frame cell ["+(row+1)+","+(col+1)+"] " +
+ "out of overall frame range [1:"+rlen+",1:"+clen+"].", ex);
+ }
+ else
+ {
+ throw new IOException( "Unable to read frame in text cell format.", ex );
+ }
+ }
+ }
+
+
+ /**
+ * Reads a single text cell file by opening it as a stream and delegating the
+ * parsing to {@link #readRawTextCellFrameFromInputStream}.
+ *
+ * @param path input file
+ * @param job job configuration (not used here)
+ * @param fs file system of the input file
+ * @param dest pre-allocated output frame block
+ * @param schema value types of the frame columns
+ * @param names column names of the frame
+ * @param rlen number of rows of the frame
+ * @param clen number of columns of the frame
+ * @throws IOException if the file cannot be opened or read
+ */
+ private void readRawTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
+ List<ValueType> schema, List<String> names, long rlen, long clen)
+ throws IOException
+ {
+ //the stream reader takes ownership of the opened stream and closes it
+ readRawTextCellFrameFromInputStream(fs.open(path), dest, schema, names, rlen, clen);
+ }
+
+ /**
+ * Parses text cell lines (1-based row index, 1-based column index and value,
+ * separated by a single space) from the given stream into the output frame.
+ * The stream is closed before returning, on success and on failure.
+ *
+ * @param is input stream of text cell lines
+ * @param dest pre-allocated output frame block
+ * @param schema value types of the frame columns, used to parse the values
+ * @param names column names of the frame (not used here)
+ * @param rlen number of rows of the frame, used for bounds checking
+ * @param clen number of columns of the frame, used for bounds checking
+ * @throws IOException if reading fails or a cell lies outside the frame bounds
+ */
+ private void readRawTextCellFrameFromInputStream( InputStream is, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen)
+ throws IOException
+ {
+ BufferedReader reader = new BufferedReader(new InputStreamReader( is ));
+ int r = -1;
+ int c = -1;
+
+ try
+ {
+ FastStringTokenizer tokens = new FastStringTokenizer(' ');
+ for( String line = reader.readLine(); line != null; line = reader.readLine() )
+ {
+ tokens.reset( line );
+ r = tokens.nextInt() - 1;
+ c = tokens.nextInt() - 1;
+ Object cellValue = UtilFunctions.stringToObject(schema.get(c), tokens.nextToken());
+ dest.set(r, c, cellValue);
+ }
+ }
+ catch(Exception ex)
+ {
+ //post-mortem error handling: distinguish out-of-range cells from other failures
+ boolean outOfBounds = r < 0 || r + 1 > rlen || c < 0 || c + 1 > clen;
+ if( !outOfBounds )
+ throw new IOException( "Unable to read frame in raw text cell format.", ex );
+ throw new IOException("Frame cell ["+(r+1)+","+(c+1)+"] " +
+ "out of overall frame range [1:"+rlen+",1:"+clen+"].", ex);
+ }
+ finally
+ {
+ IOUtilFunctions.closeSilently(reader);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index bd786d7..4dbd398 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -65,6 +65,11 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
_coldata = new ArrayList<Array>();
}
+ /**
+ * Creates a new frame block with the schema and column names of the given
+ * frame block and copies all of its rows into this block.
+ *
+ * @param that frame block to copy
+ */
+ public FrameBlock(FrameBlock that) {
+ //carry over the column names as well, not only the schema
+ this(that.getSchema(), that.getColumnNames());
+ copy(that);
+ }
+
public FrameBlock(int ncols, ValueType vt) {
this();
_schema.addAll(Collections.nCopies(ncols, vt));
@@ -533,6 +538,28 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
return ret;
}
+ /**
+ * Copies all rows of the given frame block into this block, starting at row 0.
+ * Calls ensureAllocatedColumns with the source row count before copying.
+ *
+ * @param src source frame block whose column types match this block
+ */
+ public void copy(FrameBlock src)
+ {
+ ensureAllocatedColumns(src.getNumRows());
+
+ //column-wise copy of rows [0, lastRow]
+ int lastRow = src.getNumRows() - 1;
+ int numCols = src.getNumColumns();
+ for( int j=0; j<numCols; j++ )
+ _coldata.get(j).set(0, lastRow, src._coldata.get(j));
+ }
+
+
+ /**
+ * Copies the given source frame block into the cell range [rl:ru, cl:cu] of this
+ * frame block (0-based, inclusive bounds). Target column j receives source
+ * column j-cl, and target row i receives source row i-rl.
+ *
+ * @param rl row lower bound (inclusive)
+ * @param ru row upper bound (inclusive)
+ * @param cl column lower bound (inclusive)
+ * @param cu column upper bound (inclusive)
+ * @param src source frame block with at least ru-rl+1 rows and cu-cl+1 columns
+ * @param rlen number of rows passed to ensureAllocatedColumns
+ * @param clen number of columns of this frame block (currently not used)
+ * @throws DMLRuntimeException if the range is negative, exceeds the columns of this
+ * block, or is larger than the source block
+ */
+ public void copy(int rl, int ru, int cl, int cu, FrameBlock src, int rlen, int clen)
+ throws DMLRuntimeException
+ {
+ //validate the target range against the source block and this block
+ if( rl < 0 || cl < 0 || ru-rl+1 > src.getNumRows() || cu-cl+1 > src.getNumColumns() || cu >= getNumColumns() )
+ throw new DMLRuntimeException("Invalid copy range ["+rl+":"+ru+","+cl+":"+cu+"] for a source frame block of size "
+ +src.getNumRows()+"x"+src.getNumColumns()+".");
+
+ ensureAllocatedColumns(rlen);
+
+ //copy values (inclusive bounds; source columns are indexed relative to cl)
+ for( int j=cl; j<=cu; j++ )
+ _coldata.get(j).set(rl, ru, src._coldata.get(j-cl));
+ }
+
+
///////
// row iterators (over strings and boxed objects)
@@ -616,6 +643,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
public abstract T get(int index);
public abstract void set(int index, T value);
public abstract void set(int rl, int ru, Array value);
+ /** Copies rows [rlSrc, rlSrc+ru-rl] of the given array into rows [rl, ru] of this array. */
+ public abstract void set(int rl, int ru, Array value, int rlSrc);
public abstract void append(String value);
public abstract void append(T value);
public abstract Array clone();
@@ -641,6 +669,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
public void set(int rl, int ru, Array value) {
System.arraycopy(((StringArray)value)._data, 0, _data, rl, ru-rl+1);
}
+ public void set(int rl, int ru, Array value, int rlSrc) {
+ //copy rows [rlSrc, rlSrc+len-1] of the source into rows [rl, ru]
+ int len = ru - rl + 1;
+ System.arraycopy(((StringArray)value)._data, rlSrc, _data, rl, len);
+ }
public void append(String value) {
if( _data.length <= _size )
_data = Arrays.copyOf(_data, newSize());
@@ -684,6 +715,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
public void set(int rl, int ru, Array value) {
System.arraycopy(((BooleanArray)value)._data, 0, _data, rl, ru-rl+1);
}
+ public void set(int rl, int ru, Array value, int rlSrc) {
+ //copy rows [rlSrc, rlSrc+len-1] of the source into rows [rl, ru]
+ int len = ru - rl + 1;
+ System.arraycopy(((BooleanArray)value)._data, rlSrc, _data, rl, len);
+ }
public void append(String value) {
append(Boolean.parseBoolean(value));
}
@@ -728,6 +762,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
public void set(int rl, int ru, Array value) {
System.arraycopy(((LongArray)value)._data, 0, _data, rl, ru-rl+1);
}
+ public void set(int rl, int ru, Array value, int rlSrc) {
+ //copy rows [rlSrc, rlSrc+len-1] of the source into rows [rl, ru]
+ int len = ru - rl + 1;
+ System.arraycopy(((LongArray)value)._data, rlSrc, _data, rl, len);
+ }
public void append(String value) {
append((value!=null)?Long.parseLong(value):null);
}
@@ -772,6 +809,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
public void set(int rl, int ru, Array value) {
System.arraycopy(((DoubleArray)value)._data, 0, _data, rl, ru-rl+1);
}
+ public void set(int rl, int ru, Array value, int rlSrc) {
+ //copy rows [rlSrc, rlSrc+len-1] of the source into rows [rl, ru]
+ int len = ru - rl + 1;
+ System.arraycopy(((DoubleArray)value)._data, rlSrc, _data, rl, len);
+ }
public void append(String value) {
append((value!=null)?Double.parseDouble(value):null);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 853915c..6801f3b 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -346,6 +346,24 @@ public class UtilFunctions
}
}
+ /**
+ * Compares two frame cell values of the given value type. Null values are
+ * considered equal to each other and smaller than any non-null value.
+ *
+ * @param vt value type of both inputs
+ * @param in1 first value (String, Boolean, Long or Double according to vt), may be null
+ * @param in2 second value (String, Boolean, Long or Double according to vt), may be null
+ * @return a negative integer, zero, or a positive integer as in1 is less than,
+ * equal to, or greater than in2
+ */
+ public static int compareTo(ValueType vt, Object in1, Object in2) {
+ //null-safe handling: frame columns may hold null cells
+ if( in1 == null || in2 == null )
+ return (in1 == in2) ? 0 : (in1 == null) ? -1 : 1;
+
+ switch( vt ) {
+ case STRING: return ((String)in1).compareTo((String)in2);
+ case BOOLEAN: return ((Boolean)in1).compareTo((Boolean)in2);
+ case INT: return ((Long)in1).compareTo((Long)in2);
+ case DOUBLE: return ((Double)in1).compareTo((Double)in2);
+ default: throw new RuntimeException("Unsupported value type: "+vt);
+ }
+ }
+
public static boolean isIntegerNumber( String str )
{
byte[] c = str.getBytes();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c1f25ef6/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
new file mode 100644
index 0000000..f475026
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.instructions.cp.AppendCPInstruction.AppendType;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FrameCopyTest extends AutomatedTestBase
+{
+ private final static int rows = 1593;
+ private final static ValueType[] schemaStrings = new ValueType[]{ValueType.STRING, ValueType.STRING, ValueType.STRING};
+ private final static ValueType[] schemaMixed = new ValueType[]{ValueType.STRING, ValueType.DOUBLE, ValueType.INT, ValueType.BOOLEAN};
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ }
+
+ @Test
+ public void testFrameStringsStringsCBind() {
+ runFrameCopyTest(schemaStrings, schemaStrings, AppendType.CBIND);
+ }
+
+ @Test
+ public void testFrameStringsStringsRBind() { //note: ncol(A)=ncol(B)
+ runFrameCopyTest(schemaStrings, schemaStrings, AppendType.RBIND);
+ }
+
+ @Test
+ public void testFrameMixedStringsCBind() {
+ runFrameCopyTest(schemaMixed, schemaStrings, AppendType.CBIND);
+ }
+
+ @Test
+ public void testFrameStringsMixedCBind() {
+ runFrameCopyTest(schemaStrings, schemaMixed, AppendType.CBIND);
+ }
+
+ @Test
+ public void testFrameMixedMixedCBind() {
+ runFrameCopyTest(schemaMixed, schemaMixed, AppendType.CBIND);
+ }
+
+ @Test
+ public void testFrameMixedMixedRBind() { //note: ncol(A)=ncol(B)
+ runFrameCopyTest(schemaMixed, schemaMixed, AppendType.RBIND);
+ }
+
+
+ /**
+ * Creates two frames with the given schemas, copies each into a new frame block,
+ * verifies that the copies hold the same values, and then checks that updating a
+ * row of the original frames does not affect the copies.
+ *
+ * @param schema1 schema of the first frame
+ * @param schema2 schema of the second frame
+ * @param atype append type of the test case (not used by the copy test itself)
+ */
+ private void runFrameCopyTest( ValueType[] schema1, ValueType[] schema2, AppendType atype)
+ {
+ try
+ {
+ //data generation
+ double[][] A = getRandomMatrix(rows, schema1.length, -10, 10, 0.9, 2373);
+ double[][] B = getRandomMatrix(rows, schema2.length, -10, 10, 0.9, 129);
+
+ //init data frames
+ FrameBlock frame1 = createFrame(Arrays.asList(schema1), A);
+ FrameBlock frame2 = createFrame(Arrays.asList(schema2), B);
+
+ //copy from one frame to another.
+ FrameBlock frame1Backup = new FrameBlock(frame1.getSchema(), frame1.getColumnNames());
+ frame1Backup.copy(frame1);
+
+ FrameBlock frame2Backup = new FrameBlock(frame2.getSchema(), frame2.getColumnNames());
+ frame2Backup.copy(frame2);
+
+ // Verify copied data.
+ verifyFrameCopy(frame1, frame1Backup);
+ verifyFrameCopy(frame2, frame2Backup);
+
+ // update some data in the original frames
+ int updateRow = rows/2;
+ modifyRow(frame1, updateRow, 5, 7);
+ modifyRow(frame2, updateRow, 4, 6);
+
+ // Verify that data modified only on target frames
+ verifyRowChanged(frame1, frame1Backup, updateRow);
+ verifyRowChanged(frame2, frame2Backup, updateRow);
+ }
+ catch(Exception ex) {
+ //the cause carries the original stack trace for the test report
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Builds a frame row by row from the given matrix, converting each value to the
+ * column type and writing the round-tripped double value back into the matrix.
+ */
+ private static FrameBlock createFrame(List<ValueType> schema, double[][] data) {
+ FrameBlock frame = new FrameBlock(schema);
+ Object[] row = new Object[schema.size()];
+ for( int i=0; i<data.length; i++ ) {
+ for( int j=0; j<schema.size(); j++ )
+ data[i][j] = UtilFunctions.objectToDouble(schema.get(j),
+ row[j] = UtilFunctions.doubleToObject(schema.get(j), data[i][j]));
+ frame.appendRow(row);
+ }
+ return frame;
+ }
+
+ /** Fails if any cell of the copy differs from the corresponding cell of the original frame. */
+ private static void verifyFrameCopy(FrameBlock frame, FrameBlock copy) {
+ List<ValueType> lschema = frame.getSchema();
+ for ( int i=0; i<frame.getNumRows(); ++i )
+ for( int j=0; j<lschema.size(); j++ ) {
+ if( UtilFunctions.compareTo(lschema.get(j), frame.get(i, j), copy.get(i, j)) != 0)
+ Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame.get(i, j) +
+ ", is not same as original value " + copy.get(i, j));
+ }
+ }
+
+ /** Changes every cell of the given row in a type-specific way (string prefix, boolean negation, linear update). */
+ private static void modifyRow(FrameBlock frame, int row, long intDelta, double doubleDelta) {
+ List<ValueType> lschema = frame.getSchema();
+ for( int j=0; j<lschema.size(); j++ ) {
+ switch( lschema.get(j) ) {
+ case STRING: frame.set(row, j, "String:"+ frame.get(row, j)); break;
+ case BOOLEAN: frame.set(row, j, Boolean.valueOf(!((Boolean)frame.get(row, j)))); break;
+ case INT: frame.set(row, j, (Long)frame.get(row, j) * 2 + intDelta); break;
+ case DOUBLE: frame.set(row, j, (Double)frame.get(row, j) * 2 + doubleDelta); break;
+ default: throw new RuntimeException("Unsupported value type: "+lschema.get(j));
+ }
+ }
+ }
+
+ /** Fails if any cell of the given row still equals the corresponding cell of the copy. */
+ private static void verifyRowChanged(FrameBlock frame, FrameBlock copy, int row) {
+ List<ValueType> lschema = frame.getSchema();
+ for( int j=0; j<lschema.size(); j++ ) {
+ if( UtilFunctions.compareTo(lschema.get(j), frame.get(row, j), copy.get(row, j)) == 0)
+ Assert.fail("Updated value for cell ("+ row + "," + j + ") is " + frame.get(row, j) +
+ ", same as original value "+copy.get(row, j));
+ }
+ }
+}