You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ac...@apache.org on 2016/07/19 04:10:41 UTC
incubator-systemml git commit: [SYSTEMML-562] Frame Append operation
Repository: incubator-systemml
Updated Branches:
refs/heads/master 09477a7b0 -> c22f239e3
[SYSTEMML-562] Frame Append operation
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c22f239e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c22f239e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c22f239e
Branch: refs/heads/master
Commit: c22f239e3fd3b526190812919960684bfcf1a715
Parents: 09477a7
Author: Arvind Surve <ac...@yahoo.com>
Authored: Mon Jul 18 21:10:08 2016 -0700
Committer: Arvind Surve <ac...@yahoo.com>
Committed: Mon Jul 18 21:10:09 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/sysml/hops/BinaryOp.java | 7 +-
.../controlprogram/caching/CacheableData.java | 39 +++
.../controlprogram/caching/MatrixObject.java | 44 ---
.../instructions/SPInstructionParser.java | 18 +-
.../instructions/cp/VariableCPInstruction.java | 18 +-
.../spark/AppendMSPInstruction.java | 257 +----------------
.../spark/AppendRSPInstruction.java | 85 +-----
.../spark/FrameAppendMSPInstruction.java | 157 ++++++++++
.../spark/FrameAppendRSPInstruction.java | 170 +++++++++++
.../spark/MatrixAppendMSPInstruction.java | 284 +++++++++++++++++++
.../spark/MatrixAppendRSPInstruction.java | 112 ++++++++
.../sysml/runtime/matrix/data/FrameBlock.java | 2 +
.../sysml/runtime/util/UtilFunctions.java | 19 ++
.../functions/frame/FrameAppendDistTest.java | 226 +++++++++++++++
src/test/scripts/functions/frame/FrameAppend.R | 33 +++
.../scripts/functions/frame/FrameAppend.dml | 29 ++
16 files changed, 1108 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/hops/BinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java b/src/main/java/org/apache/sysml/hops/BinaryOp.java
index 94de0e7..65e9232 100644
--- a/src/main/java/org/apache/sysml/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java
@@ -1125,7 +1125,7 @@ public class BinaryOp extends Hop
Lop offset = createOffsetLop( left, cbind ); //offset 1st input
AppendMethod am = optFindAppendSPMethod(left.getDim1(), left.getDim2(), right.getDim1(), right.getDim2(),
- right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(), cbind);
+ right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(), cbind, dt);
switch( am )
{
@@ -1281,16 +1281,17 @@ public class BinaryOp extends Hop
return AppendMethod.MR_GAPPEND;
}
- private static AppendMethod optFindAppendSPMethod( long m1_dim1, long m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, boolean cbind )
+ private static AppendMethod optFindAppendSPMethod( long m1_dim1, long m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, boolean cbind, DataType dt ) //dt: data type being appended, compared against MATRIX/FRAME below
{
if(FORCED_APPEND_METHOD != null) {
return FORCED_APPEND_METHOD;
}
//check for best case (map-only w/o shuffle)
- if( m2_dim1 >= 1 && m2_dim2 >= 1 //rhs dims known
+ if(( m2_dim1 >= 1 && m2_dim2 >= 1 //rhs dims known
&& (cbind && m2_dim2 <= m1_cpb //rhs is smaller than column block
|| !cbind && m2_dim1 <= m1_rpb) ) //rhs is smaller than row block
+ && ((dt == DataType.MATRIX) || (dt == DataType.FRAME && cbind))) //frames qualify for map-side append only with cbind
{
if( OptimizerUtils.checkSparkBroadcastMemoryBudget(m2_dim1, m2_dim2, m1_rpb, m1_cpb, m2_nnz) ) {
return AppendMethod.MR_MAPPEND;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index d60c607..2b45ddd 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -1422,4 +1422,43 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
public static synchronized void enableCaching() {
_activeFlag = true;
}
+
+ /**
+ * Moves this object's data to the target file fName, either by exporting it or by renaming the current hdfs file.
+ * @param fName target file name on hdfs
+ * @param outputFormat requested output format of the target file
+ * @return true if the data was exported or renamed, false if neither condition applied
+ * @throws CacheException if exporting, deleting, renaming, or writing metadata fails
+ */
+ public synchronized boolean moveData(String fName, String outputFormat)
+ throws CacheException
+ {
+ boolean ret = false;
+
+ try
+ {
+ //export if the current hdfs file cannot simply be reused, otherwise rename it to the target file
+ if( (isDirty() || (!isEqualOutputFormat(outputFormat) && isEmpty(true))) || //dirty, or format change while isEmpty(true), or
+ (getRDDHandle() != null && !MapReduceTool.existsFileOnHDFS(_hdfsFileName))) //RDD-backed data not yet persisted to hdfs
+ {
+ exportData(fName, outputFormat);
+ ret = true;
+ }
+ else if( isEqualOutputFormat(outputFormat) ) //same format: cheap rename instead of re-export
+ {
+ MapReduceTool.deleteFileIfExistOnHDFS(fName); //remove any existing target file
+ MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd"); //and its metadata file
+ MapReduceTool.renameFileOnHDFS( _hdfsFileName, fName );
+ writeMetaData( fName, outputFormat, null );
+ ret = true;
+ }
+ }
+ catch (Exception e) //wrap any export/hdfs failure into a CacheException
+ {
+ throw new CacheException ("Move to " + fName + " failed.", e);
+ }
+
+ return ret;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 5d5f41f..4148545 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -282,50 +282,6 @@ public class MatrixObject extends CacheableData<MatrixBlock>
return str.toString();
}
-
- /**
- *
- * @param fName
- * @param outputFormat
- * @return
- * @throws CacheException
- */
- public synchronized boolean moveData(String fName, String outputFormat)
- throws CacheException
- {
- boolean ret = false;
-
- try
- {
- //ensure input file is persistent on hdfs (pending RDD operations),
- //file might have been written during export or collect via write/read
- if( getRDDHandle() != null && !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) {
- writeBlobFromRDDtoHDFS(getRDDHandle(), _hdfsFileName, outputFormat);
- }
-
- //export or rename to target file on hdfs
- if( isDirty() || (!isEqualOutputFormat(outputFormat) && isEmpty(true)))
- {
- exportData(fName, outputFormat);
- ret = true;
- }
- else if( isEqualOutputFormat(outputFormat) )
- {
- MapReduceTool.deleteFileIfExistOnHDFS(fName);
- MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd");
- MapReduceTool.renameFileOnHDFS( _hdfsFileName, fName );
- writeMetaData( fName, outputFormat, null );
- ret = true;
- }
- }
- catch (Exception e)
- {
- throw new CacheException ("Move to " + fName + " failed.", e);
- }
-
- return ret;
- }
-
// *********************************************
// *** ***
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index e8437fb..c74b44e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -34,13 +34,12 @@ import org.apache.sysml.lops.WeightedSquaredLoss;
import org.apache.sysml.lops.WeightedSquaredLossR;
import org.apache.sysml.lops.WeightedUnaryMM;
import org.apache.sysml.lops.WeightedUnaryMMR;
+import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction;
import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction;
import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction;
import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction;
import org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction;
import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction;
import org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction;
@@ -54,9 +53,13 @@ import org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction;
import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction;
import org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction;
import org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction;
import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction;
import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
import org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction;
import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
@@ -78,6 +81,7 @@ import org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction;
import org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction;
import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction;
import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction;
+import org.apache.sysml.runtime.util.UtilFunctions;
public class SPInstructionParser extends InstructionParser
@@ -389,7 +393,10 @@ public class SPInstructionParser extends InstructionParser
return MatrixReshapeSPInstruction.parseInstruction(str);
case MAppend:
- return AppendMSPInstruction.parseInstruction(str);
+ if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX) //dispatch on the data type of operand 1 (presumably the first input)
+ return MatrixAppendMSPInstruction.parseInstruction(str);
+ else //any non-matrix data type is handled as a frame append
+ return FrameAppendMSPInstruction.parseInstruction(str);
case GAppend:
return AppendGSPInstruction.parseInstruction(str);
@@ -398,7 +405,10 @@ public class SPInstructionParser extends InstructionParser
return AppendGAlignedSPInstruction.parseInstruction(str);
case RAppend:
- return AppendRSPInstruction.parseInstruction(str);
+ if(UtilFunctions.getDataType(str, 1) == DataType.MATRIX) //dispatch on the data type of operand 1 (presumably the first input)
+ return MatrixAppendRSPInstruction.parseInstruction(str);
+ else //any non-matrix data type is handled as a frame append
+ return FrameAppendRSPInstruction.parseInstruction(str);
case Rand:
return RandSPInstruction.parseInstruction(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
index 1fae8fc..ee86b59 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
@@ -28,6 +28,7 @@ import org.apache.sysml.lops.UnaryCP;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -655,6 +656,7 @@ public class VariableCPInstruction extends CPInstruction
* @param ec
* @throws DMLRuntimeException
*/
+ @SuppressWarnings("rawtypes")
private void processMoveInstruction(ExecutionContext ec) throws DMLRuntimeException {
if ( input3 == null ) {
@@ -683,16 +685,22 @@ public class VariableCPInstruction extends CPInstruction
if ( ec.getVariable(input1.getName()) == null )
throw new DMLRuntimeException("Unexpected error: could not find a data object for variable name:" + input1.getName() + ", while processing instruction " +this.toString());
- MatrixObject mo = (MatrixObject) ec.getVariable(input1.getName());
+ Object object = ec.getVariable(input1.getName());
+
if ( input3.getName().equalsIgnoreCase("binaryblock") ) {
- boolean success = mo.moveData(input2.getName(), input3.getName());
+ boolean success = false;
+ success = ((CacheableData)object).moveData(input2.getName(), input3.getName());
if (!success) {
throw new DMLRuntimeException("Failed to move var " + input1.getName() + " to file " + input2.getName() + ".");
}
}
- else
- throw new DMLRuntimeException("Unexpected formats while copying: from blocks ["
- + mo.getNumRowsPerBlock() + "," + mo.getNumColumnsPerBlock() + "] to " + input3.getName());
+ else if(object instanceof MatrixObject) //unsupported target format: report the matrix block sizes
+ throw new DMLRuntimeException("Unexpected formats while copying: from matrix blocks ["
+ + ((MatrixObject)object).getNumRowsPerBlock() + "," + ((MatrixObject)object).getNumColumnsPerBlock() + "] to " + input3.getName());
+ else if (object instanceof FrameObject) //unsupported target format: report the frame dimensions (rows, columns)
+ throw new DMLRuntimeException("Unexpected formats while copying: from frame object ["
+ + ((FrameObject)object).getNumRows() + "," + ((FrameObject)object).getNumColumns() + "] to " + input3.getName());
+ else throw new DMLRuntimeException("Unexpected formats while copying: from " + input1.getName() + " to " + input3.getName()); //never silently ignore other data types
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index 26be3eb..eaf23d5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -19,35 +19,14 @@
package org.apache.sysml.runtime.instructions.spark;
-import java.util.ArrayList;
-import java.util.Iterator;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
-import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
-public class AppendMSPInstruction extends BinarySPInstruction
+public abstract class AppendMSPInstruction extends BinarySPInstruction //abstract base of the map-side append instructions (e.g., FrameAppendMSPInstruction)
{
- private CPOperand _offset = null;
- private boolean _cbind = true;
+ protected CPOperand _offset = null; //offset operand, consumed as a scalar by subclasses
+ protected boolean _cbind = true; //true: cbind (column-wise append), false: rbind (row-wise append)
public AppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
{
@@ -56,235 +35,5 @@ public class AppendMSPInstruction extends BinarySPInstruction
_offset = offset;
_cbind = cbind;
}
-
- public static AppendMSPInstruction parseInstruction ( String str )
- throws DMLRuntimeException
- {
- String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
- InstructionUtils.checkNumFields (parts, 5);
-
- String opcode = parts[0];
- CPOperand in1 = new CPOperand(parts[1]);
- CPOperand in2 = new CPOperand(parts[2]);
- CPOperand offset = new CPOperand(parts[3]);
- CPOperand out = new CPOperand(parts[4]);
- boolean cbind = Boolean.parseBoolean(parts[5]);
-
- if(!opcode.equalsIgnoreCase("mappend"))
- throw new DMLRuntimeException("Unknown opcode while parsing a AppendMSPInstruction: " + str);
-
- return new AppendMSPInstruction(
- new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)),
- in1, in2, offset, out, cbind, opcode, str);
- }
-
- @Override
- public void processInstruction(ExecutionContext ec)
- throws DMLRuntimeException
- {
- // map-only append (rhs must be vector and fit in mapper mem)
- SparkExecutionContext sec = (SparkExecutionContext)ec;
- checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
- MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
- MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName());
- int brlen = mc1.getRowsPerBlock();
- int bclen = mc1.getColsPerBlock();
-
- JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
- PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() );
- long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue();
-
- //execute map-append operations (partitioning preserving if #in-blocks = #out-blocks)
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
- if( preservesPartitioning(mc1, mc2, _cbind) ) {
- out = in1.mapPartitionsToPair(
- new MapSideAppendPartitionFunction(in2, _cbind, off, brlen, bclen), true);
- }
- else {
- out = in1.flatMapToPair(
- new MapSideAppendFunction(in2, _cbind, off, brlen, bclen));
- }
-
- //put output RDD handle into symbol table
- updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
- sec.setRDDHandleForVariable(output.getName(), out);
- sec.addLineageRDD(output.getName(), input1.getName());
- sec.addLineageBroadcast(output.getName(), input2.getName());
- }
-
- /**
- *
- * @param mcIn1
- * @param mcIn2
- * @return
- */
- private boolean preservesPartitioning( MatrixCharacteristics mcIn1, MatrixCharacteristics mcIn2, boolean cbind )
- {
- long ncblksIn1 = cbind ?
- (long)Math.ceil((double)mcIn1.getCols()/mcIn1.getColsPerBlock()) :
- (long)Math.ceil((double)mcIn1.getRows()/mcIn1.getRowsPerBlock());
- long ncblksOut = cbind ?
- (long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getColsPerBlock()) :
- (long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getRowsPerBlock());
-
- //mappend is partitioning-preserving if in-block append (e.g., common case of colvector append)
- return (ncblksIn1 == ncblksOut);
- }
-
- /**
- *
- */
- private static class MapSideAppendFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock>
- {
- private static final long serialVersionUID = 2738541014432173450L;
-
- private PartitionedBroadcast<MatrixBlock> _pm = null;
- private boolean _cbind = true;
- private long _offset;
- private int _brlen;
- private int _bclen;
- private long _lastBlockColIndex;
-
- public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)
- {
- _pm = binput;
- _cbind = cbind;
-
- _offset = offset;
- _brlen = brlen;
- _bclen = bclen;
-
- //check for boundary block
- int blen = cbind ? bclen : brlen;
- _lastBlockColIndex = (long)Math.ceil((double)_offset/blen);
- }
-
- @Override
- public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv)
- throws Exception
- {
- ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
-
- IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv);
- MatrixIndexes ix = in1.getIndexes();
-
- //case 1: pass through of non-boundary blocks
- if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex )
- {
- ret.add( kv );
- }
- //case 2: pass through full input block and rhs block
- else if( _cbind && in1.getValue().getNumColumns() == _bclen
- || !_cbind && in1.getValue().getNumRows() == _brlen)
- {
- //output lhs block
- ret.add( kv );
-
- //output shallow copy of rhs block
- if( _cbind ) {
- ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
- new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
- _pm.getBlock((int)ix.getRowIndex(), 1)) );
- }
- else { //rbind
- ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
- new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
- _pm.getBlock(1, (int)ix.getColumnIndex())) );
- }
- }
- //case 3: append operation on boundary block
- else
- {
- //allocate space for the output value
- ArrayList<IndexedMatrixValue> outlist=new ArrayList<IndexedMatrixValue>(2);
- IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock());
- outlist.add(first);
-
- MatrixBlock value_in2 = null;
- if( _cbind ) {
- value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1);
- if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
- IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
- second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
- outlist.add(second);
- }
- }
- else { //rbind
- value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex());
- if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
- IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
- second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
- outlist.add(second);
- }
- }
-
- OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _brlen, _bclen, _cbind, true, 0);
- ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
- }
-
- return ret;
- }
- }
-
- /**
- *
- */
- private static class MapSideAppendPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock>
- {
- private static final long serialVersionUID = 5767240739761027220L;
-
- private PartitionedBroadcast<MatrixBlock> _pm = null;
- private boolean _cbind = true;
- private long _lastBlockColIndex = -1;
-
- public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)
- {
- _pm = binput;
- _cbind = cbind;
-
- //check for boundary block
- int blen = cbind ? bclen : brlen;
- _lastBlockColIndex = (long)Math.ceil((double)offset/blen);
- }
- @Override
- public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
- throws Exception
- {
- return new MapAppendPartitionIterator(arg0);
- }
-
- /**
- * Lazy mappend iterator to prevent materialization of entire partition output in-memory.
- * The implementation via mapPartitions is required to preserve partitioning information,
- * which is important for performance.
- */
- private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
- {
- public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
- super(in);
- }
-
- @Override
- protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
- throws Exception
- {
- MatrixIndexes ix = arg._1();
- MatrixBlock in1 = arg._2();
-
- //case 1: pass through of non-boundary blocks
- if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) {
- return arg;
- }
- //case 3: append operation on boundary block
- else {
- int rowix = _cbind ? (int)ix.getRowIndex() : 1;
- int colix = _cbind ? 1 : (int)ix.getColumnIndex();
- MatrixBlock in2 = _pm.getBlock(rowix, colix);
- MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind);
- return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 93fc7aa..6d3cf5e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -19,25 +19,13 @@
package org.apache.sysml.runtime.instructions.spark;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
-public class AppendRSPInstruction extends BinarySPInstruction
+
+public abstract class AppendRSPInstruction extends BinarySPInstruction //abstract base of the reduce-side append instructions (presumably Matrix/FrameAppendRSPInstruction)
{
- private boolean _cbind = true;
+ protected boolean _cbind = true; //true: cbind (column-wise append), false: rbind (row-wise append)
public AppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr)
{
@@ -45,72 +33,5 @@ public class AppendRSPInstruction extends BinarySPInstruction
_sptype = SPINSTRUCTION_TYPE.RAppend;
_cbind = cbind;
}
-
- public static AppendRSPInstruction parseInstruction ( String str )
- throws DMLRuntimeException
- {
- String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
- InstructionUtils.checkNumFields (parts, 4);
-
- String opcode = parts[0];
- CPOperand in1 = new CPOperand(parts[1]);
- CPOperand in2 = new CPOperand(parts[2]);
- CPOperand out = new CPOperand(parts[3]);
- boolean cbind = Boolean.parseBoolean(parts[4]);
-
- if(!opcode.equalsIgnoreCase("rappend"))
- throw new DMLRuntimeException("Unknown opcode while parsing a AppendRSPInstruction: " + str);
-
- return new AppendRSPInstruction(
- new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)),
- in1, in2, out, cbind, opcode, str);
- }
-
- @Override
- public void processInstruction(ExecutionContext ec)
- throws DMLRuntimeException
- {
- // reduce-only append (output must have at most one column block)
- SparkExecutionContext sec = (SparkExecutionContext)ec;
- checkBinaryAppendInputCharacteristics(sec, _cbind, true, false);
-
- JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
- JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-
- //execute reduce-append operations (partitioning preserving)
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
- .join(in2)
- .mapValues(new ReduceSideAppendFunction(_cbind));
-
- //put output RDD handle into symbol table
- updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
- sec.setRDDHandleForVariable(output.getName(), out);
- sec.addLineageRDD(output.getName(), input1.getName());
- sec.addLineageRDD(output.getName(), input2.getName());
- }
-
- /**
- *
- */
- private static class ReduceSideAppendFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock>
- {
- private static final long serialVersionUID = -6763904972560309095L;
-
- private boolean _cbind = true;
-
- public ReduceSideAppendFunction(boolean cbind) {
- _cbind = cbind;
- }
-
- @Override
- public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
- throws Exception
- {
- MatrixBlock left = arg0._1();
- MatrixBlock right = arg0._2();
-
- return left.appendOperations(right, new MatrixBlock(), _cbind);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
new file mode 100644
index 0000000..b67f364
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.Iterator;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+/**
+ * Spark instruction for the map-side append (opcode {@code mappend}) of two frames.
+ * The right-hand frame is broadcast and appended column-wise (cbind) to every block
+ * of the distributed left-hand frame. Row-wise append (rbind) is not supported here;
+ * use the reduce-side {@code rappend} instead ({@link FrameAppendRSPInstruction}).
+ */
+public class FrameAppendMSPInstruction extends AppendMSPInstruction
+{
+	public FrameAppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
+	{
+		super(op, in1, in2, offset, out, cbind, opcode, istr);
+	}
+
+	/**
+	 * Parses a frame {@code mappend} instruction whose fields are
+	 * opcode, in1, in2, offset, out and cbind.
+	 *
+	 * @param str instruction string
+	 * @return the parsed instruction
+	 * @throws DMLRuntimeException if the opcode is not {@code mappend}
+	 */
+	public static FrameAppendMSPInstruction parseInstruction ( String str )
+		throws DMLRuntimeException
+	{
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 5);
+
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand offset = new CPOperand(parts[3]);
+		CPOperand out = new CPOperand(parts[4]);
+		boolean cbind = Boolean.parseBoolean(parts[5]);
+
+		if(!opcode.equalsIgnoreCase("mappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendMSPInstruction: " + str);
+
+		return new FrameAppendMSPInstruction(
+				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)),
+				in1, in2, offset, out, cbind, opcode, str);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec)
+		throws DMLRuntimeException
+	{
+		// map-only append: the rhs frame is broadcast and must fit in mapper memory
+		SparkExecutionContext sec = (SparkExecutionContext)ec;
+		checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
+
+		// fail fast for rbind, before the rhs broadcast is created for nothing
+		if( !preservesPartitioning(_cbind) )
+			throw new DMLRuntimeException("Append type rbind not supported for frame mappend, instead use rappend");
+
+		JavaPairRDD<Long,FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() );
+		PartitionedBroadcast<FrameBlock> in2 = sec.getBroadcastForFrameVariable( input2.getName() );
+
+		//execute map-append operations (partitioning preserving, since the row keys of in1 are kept)
+		JavaPairRDD<Long,FrameBlock> out = in1.mapPartitionsToPair(
+				new MapSideAppendPartitionFunction(in2), true);
+
+		//put output RDD handle into symbol table
+		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageBroadcast(output.getName(), input2.getName());
+	}
+
+	/**
+	 * Indicates whether the map-side append keeps the partitioning of in1.
+	 * For frames this holds for cbind (the row keys of in1 are unchanged),
+	 * but not for rbind, which is therefore rejected by this instruction.
+	 *
+	 * @param cbind true for column-wise append, false for row-wise append
+	 * @return true if the partitioning of in1 is preserved
+	 */
+	private boolean preservesPartitioning( boolean cbind )
+	{
+		return cbind;
+	}
+
+	/**
+	 * Partition-wise cbind of each left frame block with the broadcast right block
+	 * whose row-block index is derived from the left block's row key.
+	 */
+	private static class MapSideAppendPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock>
+	{
+		private static final long serialVersionUID = -3997051891171313830L;
+
+		private PartitionedBroadcast<FrameBlock> _pm = null;
+
+		public MapSideAppendPartitionFunction(PartitionedBroadcast<FrameBlock> binput)
+		{
+			_pm = binput;
+		}
+
+		@Override
+		public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Long, FrameBlock>> arg0)
+			throws Exception
+		{
+			return new MapAppendPartitionIterator(arg0);
+		}
+
+		/**
+		 * Lazy mappend iterator to prevent materialization of entire partition output in-memory.
+		 * The implementation via mapPartitions is required to preserve partitioning information,
+		 * which is important for performance.
+		 */
+		private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<Long, FrameBlock>>
+		{
+			public MapAppendPartitionIterator(Iterator<Tuple2<Long, FrameBlock>> in) {
+				super(in);
+			}
+
+			@Override
+			protected Tuple2<Long, FrameBlock> computeNext(Tuple2<Long, FrameBlock> arg)
+				throws Exception
+			{
+				Long ix = arg._1();
+				FrameBlock in1 = arg._2();
+
+				//row-block index of the matching broadcast block, computed in long arithmetic
+				//so that row keys beyond Integer.MAX_VALUE are not truncated by intValue().
+				//NOTE(review): assumes ix is the 1-based first row of the block and that the
+				//blocks of in1 are aligned to DEFAULT_FRAME_BLOCKSIZE rows - confirm.
+				int rowix = (int)((ix - 1) / OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE + 1);
+				int colix = 1;
+
+				FrameBlock in2 = _pm.getBlock(rowix, colix);
+				FrameBlock out = in1.appendOperations(in2, new FrameBlock(), true); //cbind
+				return new Tuple2<Long,FrameBlock>(ix, out);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
new file mode 100644
index 0000000..c627e40
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.ArrayList;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+/**
+ * Spark instruction for the reduce-side append (opcode {@code rappend}) of two
+ * distributed frames. For cbind, both inputs are re-blocked onto a common row-block
+ * grid and joined by block key; for rbind, the row keys of the right input are
+ * shifted by the number of rows of the left input and both RDDs are unioned.
+ */
+public class FrameAppendRSPInstruction extends AppendRSPInstruction
+{
+	public FrameAppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr)
+	{
+		super(op, in1, in2, out, cbind, opcode, istr);
+	}
+
+	/**
+	 * Parses a frame {@code rappend} instruction whose fields are
+	 * opcode, in1, in2, out and cbind.
+	 *
+	 * @param str instruction string
+	 * @return the parsed instruction
+	 * @throws DMLRuntimeException if the opcode is not {@code rappend}
+	 */
+	public static FrameAppendRSPInstruction parseInstruction ( String str )
+		throws DMLRuntimeException
+	{
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (parts, 4);
+
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand out = new CPOperand(parts[3]);
+		boolean cbind = Boolean.parseBoolean(parts[4]);
+
+		if(!opcode.equalsIgnoreCase("rappend"))
+			throw new DMLRuntimeException("Unknown opcode while parsing a FrameAppendRSPInstruction: " + str);
+
+		return new FrameAppendRSPInstruction(
+				new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)),
+				in1, in2, out, cbind, opcode, str);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec)
+		throws DMLRuntimeException
+	{
+		SparkExecutionContext sec = (SparkExecutionContext)ec;
+		//check input characteristics, consistent with FrameAppendMSPInstruction
+		checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
+
+		JavaPairRDD<Long,FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() );
+		JavaPairRDD<Long,FrameBlock> in2 = sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() );
+		JavaPairRDD<Long,FrameBlock> out = null;
+		long leftRows = sec.getMatrixCharacteristics(input1.getName()).getRows();
+
+		if(_cbind) {
+			//re-block both inputs onto the same row-block grid, then join and append per block
+			JavaPairRDD<Long,FrameBlock> in1Aligned = in1.flatMapToPair(new ReduceSideAppendAlignFunction(leftRows));
+			in1Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in1Aligned);
+			JavaPairRDD<Long,FrameBlock> in2Aligned = in2.flatMapToPair(new ReduceSideAppendAlignFunction(leftRows));
+			in2Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in2Aligned);
+
+			out = in1Aligned.join(in2Aligned).mapValues(new ReduceSideColumnsFunction(_cbind));
+		} else { //rbind
+			//shift the row keys of in2 behind the rows of in1
+			JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new ReduceSideAppendRowsFunction(leftRows));
+			out = in1.union(right);
+		}
+
+		//put output RDD handle into symbol table
+		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+		sec.addLineageRDD(output.getName(), input2.getName());
+	}
+
+	/**
+	 * Appends the two joined frame blocks of the same row-block key (cbind or rbind).
+	 */
+	private static class ReduceSideColumnsFunction implements Function<Tuple2<FrameBlock, FrameBlock>, FrameBlock>
+	{
+		private static final long serialVersionUID = -97824903649667646L;
+
+		private boolean _cbind = true;
+
+		public ReduceSideColumnsFunction(boolean cbind) {
+			_cbind = cbind;
+		}
+
+		@Override
+		public FrameBlock call(Tuple2<FrameBlock, FrameBlock> arg0)
+			throws Exception
+		{
+			FrameBlock left = arg0._1();
+			FrameBlock right = arg0._2();
+
+			return left.appendOperations(right, new FrameBlock(), _cbind);
+		}
+	}
+
+	/**
+	 * Shifts the row key of each block by a fixed offset (the number of rows of the
+	 * left input), which places the right input below the left input for rbind.
+	 */
+	private static class ReduceSideAppendRowsFunction implements PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock>
+	{
+		private static final long serialVersionUID = 1723795153048336791L;
+
+		private long _offset;
+
+		public ReduceSideAppendRowsFunction(long offset) {
+			_offset = offset;
+		}
+
+		@Override
+		public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> arg0)
+			throws Exception
+		{
+			return new Tuple2<Long, FrameBlock>(arg0._1()+_offset, arg0._2());
+		}
+	}
+
+	/**
+	 * Re-blocks a frame RDD onto the fixed grid of DEFAULT_FRAME_BLOCKSIZE rows so that
+	 * two frames with the same number of rows but different block boundaries can be
+	 * joined block by block. Each input block is split at grid boundaries, and every
+	 * piece is placed at its row offset inside a block of the grid block's size (the
+	 * last grid block may be shorter). Pieces with the same key are expected to be
+	 * combined afterwards, e.g., via RDDAggregateUtils.mergeByFrameKey.
+	 */
+	private static class ReduceSideAppendAlignFunction implements PairFlatMapFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock>
+	{
+		private static final long serialVersionUID = 5850400295183766409L;
+
+		//total number of rows of the frame, which determines the size of the last grid block
+		private long _rows;
+
+		public ReduceSideAppendAlignFunction(long rows) {
+			_rows = rows;
+		}
+
+		@Override
+		public Iterable<Tuple2<Long,FrameBlock>> call(Tuple2<Long, FrameBlock> arg0)
+			throws Exception
+		{
+			final int blen = OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE;
+			long startRow = arg0._1(); //1-based index of the first row of this block
+			FrameBlock in = arg0._2();
+			int nrows = in.getNumRows();
+			int ncols = in.getNumColumns();
+
+			ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
+			int pos = 0; //number of rows of 'in' already emitted
+			while( pos < nrows ) {
+				long row = startRow + pos;
+				//1-based first row of the grid block containing 'row'; the -1 ensures that
+				//row k*blen is mapped to the block starting at (k-1)*blen+1, not k*blen+1
+				long index = ((row - 1) / blen) * blen + 1;
+				int maxRows = (int) Math.min(blen, _rows - index + 1);
+				int rowOff = (int) (row - index);
+				int len = Math.min(nrows - pos, maxRows - rowOff);
+				if( len <= 0 )
+					throw new DMLRuntimeException("Frame block starting at row " + startRow + " with "
+						+ nrows + " rows exceeds the expected number of rows " + _rows + ".");
+
+				//rows of 'in' that fall into this grid block (no copy if the entire block fits)
+				FrameBlock piece = (pos == 0 && len == nrows) ? in :
+					(FrameBlock) in.sliceOperations(pos, pos+len-1, 0, ncols-1, new FrameBlock());
+
+				//place the piece at its row offset within a block of the grid block's size
+				FrameBlock resultBlock = new FrameBlock(in.getSchema());
+				resultBlock.ensureAllocatedColumns(maxRows);
+				resultBlock = resultBlock.leftIndexingOperations(piece, rowOff, rowOff+len-1, 0, ncols-1, new FrameBlock());
+				ret.add(new Tuple2<Long, FrameBlock>(index, resultBlock));
+
+				pos += len;
+			}
+			return ret;
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
new file mode 100644
index 0000000..9e557bb
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
+import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+public class MatrixAppendMSPInstruction extends AppendMSPInstruction
+{ // map-side append (mappend) of two matrices; the rhs (input2) is broadcast, see processInstruction
+ public MatrixAppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
+ {
+ super(op, in1, in2, offset, out, cbind, opcode, istr);
+ }
+
+ public static MatrixAppendMSPInstruction parseInstruction ( String str )
+ throws DMLRuntimeException
+ {
+ String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+ InstructionUtils.checkNumFields (parts, 5); // fields after the opcode: in1, in2, offset, out, cbind
+
+ String opcode = parts[0];
+ CPOperand in1 = new CPOperand(parts[1]);
+ CPOperand in2 = new CPOperand(parts[2]);
+ CPOperand offset = new CPOperand(parts[3]);
+ CPOperand out = new CPOperand(parts[4]);
+ boolean cbind = Boolean.parseBoolean(parts[5]);
+
+ if(!opcode.equalsIgnoreCase("mappend"))
+ throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendMSPInstruction: " + str);
+
+ return new MatrixAppendMSPInstruction(
+ new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)),
+ in1, in2, offset, out, cbind, opcode, str);
+ }
+
+ @Override
+ public void processInstruction(ExecutionContext ec)
+ throws DMLRuntimeException
+ {
+ // map-only append (rhs must be vector and fit in mapper mem)
+ SparkExecutionContext sec = (SparkExecutionContext)ec;
+ checkBinaryAppendInputCharacteristics(sec, _cbind, false, false);
+ MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName());
+ MatrixCharacteristics mc2 = sec.getMatrixCharacteristics(input2.getName());
+ int brlen = mc1.getRowsPerBlock();
+ int bclen = mc1.getColsPerBlock();
+
+ JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+ PartitionedBroadcast<MatrixBlock> in2 = sec.getBroadcastForVariable( input2.getName() );
+ long off = sec.getScalarInput( _offset.getName(), _offset.getValueType(), _offset.isLiteral()).getLongValue(); // presumably the lhs size along the append dimension; the functions below derive the boundary block index from it
+
+ //execute map-append operations (partitioning preserving if #in-blocks = #out-blocks)
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
+ if( preservesPartitioning(mc1, mc2, _cbind) ) {
+ out = in1.mapPartitionsToPair(
+ new MapSideAppendPartitionFunction(in2, _cbind, off, brlen, bclen), true);
+ }
+ else {
+ out = in1.flatMapToPair(
+ new MapSideAppendFunction(in2, _cbind, off, brlen, bclen));
+ }
+
+ //put output RDD handle into symbol table
+ updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+ sec.setRDDHandleForVariable(output.getName(), out);
+ sec.addLineageRDD(output.getName(), input1.getName());
+ sec.addLineageBroadcast(output.getName(), input2.getName());
+ }
+
+ /** Checks whether the map-side append is partitioning-preserving (see processInstruction).
+ * @param mcIn1 characteristics of the lhs; its block sizes are used to count both input and output blocks
+ * @param mcIn2 characteristics of the broadcast rhs
+ * @param cbind true for cbind (column blocks are counted), false for rbind (row blocks are counted)
+ * @return true if the number of blocks along the append dimension does not change by the append
+ */
+ private boolean preservesPartitioning( MatrixCharacteristics mcIn1, MatrixCharacteristics mcIn2, boolean cbind )
+ {
+ long ncblksIn1 = cbind ? // number of lhs blocks along the append dimension (column blocks for cbind, row blocks for rbind)
+ (long)Math.ceil((double)mcIn1.getCols()/mcIn1.getColsPerBlock()) :
+ (long)Math.ceil((double)mcIn1.getRows()/mcIn1.getRowsPerBlock());
+ long ncblksOut = cbind ?
+ (long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getColsPerBlock()) :
+ (long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getRowsPerBlock());
+
+ //mappend is partitioning-preserving if in-block append (e.g., common case of colvector append)
+ return (ncblksIn1 == ncblksOut);
+ }
+
+ /**
+ * Flat-map append for the non-partitioning-preserving case: passes non-boundary lhs blocks through and appends the broadcast rhs to the boundary blocks, which may yield an additional output block per boundary block.
+ */
+ private static class MapSideAppendFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock>
+ {
+ private static final long serialVersionUID = 2738541014432173450L;
+
+ private PartitionedBroadcast<MatrixBlock> _pm = null;
+ private boolean _cbind = true;
+ private long _offset;
+ private int _brlen;
+ private int _bclen;
+ private long _lastBlockColIndex; // index of the lhs boundary block along the append dimension (despite the name, also used for rbind)
+
+ public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)
+ {
+ _pm = binput;
+ _cbind = cbind;
+
+ _offset = offset;
+ _brlen = brlen;
+ _bclen = bclen;
+
+ //check for boundary block: ceil(offset/blen) is the last lhs block index, assuming offset is the lhs size along the append dimension
+ int blen = cbind ? bclen : brlen;
+ _lastBlockColIndex = (long)Math.ceil((double)_offset/blen);
+ }
+
+ @Override
+ public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv)
+ throws Exception
+ {
+ ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
+
+ IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv);
+ MatrixIndexes ix = in1.getIndexes();
+
+ //case 1: pass through of non-boundary blocks
+ if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex )
+ {
+ ret.add( kv );
+ }
+ //case 2: boundary block is already full: pass it through and emit the rhs block as the next block
+ else if( _cbind && in1.getValue().getNumColumns() == _bclen
+ || !_cbind && in1.getValue().getNumRows() == _brlen)
+ {
+ //output lhs block
+ ret.add( kv );
+
+ //output shallow copy of rhs block
+ if( _cbind ) {
+ ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
+ new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
+ _pm.getBlock((int)ix.getRowIndex(), 1)) );
+ }
+ else { //rbind
+ ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
+ new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
+ _pm.getBlock(1, (int)ix.getColumnIndex())) );
+ }
+ }
+ //case 3: append rhs to a partial boundary block; the result may spill into a second output block
+ else
+ {
+ //allocate space for the output value
+ ArrayList<IndexedMatrixValue> outlist=new ArrayList<IndexedMatrixValue>(2);
+ IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock());
+ outlist.add(first);
+
+ MatrixBlock value_in2 = null;
+ if( _cbind ) {
+ value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1);
+ if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
+ IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
+ second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
+ outlist.add(second);
+ }
+ }
+ else { //rbind
+ value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex());
+ if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
+ IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
+ second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
+ outlist.add(second);
+ }
+ }
+
+ OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _brlen, _bclen, _cbind, true, 0);
+ ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
+ }
+
+ return ret;
+ }
+ }
+
+ /**
+ * Partition-preserving append, used only when preservesPartitioning(...) holds: each lhs boundary block is appended with the matching broadcast rhs block, all other blocks pass through unchanged.
+ */
+ private static class MapSideAppendPartitionFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock>
+ {
+ private static final long serialVersionUID = 5767240739761027220L;
+
+ private PartitionedBroadcast<MatrixBlock> _pm = null;
+ private boolean _cbind = true;
+ private long _lastBlockColIndex = -1; // index of the lhs boundary block along the append dimension (despite the name, also used for rbind)
+
+ public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)
+ {
+ _pm = binput;
+ _cbind = cbind;
+
+ //check for boundary block: ceil(offset/blen) is the last lhs block index, assuming offset is the lhs size along the append dimension
+ int blen = cbind ? bclen : brlen;
+ _lastBlockColIndex = (long)Math.ceil((double)offset/blen);
+ }
+
+ @Override
+ public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
+ throws Exception
+ {
+ return new MapAppendPartitionIterator(arg0);
+ }
+
+ /**
+ * Lazy mappend iterator to prevent materialization of entire partition output in-memory.
+ * The implementation via mapPartitions is required to preserve partitioning information,
+ * which is important for performance.
+ */
+ private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
+ {
+ public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
+ super(in);
+ }
+
+ @Override
+ protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
+ throws Exception
+ {
+ MatrixIndexes ix = arg._1();
+ MatrixBlock in1 = arg._2();
+
+ //case 1: pass through of non-boundary blocks
+ if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) {
+ return arg;
+ }
+ //case 2: append operation on boundary block (the rhs fits into it, since the block count is preserved)
+ else {
+ int rowix = _cbind ? (int)ix.getRowIndex() : 1;
+ int colix = _cbind ? 1 : (int)ix.getColumnIndex();
+ MatrixBlock in2 = _pm.getBlock(rowix, colix);
+ MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind);
+ return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
new file mode 100644
index 0000000..644fcd2
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+/**
+ * Spark instruction for the reduce-side append (opcode {@code rappend}) of two
+ * matrices: blocks with equal indexes are joined and appended pairwise.
+ */
+public class MatrixAppendRSPInstruction extends AppendRSPInstruction
+{
+	public MatrixAppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr)
+	{
+		super(op, in1, in2, out, cbind, opcode, istr);
+	}
+
+	/**
+	 * Creates a matrix {@code rappend} instruction from its string representation,
+	 * which carries the opcode followed by in1, in2, out and the cbind flag.
+	 *
+	 * @param str instruction string
+	 * @return the parsed instruction
+	 * @throws DMLRuntimeException if the opcode is not {@code rappend}
+	 */
+	public static MatrixAppendRSPInstruction parseInstruction ( String str )
+		throws DMLRuntimeException
+	{
+		String[] fields = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields (fields, 4);
+
+		String opcode = fields[0];
+		CPOperand left = new CPOperand(fields[1]);
+		CPOperand right = new CPOperand(fields[2]);
+		CPOperand result = new CPOperand(fields[3]);
+		boolean columnWise = Boolean.parseBoolean(fields[4]);
+
+		if( !"rappend".equalsIgnoreCase(opcode) ) {
+			throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str);
+		}
+
+		ReorgOperator op = new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1));
+		return new MatrixAppendRSPInstruction(op, left, right, result, columnWise, opcode, str);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec)
+		throws DMLRuntimeException
+	{
+		SparkExecutionContext sec = (SparkExecutionContext) ec;
+
+		//reduce-only append: the output must have at most one column block (cbind)
+		//or, presumably, at most one row block (rbind); checked on the inputs below
+		checkBinaryAppendInputCharacteristics(sec, _cbind, true, false);
+
+		String lhsName = input1.getName();
+		String rhsName = input2.getName();
+		String outName = output.getName();
+
+		JavaPairRDD<MatrixIndexes,MatrixBlock> lhs = sec.getBinaryBlockRDDHandleForVariable( lhsName );
+		JavaPairRDD<MatrixIndexes,MatrixBlock> rhs = sec.getBinaryBlockRDDHandleForVariable( rhsName );
+
+		//pair up blocks with equal indexes and append each pair locally
+		JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> joined = lhs.join(rhs);
+		JavaPairRDD<MatrixIndexes,MatrixBlock> appended = joined.mapValues(new ReduceSideAppendFunction(_cbind));
+
+		//register the result and its lineage in the symbol table
+		updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+		sec.setRDDHandleForVariable(outName, appended);
+		sec.addLineageRDD(outName, lhsName);
+		sec.addLineageRDD(outName, rhsName);
+	}
+
+	/**
+	 * Appends the right block of a joined pair to its left block (cbind or rbind).
+	 */
+	private static class ReduceSideAppendFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock>
+	{
+		private static final long serialVersionUID = -6763904972560309095L;
+
+		private final boolean _cbind;
+
+		public ReduceSideAppendFunction(boolean cbind) {
+			_cbind = cbind;
+		}
+
+		@Override
+		public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> blocks)
+			throws Exception
+		{
+			return blocks._1().appendOperations(blocks._2(), new MatrixBlock(), _cbind);
+		}
+	}
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 64bc6fe..a4f826c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -777,6 +777,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
ret._schema.addAll(that._schema);
ret._colnames = new ArrayList<String>(_colnames);
ret._colnames.addAll(that._colnames);
+ ret._colmeta = new ArrayList<ColumnMetadata>(_colmeta);
+ ret._colmeta.addAll(that._colmeta);
//concatenate column data (w/ deep copy to prevent side effects)
for( Array tmp : _coldata )
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 6bce4ff..fa17fcd 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,7 +23,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData;
@@ -596,4 +599,20 @@ public class UtilFunctions
return Arrays.asList(schema);
}
+
+
+	/**
+	 * Returns the data type (e.g., MATRIX or FRAME) of the operand at position {@code index} of instruction {@code str}.
+	 * @param str instruction string, split via InstructionUtils.getInstructionPartsWithValueType
+	 * @param index position of the operand within the split instruction (position 0 holds the opcode)
+	 * @return the data type of the operand at {@code index}
+	 * @throws IllegalArgumentException if {@code index} is outside the parts of the instruction
+	 */
+	public static DataType getDataType(String str, int index)
+	{
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		if( index < 0 || index >= parts.length )
+			throw new IllegalArgumentException("Invalid operand index " + index + " for instruction: " + str);
+		return new CPOperand(parts[index]).getDataType();
+	}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
new file mode 100644
index 0000000..20c4a27
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.BinaryOp;
+import org.apache.sysml.hops.BinaryOp.AppendMethod;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+/**
+ * Compares frame cbind/rbind executed by the DML script FrameAppend.dml against
+ * the same append performed by FrameAppend.R.
+ */
+public class FrameAppendDistTest extends AutomatedTestBase
+{
+	private final static String TEST_NAME = "FrameAppend";
+	private final static String TEST_DIR = "functions/frame/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FrameAppendDistTest.class.getSimpleName() + "/";
+
+	private final static double epsilon=0.0000000001;
+	private final static int min=1;
+	private final static int max=100;
+
+	private final static int rows1 = 1692;
+	private final static int rows2 = 1192;
+	//usecase a: inblock single
+	private final static int cols1a = 375;
+	private final static int cols2a = 92;
+	//usecase b: inblock multiple
+	private final static int cols1b = 1059;
+	//usecase c: outblock blocksize
+	private final static int cols1d = 1460;
+	private final static int cols3d = 990;
+
+	private final static double sparsity1 = 0.5;
+	private final static double sparsity2 = 0.01;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,
+			new String[] {"C"}));
+	}
+
+	@Test
+	public void testAppendInBlock1DenseSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, false, AppendMethod.MR_RAPPEND, false);
+	}
+
+	@Test
+	public void testAppendInBlock1SparseSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, true, AppendMethod.MR_RAPPEND, false);
+	}
+
+	@Test
+	public void testAppendInBlock1DenseRBindSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows2, cols1a, cols1a, false, AppendMethod.MR_RAPPEND, true);
+	}
+
+	@Test
+	public void testAppendInBlock1SparseRBindSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols1a, true, AppendMethod.MR_RAPPEND, true);
+	}
+
+	//NOTE: mappend only applied for m2_cols<=blocksize
+	@Test
+	public void testMapAppendInBlock2DenseSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, false, AppendMethod.MR_MAPPEND, false);
+	}
+
+	@Test
+	public void testMapAppendInBlock2SparseSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, true, AppendMethod.MR_MAPPEND, false);
+	}
+
+	@Test
+	public void testMapAppendOutBlock2DenseSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, false, AppendMethod.MR_MAPPEND, false);
+	}
+
+	@Test
+	public void testMapAppendOutBlock2SparseSP() {
+		commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, true, AppendMethod.MR_MAPPEND, false);
+	}
+
+	/**
+	 * Generates two random frames A and B with mixed schemas, appends them in DML
+	 * and in R, checks the shape of the DML result and compares it cell by cell
+	 * against the R result.
+	 *
+	 * @param platform runtime platform the DML script is executed on
+	 * @param rows1 number of rows of frame A (shadows the static field of the same name)
+	 * @param rows2 number of rows of frame B (shadows the static field of the same name)
+	 * @param cols1 number of columns of frame A
+	 * @param cols2 number of columns of frame B
+	 * @param sparse if true the inputs are generated with {@code sparsity2}, otherwise with {@code sparsity1}
+	 * @param forcedAppendMethod append method forced on {@link BinaryOp}, or {@code null} to leave the setting untouched
+	 * @param rbind if true append row-wise (rbind), otherwise column-wise (cbind)
+	 */
+	public void commonAppendTest(RUNTIME_PLATFORM platform, int rows1, int rows2, int cols1, int cols2, boolean sparse, AppendMethod forcedAppendMethod, boolean rbind)
+	{
+		TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME);
+
+		//remember global settings so they can be restored in finally
+		RUNTIME_PLATFORM prevPlfm=rtplatform;
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		AppendMethod prevAppendMethod = BinaryOp.FORCED_APPEND_METHOD;
+
+		double sparsity = (sparse) ? sparsity2 : sparsity1;
+
+		try
+		{
+			if(forcedAppendMethod != null) {
+				BinaryOp.FORCED_APPEND_METHOD = forcedAppendMethod;
+			}
+			rtplatform = platform;
+			if( rtplatform == RUNTIME_PLATFORM.SPARK )
+				DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+			config.addVariable("rows", rows1);
+			config.addVariable("cols", cols1);
+
+			/* This is for running the junit test the new way, i.e., construct the arguments directly */
+			String RI_HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = RI_HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{"-explain","-args", input("A"),
+				Long.toString(rows1),
+				Long.toString(cols1),
+				input("B"),
+				Long.toString(rows2),
+				Long.toString(cols2),
+				output("C"),
+				(rbind? "rbind": "cbind")};
+			fullRScriptName = RI_HOME + TEST_NAME + ".R";
+			rCmd = "Rscript" + " " + fullRScriptName + " " +
+				inputDir() + " " + expectedDir() + " " + (rbind? "rbind": "cbind");
+
+			//initialize the frame data.
+			List<ValueType> lschemaA = Arrays.asList(genMixSchema(cols1));
+			double[][] A = getRandomMatrix(rows1, cols1, min, max, sparsity, 1111 /*\\System.currentTimeMillis()*/);
+			writeInputFrameWithMTD("A", A, true, lschemaA, OutputInfo.BinaryBlockOutputInfo);
+
+			List<ValueType> lschemaB = Arrays.asList(genMixSchema(cols2));
+			double[][] B = getRandomMatrix(rows2, cols2, min, max, sparsity, 2345 /*\\System.currentTimeMillis()*/);
+			writeInputFrameWithMTD("B", B, true, lschemaB, OutputInfo.BinaryBlockOutputInfo);
+
+			boolean exceptionExpected = false;
+			int expectedNumberOfJobs = -1;
+			runTest(true, exceptionExpected, null, expectedNumberOfJobs);
+			runRScript(true);
+
+			//rbind keeps the columns of A (A and B have equal widths, and genMixSchema
+			//is deterministic in the width); cbind concatenates both schemas
+			List<ValueType> lschemaAB = new ArrayList<ValueType>(lschemaA);
+			if( !rbind )
+				lschemaAB.addAll(lschemaB);
+			ValueType[] schemaAB = lschemaAB.toArray(new ValueType[0]);
+			int expectedRows = rbind ? rows1 + rows2 : rows1;
+			int expectedCols = schemaAB.length;
+
+			for(String file: config.getOutputFiles())
+			{
+				FrameBlock frameBlock = readDMLFrameFromHDFS(file, InputInfo.BinaryBlockInputInfo);
+				//the R result is read with the DML dimensions, so check those explicitly
+				Assert.assertEquals("Wrong number of rows in DML output " + file,
+					expectedRows, frameBlock.getNumRows());
+				Assert.assertEquals("Wrong number of columns in DML output " + file,
+					expectedCols, frameBlock.getNumColumns());
+				MatrixCharacteristics md = new MatrixCharacteristics(frameBlock.getNumRows(), frameBlock.getNumColumns(), -1, -1);
+				FrameBlock frameRBlock = readRFrameFromHDFS(file+".csv", InputInfo.CSVInputInfo, md);
+				verifyFrameData(frameBlock, frameRBlock, schemaAB);
+				System.out.println("File processed is " + file);
+			}
+		}
+		catch(Exception ex) {
+			//the cause is preserved; the test runner reports its stack trace
+			throw new RuntimeException(ex);
+		}
+		finally
+		{
+			//restore the global settings modified above
+			rtplatform = prevPlfm;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			BinaryOp.FORCED_APPEND_METHOD = prevAppendMethod;
+		}
+	}
+
+	/**
+	 * Builds a schema of {@code cols} columns: one quarter STRING, one quarter
+	 * DOUBLE, one quarter INT and the remaining columns BOOLEAN (in that order).
+	 *
+	 * @param cols number of columns
+	 * @return the generated schema
+	 */
+	ValueType[] genMixSchema(int cols)
+	{
+		List<ValueType> schemaMixedLargeListStr = Collections.nCopies(cols/4, ValueType.STRING);
+		List<ValueType> schemaMixedLargeListDble = Collections.nCopies(cols/4, ValueType.DOUBLE);
+		List<ValueType> schemaMixedLargeListInt = Collections.nCopies(cols/4, ValueType.INT);
+		List<ValueType> schemaMixedLargeListBool = Collections.nCopies(cols-(cols/4)*3, ValueType.BOOLEAN);
+
+		final List<ValueType> schemaMixedLargeList = new ArrayList<ValueType>(schemaMixedLargeListStr);
+		schemaMixedLargeList.addAll(schemaMixedLargeListDble);
+		schemaMixedLargeList.addAll(schemaMixedLargeListInt);
+		schemaMixedLargeList.addAll(schemaMixedLargeListBool);
+
+		return schemaMixedLargeList.toArray(new ValueType[schemaMixedLargeList.size()]);
+	}
+
+	/**
+	 * Asserts that both frames have the same shape and that every cell, parsed
+	 * according to {@code schema}, matches within {@code epsilon}.
+	 *
+	 * @param frame1 frame produced by DML
+	 * @param frame2 frame produced by R
+	 * @param schema value type of each column
+	 */
+	private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) {
+		Assert.assertEquals("Row count differs between DML and R output",
+			frame1.getNumRows(), frame2.getNumRows());
+		Assert.assertEquals("Column count differs between DML and R output",
+			frame1.getNumColumns(), frame2.getNumColumns());
+		for ( int i=0; i<frame1.getNumRows(); ++i )
+			for( int j=0; j<frame1.getNumColumns(); j++ ) {
+				Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j)));
+				Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j)));
+				if( TestUtils.compareToR(schema[j], val1, val2, epsilon) != 0)
+					Assert.fail("The DML data for cell ("+ i + "," + j + ") is " + val1 +
+						", not same as the R value " + val2);
+			}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameAppend.R b/src/test/scripts/functions/frame/FrameAppend.R
new file mode 100644
index 0000000..f97916d
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameAppend.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+# args[1]: directory containing the inputs A.csv and B.csv
+# args[2]: directory the result C.csv is written to
+# args[3]: "rbind" for a row-wise append, anything else for a column-wise append
+options(digits=22)
+library("Matrix")
+
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE, stringsAsFactors=FALSE)
+if(args[3] == "rbind") {
+ C=rbind(A, B)
+} else {
+ # base cbind on data frames, symmetric with the rbind branch above
+ C=cbind(A, B)
+}
+write.csv(C, paste(args[2], "C.csv", sep=""), row.names = FALSE, quote = FALSE)
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameAppend.dml b/src/test/scripts/functions/frame/FrameAppend.dml
new file mode 100644
index 0000000..eea118e
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameAppend.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Reads two frames in binary format, appends them and writes the result in binary format.
+# $1, $2, $3: path, number of rows, number of columns of frame A
+# $4, $5, $6: path, number of rows, number of columns of frame B
+# $7: output path of the appended frame C
+# $8: "rbind" for a row-wise append, anything else for a column-wise append (cbind)
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
+B=read($4, data_type="frame", rows=$5, cols=$6, format="binary")
+if ($8 == "rbind") {
+ C=rbind(A, B)
+} else {
+ C=cbind(A, B)
+}
+write(C, $7, format="binary")