You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/08 21:21:06 UTC
[2/2] systemml git commit: [SYSTEMML-2007] New order builtin w/
multiple order-by columns
[SYSTEMML-2007] New order builtin w/ multiple order-by columns
This patch generalizes the compiler and runtime of the existing order
builtin function to support multiple order-by columns. So far we only
supported a single scalar column parameter; now users can provide either
a scalar or matrix argument. This avoids emulating this functionality
via multiple order calls in reverse order of order-by columns, which
works due to stable sort.
This patch modifies the compiler and CP runtime, without degrading
performance for the case of a single order-by column. The spark runtime
will follow in a subsequent patch, as it requires a second sort runtime
to avoid unnecessary overhead in case of a single order-by column.
On a scenario of a 1M x 100 dense matrix, with varying number of
distinct values, and 3 order-by columns, this patch improved performance
(compared to the emulation via 3 order calls) as follows:
1M x 10, distinct=1000: 2,271ms -> 940ms
1M x 10, distinct=100: 2,125ms -> 966ms
1M x 10, distinct=10: 2,057ms -> 958ms
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/ec024661
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/ec024661
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/ec024661
Branch: refs/heads/master
Commit: ec024661a42cdc243addb6feacac026d60313f67
Parents: 578a986
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Nov 7 20:02:09 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Nov 8 13:22:15 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/hops/ReorgOp.java | 22 +--
.../ParameterizedBuiltinFunctionExpression.java | 11 +-
.../runtime/functionobjects/SortIndex.java | 28 ++-
.../instructions/cp/ReorgCPInstruction.java | 13 +-
.../instructions/spark/ReorgSPInstruction.java | 26 ++-
.../instructions/spark/utils/RDDSortUtils.java | 2 +-
.../runtime/matrix/data/LibMatrixReorg.java | 122 ++++++++-----
.../sysml/runtime/matrix/data/MatrixBlock.java | 4 +-
.../sysml/runtime/util/DataConverter.java | 55 +++---
.../reorg/MultipleOrderByColsTest.java | 179 +++++++++++++++++++
src/test/scripts/functions/reorg/OrderMultiBy.R | 42 +++++
.../scripts/functions/reorg/OrderMultiBy.dml | 33 ++++
12 files changed, 428 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/hops/ReorgOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ReorgOp.java b/src/main/java/org/apache/sysml/hops/ReorgOp.java
index ef83253..4b55c9b 100644
--- a/src/main/java/org/apache/sysml/hops/ReorgOp.java
+++ b/src/main/java/org/apache/sysml/hops/ReorgOp.java
@@ -51,11 +51,8 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
public class ReorgOp extends Hop implements MultiThreadedHop
{
-
public static boolean FORCE_DIST_SORT_INDEXES = false;
- public boolean bSortSPRewriteApplicable = false;
-
private ReOrgOp op;
private int _maxNumThreads = -1; //-1 for unlimited
@@ -371,18 +368,21 @@ public class ReorgOp extends Hop implements MultiThreadedHop
setLops( mmult.constructLops() );
//cleanups
- HopRewriteUtils.removeChildReference(table, input);
+ HopRewriteUtils.removeChildReference(table, input);
}
}
- else //CP or Spark
+ else if( et==ExecType.SPARK ) {
+ boolean sortRewrite = !FORCE_DIST_SORT_INDEXES && isSortSPRewriteApplicable();
+ Lop transform1 = constructCPOrSparkSortLop(input, by, desc, ixret, et, sortRewrite);
+ setOutputDimensions(transform1);
+ setLineNumbers(transform1);
+ setLops(transform1);
+ }
+ else //CP
{
- if( et==ExecType.SPARK && !FORCE_DIST_SORT_INDEXES)
- bSortSPRewriteApplicable = isSortSPRewriteApplicable();
-
- Lop transform1 = constructCPOrSparkSortLop(input, by, desc, ixret, et, bSortSPRewriteApplicable);
+ Lop transform1 = constructCPOrSparkSortLop(input, by, desc, ixret, et, false);
setOutputDimensions(transform1);
setLineNumbers(transform1);
-
setLops(transform1);
}
break;
@@ -402,7 +402,7 @@ public class ReorgOp extends Hop implements MultiThreadedHop
throws HopsException, LopsException
{
Transform transform1 = new Transform( input.constructLops(), HopsTransf2Lops.get(ReOrgOp.SORT),
- input.getDataType(), input.getValueType(), et, bSortIndInMem);
+ input.getDataType(), input.getValueType(), et, bSortIndInMem);
for( Hop c : new Hop[]{by,desc,ixret} ) {
Lop ltmp = c.constructLops();
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
index ad95552..b365abb 100644
--- a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
@@ -444,15 +444,16 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
orderby = new IntIdentifier(1);
addVarParam("by", orderby);
}
- else if( orderby !=null && orderby.getOutput().getDataType() != DataType.SCALAR ){
- raiseValidateError("Orderby column 'by' is of type '"+orderby.getOutput().getDataType()+"'. Please, specify a scalar order by column index.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
- }
+ else if( orderby !=null && !(orderby.getOutput().getDataType().isScalar()
+ || orderby.getOutput().getDataType().isMatrix()) ) {
+ raiseValidateError("Orderby column 'by' is of type '"+orderby.getOutput().getDataType()+"'. Please, use a scalar or row vector to specify column indexes.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+ }
Expression decreasing = getVarParam("decreasing"); //[OPTIONAL] DECREASING
if( decreasing == null ) { //default: ascending
addVarParam("decreasing", new BooleanIdentifier(false));
}
- else if( decreasing!=null && decreasing.getOutput().getDataType() != DataType.SCALAR ){
+ else if( decreasing!=null && decreasing.getOutput().getDataType() != DataType.SCALAR ){
raiseValidateError("Ordering 'decreasing' is of type '"+decreasing.getOutput().getDataType()+"', '"+decreasing.getOutput().getValueType()+"'. Please, specify 'decreasing' as a scalar boolean.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
@@ -461,7 +462,7 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
indexreturn = new BooleanIdentifier(false);
addVarParam("index.return", indexreturn);
}
- else if( indexreturn!=null && indexreturn.getOutput().getDataType() != DataType.SCALAR ){
+ else if( indexreturn!=null && indexreturn.getOutput().getDataType() != DataType.SCALAR ){
raiseValidateError("Return type 'index.return' is of type '"+indexreturn.getOutput().getDataType()+"', '"+indexreturn.getOutput().getValueType()+"'. Please, specify 'indexreturn' as a scalar boolean.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
}
long dim2 = ( indexreturn instanceof BooleanIdentifier ) ?
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/runtime/functionobjects/SortIndex.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/functionobjects/SortIndex.java b/src/main/java/org/apache/sysml/runtime/functionobjects/SortIndex.java
index c72735a..0d6d481 100644
--- a/src/main/java/org/apache/sysml/runtime/functionobjects/SortIndex.java
+++ b/src/main/java/org/apache/sysml/runtime/functionobjects/SortIndex.java
@@ -34,26 +34,22 @@ public class SortIndex extends IndexFunction
{
private static final long serialVersionUID = -8446389232078905200L;
- private int _col = -1;
- private boolean _decreasing = false;
- private boolean _ixreturn = false;
+ private final int[] _cols;
+ private final boolean _decreasing;
+ private final boolean _ixreturn;
- private SortIndex() {
- // nothing to do here
+ public SortIndex(int col, boolean decreasing, boolean indexreturn) {
+ this(new int[]{col}, decreasing, indexreturn);
}
-
- public static SortIndex getSortIndexFnObject(int col, boolean decreasing, boolean indexreturn)
- {
- SortIndex ix = new SortIndex();
- ix._col = col;
- ix._decreasing = decreasing;
- ix._ixreturn = indexreturn;
-
- return ix;
+
+ public SortIndex(int[] cols, boolean decreasing, boolean indexreturn) {
+ _cols = cols;
+ _decreasing = decreasing;
+ _ixreturn = indexreturn;
}
- public int getCol() {
- return _col;
+ public int[] getCols() {
+ return _cols;
}
public boolean getDecreasing() {
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
index 3788d68..96b4644 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
@@ -31,6 +31,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysml.runtime.util.DataConverter;
public class ReorgCPInstruction extends UnaryCPInstruction {
// sort-specific attributes (to enable variable attributes)
@@ -115,8 +116,8 @@ public class ReorgCPInstruction extends UnaryCPInstruction {
CPOperand col = new CPOperand(parts[2]);
CPOperand desc = new CPOperand(parts[3]);
CPOperand ixret = new CPOperand(parts[4]);
- return new ReorgCPInstruction(new ReorgOperator(SortIndex.getSortIndexFnObject(1,false,false)),
- in, out, col, desc, ixret, opcode, str);
+ return new ReorgCPInstruction(new ReorgOperator(new SortIndex(1,false,false)),
+ in, out, col, desc, ixret, opcode, str);
}
else {
throw new DMLRuntimeException("Unknown opcode while parsing a ReorgInstruction: " + str);
@@ -132,18 +133,20 @@ public class ReorgCPInstruction extends UnaryCPInstruction {
ReorgOperator r_op = (ReorgOperator) _optr;
if( r_op.fn instanceof SortIndex ) {
//additional attributes for sort
- int col = (int)ec.getScalarInput(_col.getName(), _col.getValueType(), _col.isLiteral()).getLongValue();
+ int[] cols = _col.getDataType().isMatrix() ? DataConverter.convertToIntVector(ec.getMatrixInput(_col.getName())) :
+ new int[]{(int)ec.getScalarInput(_col.getName(), _col.getValueType(), _col.isLiteral()).getLongValue()};
boolean desc = ec.getScalarInput(_desc.getName(), _desc.getValueType(), _desc.isLiteral()).getBooleanValue();
boolean ixret = ec.getScalarInput(_ixret.getName(), _ixret.getValueType(), _ixret.isLiteral()).getBooleanValue();
- r_op = r_op.setFn(SortIndex.getSortIndexFnObject(col, desc, ixret));
+ r_op = r_op.setFn(new SortIndex(cols, desc, ixret));
}
//execute operation
MatrixBlock soresBlock = (MatrixBlock) (matBlock.reorgOperations(r_op, new MatrixBlock(), 0, 0, 0));
//release inputs/outputs
+ if( r_op.fn instanceof SortIndex && _col.getDataType().isMatrix() )
+ ec.releaseMatrixInput(_col.getName());
ec.releaseMatrixInput(input1.getName(), getExtendedOpcode());
ec.setMatrixOutput(output.getName(), soresBlock, getExtendedOpcode());
}
-
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
index f8b92ac..c742a0b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
@@ -52,6 +52,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+import org.apache.sysml.runtime.util.DataConverter;
import org.apache.sysml.runtime.util.UtilFunctions;
public class ReorgSPInstruction extends UnarySPInstruction {
@@ -103,13 +104,13 @@ public class ReorgSPInstruction extends UnarySPInstruction {
CPOperand col = new CPOperand(parts[2]);
CPOperand desc = new CPOperand(parts[3]);
CPOperand ixret = new CPOperand(parts[4]);
- boolean bSortIndInMem = false;
+ boolean bSortIndInMem = false;
if(parts.length > 5)
bSortIndInMem = Boolean.parseBoolean(parts[6]);
-
- return new ReorgSPInstruction(new ReorgOperator(SortIndex.getSortIndexFnObject(1,false,false)),
- in, col, desc, ixret, out, opcode, bSortIndInMem, str);
+
+ return new ReorgSPInstruction(new ReorgOperator(new SortIndex(1,false,false)),
+ in, col, desc, ixret, out, opcode, bSortIndInMem, str);
}
else {
throw new DMLRuntimeException("Unknown opcode while parsing a ReorgInstruction: " + str);
@@ -156,16 +157,23 @@ public class ReorgSPInstruction extends UnarySPInstruction {
// Sort by column 'col' in ascending/descending order and return either index/value
//get parameters
- long col = ec.getScalarInput(_col.getName(), _col.getValueType(), _col.isLiteral()).getLongValue();
+ long[] cols = _col.getDataType().isMatrix() ? DataConverter.convertToLongVector(ec.getMatrixInput(_col.getName())) :
+ new long[]{ec.getScalarInput(_col.getName(), _col.getValueType(), _col.isLiteral()).getLongValue()};
boolean desc = ec.getScalarInput(_desc.getName(), _desc.getValueType(), _desc.isLiteral()).getBooleanValue();
boolean ixret = ec.getScalarInput(_ixret.getName(), _ixret.getValueType(), _ixret.isLiteral()).getBooleanValue();
boolean singleCol = (mcIn.getCols() == 1);
+ //error handling unsupported operations
+ //TODO additional spark sort runtime with multiple order columns
+ if( cols.length > 1 )
+ LOG.warn("Unsupported sort with multiple order-by columns. Falling back first sort column.");
+ long col = cols[0];
+
// extract column (if necessary) and sort
out = in1;
if( !singleCol ){
out = out.filter(new IsBlockInRange(1, mcIn.getRows(), col, col, mcIn))
- .mapValues(new ExtractColumn((int)UtilFunctions.computeCellInBlock(col, mcIn.getColsPerBlock())));
+ .mapValues(new ExtractColumn((int)UtilFunctions.computeCellInBlock(col, mcIn.getColsPerBlock())));
}
//actual index/data sort operation
@@ -177,8 +185,8 @@ public class ReorgSPInstruction extends UnarySPInstruction {
}
else { //sort multi-column matrix
if (! _bSortIndInMem)
- out = RDDSortUtils.sortDataByVal(out, in1, !desc, mcIn.getRows(), mcIn.getCols(), mcIn.getRowsPerBlock(), mcIn.getColsPerBlock());
- else
+ out = RDDSortUtils.sortDataByVal(out, in1, !desc, mcIn.getRows(), mcIn.getCols(), mcIn.getRowsPerBlock(), mcIn.getColsPerBlock());
+ else
out = RDDSortUtils.sortDataByValMemSort(out, in1, !desc, mcIn.getRows(), mcIn.getCols(), mcIn.getRowsPerBlock(), mcIn.getColsPerBlock(), sec, (ReorgOperator) _optr);
}
}
@@ -187,6 +195,8 @@ public class ReorgSPInstruction extends UnarySPInstruction {
}
//store output rdd handle
+ if( opcode.equalsIgnoreCase("rsort") && _col.getDataType().isMatrix() )
+ sec.releaseMatrixInput(_col.getName());
updateReorgMatrixCharacteristics(sec);
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
index 2fddce3..bf63f0d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
@@ -175,7 +175,7 @@ public class RDDSortUtils
.toMatrixBlock(val, (int)rlen, 1, brlen, bclen, -1);
//in-memory sort operation (w/ index return: source index in target position)
- ReorgOperator lrop = new ReorgOperator(SortIndex.getSortIndexFnObject(1, !asc, true));
+ ReorgOperator lrop = new ReorgOperator(new SortIndex(1, !asc, true));
MatrixBlock sortedIx = (MatrixBlock) inMatBlock
.reorgOperations(lrop, new MatrixBlock(), -1, -1, -1);
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 28c3bf6..b86dc96 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -114,7 +114,7 @@ public class LibMatrixReorg
return diag(in, out);
case SORT:
SortIndex ix = (SortIndex) op.fn;
- return sort(in, out, ix.getCol(), ix.getDecreasing(), ix.getIndexReturn());
+ return sort(in, out, ix.getCols(), ix.getDecreasing(), ix.getIndexReturn());
default:
throw new DMLRuntimeException("Unsupported reorg operator: "+op.fn);
@@ -317,7 +317,7 @@ public class LibMatrixReorg
return out;
}
- public static MatrixBlock sort(MatrixBlock in, MatrixBlock out, int by, boolean desc, boolean ixret)
+ public static MatrixBlock sort(MatrixBlock in, MatrixBlock out, int[] by, boolean desc, boolean ixret)
throws DMLRuntimeException
{
//meta data gathering and preparation
@@ -328,8 +328,9 @@ public class LibMatrixReorg
out.nonZeros = ixret ? rlen : in.nonZeros;
//step 1: error handling
- if( by <= 0 || clen < by )
- throw new DMLRuntimeException("Sort configuration issue: non-existing orderby column: "+by+" ("+rlen+"x"+clen+" input).");
+ if( !isValidSortByList(by, clen) )
+ throw new DMLRuntimeException("Sort configuration issue: invalid orderby columns: "
+ + Arrays.toString(by)+" ("+rlen+"x"+clen+" input).");
//step 2: empty block / special case handling
if( !ixret ) //SORT DATA
@@ -350,8 +351,9 @@ public class LibMatrixReorg
{
if( in.isEmptyBlock(false) ) { //EMPTY INPUT BLOCK
out.allocateDenseBlock(false);
- for( int i=0; i<rlen; i++ ) //seq(1,n)
- out.setValueDenseUnsafe(i, 0, i+1);
+ double[] c = out.getDenseBlock();
+ for( int i=0; i<rlen; i++ )
+ c[i] = i+1; //seq(1,n)
return out;
}
}
@@ -363,12 +365,16 @@ public class LibMatrixReorg
double[] values = new double[rlen];
for( int i=0; i<rlen; i++ ) {
vix[i] = i;
- values[i] = in.quickGetValue(i, by-1);
+ values[i] = in.quickGetValue(i, by[0]-1);
}
//sort index vector on extracted data (unstable)
SortUtils.sortByValue(0, rlen, values, vix);
-
+
+ //sort by secondary columns if required (in-place)
+ if( by.length > 1 )
+ sortBySecondary(0, rlen, values, vix, in, by, 1);
+
//flip order if descending requested (note that this needs to happen
//before we ensure stable outputs, hence we also flip values)
if(desc) {
@@ -377,42 +383,25 @@ public class LibMatrixReorg
}
//final pass to ensure stable output
- for( int i=0; i<rlen-1; i++ ) {
- double tmp = values[i];
- //determine run of equal values
- int len = 0;
- while( i+len+1<rlen && tmp==values[i+len+1] )
- len++;
- //unstable sort of run indexes (equal value guaranteed)
- if( len>0 ) {
- Arrays.sort(vix, i, i+len+1);
- i += len; //skip processed run
- }
- }
+ sortIndexesStable(0, rlen, values, vix, in, by, 1);
//step 4: create output matrix (guaranteed non-empty, see step 2)
- if( !ixret )
- {
+ if( !ixret ) {
//copy input data in sorted order into result
- if( !sparse ) //DENSE
- {
+ if( !sparse ) { //DENSE
out.allocateDenseBlock(false);
- for( int i=0; i<rlen; i++ ) {
+ for( int i=0; i<rlen; i++ )
System.arraycopy(in.denseBlock, vix[i]*clen, out.denseBlock, i*clen, clen);
- }
}
- else //SPARSE
- {
+ else { //SPARSE
out.allocateSparseRowsBlock(false);
for( int i=0; i<rlen; i++ )
- if( !in.sparseBlock.isEmpty(vix[i]) ) {
+ if( !in.sparseBlock.isEmpty(vix[i]) )
out.sparseBlock.set(i, in.sparseBlock.get(vix[i]),
!SHALLOW_COPY_REORG); //row remains unchanged
- }
}
}
- else
- {
+ else {
//copy sorted index vector into result
out.allocateDenseBlock(false);
for( int i=0; i<rlen; i++ )
@@ -2031,11 +2020,9 @@ public class LibMatrixReorg
*
* @param m1 matrix
*/
- private static void sortReverseDense( MatrixBlock m1 )
- {
+ private static void sortReverseDense( MatrixBlock m1 ) {
int rlen = m1.rlen;
double[] a = m1.denseBlock;
-
for( int i=0; i<rlen/2; i++ ) {
double tmp = a[i];
a[i] = a[rlen - i -1];
@@ -2043,10 +2030,8 @@ public class LibMatrixReorg
}
}
- private static void sortReverseDense( int[] a )
- {
+ private static void sortReverseDense( int[] a ) {
int rlen = a.length;
-
for( int i=0; i<rlen/2; i++ ) {
int tmp = a[i];
a[i] = a[rlen - i -1];
@@ -2054,16 +2039,71 @@ public class LibMatrixReorg
}
}
- private static void sortReverseDense( double[] a )
- {
+ private static void sortReverseDense( double[] a ) {
int rlen = a.length;
-
for( int i=0; i<rlen/2; i++ ) {
double tmp = a[i];
a[i] = a[rlen - i -1];
a[rlen - i - 1] = tmp;
}
}
+
+ private static void sortBySecondary(int rl, int ru, double[] values, int[] vix, MatrixBlock in, int[] by, int off) {
+ //find runs of equal values in current offset and index range
+ //replace value by next column, sort, and recurse until single value
+ for( int i=rl; i<ru-1; i++ ) {
+ double tmp = values[i];
+ //determine run of equal values
+ int len = 0;
+ while( i+len+1<ru && tmp==values[i+len+1] )
+ len++;
+ //temp value replacement and recursive sort
+ if( len > 0 ) {
+ double old = values[i];
+ //extract values of next column
+ for(int j=i; j<i+len+1; j++)
+ values[j] = in.quickGetValue(vix[j], by[off]-1);
+ //sort values, incl recursive decent
+ SortUtils.sortByValue(i, i+len+1, values, vix);
+ if( off+1 < by.length )
+ sortBySecondary(i, i+len+1, values, vix, in, by, off+1);
+ //reset values of previous level
+ Arrays.fill(values, i, i+len+1, old);
+ i += len; //skip processed run
+ }
+ }
+ }
+
+ private static void sortIndexesStable(int rl, int ru, double[] values, int[] vix, MatrixBlock in, int[] by, int off) {
+ for( int i=rl; i<ru-1; i++ ) {
+ double tmp = values[i];
+ //determine run of equal values
+ int len = 0;
+ while( i+len+1<ru && tmp==values[i+len+1] )
+ len++;
+ //temp value replacement and recursive decent
+ if( len > 0 ) {
+ if( off < by.length ) {
+ //extract values of next column
+ for(int j=i; j<i+len+1; j++)
+ values[j] = in.quickGetValue(vix[j], by[off]-1);
+ sortIndexesStable(i, i+len+1, values, vix, in, by, off+1);
+ }
+ else //unstable sort of run indexes (equal value guaranteed)
+ Arrays.sort(vix, i, i+len+1);
+ i += len; //skip processed run
+ }
+ }
+ }
+
+ private static boolean isValidSortByList(int[] by, int clen) {
+ if( by == null || by.length==0 || by.length>clen )
+ return false;
+ for(int i=0; i<by.length; i++)
+ if( by[i] <= 0 || clen < by[i])
+ return false;
+ return true;
+ }
@SuppressWarnings("unused")
private static void countAgg( int[] c, int[] ai, final int len )
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 88efde2..924d6c5 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -3462,7 +3462,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
{
//SPECIAL case (operators with special performance requirements,
//or size-dependent special behavior)
- //currently supported opcodes: r', rdiag, rsort
+ //currently supported opcodes: r', rdiag, rsort, rev
LibMatrixReorg.reorg(this, result, op);
}
else
@@ -4755,7 +4755,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
tdw.quickSetValue(0, 1, zero_wt); //num zeros in input
// Sort td and tw based on values inside td (ascending sort), incl copy into result
- SortIndex sfn = SortIndex.getSortIndexFnObject(1, false, false);
+ SortIndex sfn = new SortIndex(1, false, false);
ReorgOperator rop = new ReorgOperator(sfn);
LibMatrixReorg.reorg(tdw, (MatrixBlock)result, rop);
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 5a91564..af69e81 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -278,32 +278,47 @@ public class DataConverter
return ret;
}
- public static int[] convertToIntVector( MatrixBlock mb)
- {
+ public static int[] convertToIntVector( MatrixBlock mb) {
int rows = mb.getNumRows();
int cols = mb.getNumColumns();
int[] ret = new int[rows*cols]; //0-initialized
-
-
- if( mb.getNonZeros() > 0 )
- {
- if( mb.isInSparseFormat() )
- {
- Iterator<IJV> iter = mb.getSparseBlockIterator();
- while( iter.hasNext() ) {
- IJV cell = iter.next();
- ret[cell.getI()*cols+cell.getJ()] = (int)cell.getV();
- }
+ if( mb.isEmptyBlock(false) )
+ return ret;
+ if( mb.isInSparseFormat() ) {
+ Iterator<IJV> iter = mb.getSparseBlockIterator();
+ while( iter.hasNext() ) {
+ IJV cell = iter.next();
+ ret[cell.getI()*cols+cell.getJ()] = (int)cell.getV();
}
- else
- {
- //memcopy row major representation if at least 1 non-zero
- for( int i=0, cix=0; i<rows; i++ )
- for( int j=0; j<cols; j++, cix++ )
- ret[cix] = (int)(mb.getValueDenseUnsafe(i, j));
+ }
+ else {
+ //memcopy row major representation if at least 1 non-zero
+ for( int i=0, cix=0; i<rows; i++ )
+ for( int j=0; j<cols; j++, cix++ )
+ ret[cix] = (int)(mb.getValueDenseUnsafe(i, j));
+ }
+ return ret;
+ }
+
+ public static long[] convertToLongVector( MatrixBlock mb) {
+ int rows = mb.getNumRows();
+ int cols = mb.getNumColumns();
+ long[] ret = new long[rows*cols]; //0-initialized
+ if( mb.isEmptyBlock(false) )
+ return ret;
+ if( mb.isInSparseFormat() ) {
+ Iterator<IJV> iter = mb.getSparseBlockIterator();
+ while( iter.hasNext() ) {
+ IJV cell = iter.next();
+ ret[cell.getI()*cols+cell.getJ()] = (long)cell.getV(); //long cast, no int truncation
+ }
}
-
+ else {
+ //copy dense cells in row-major order (block is non-empty, see check above)
+ for( int i=0, cix=0; i<rows; i++ )
+ for( int j=0; j<cols; j++, cix++ )
+ ret[cix] = (long)(mb.getValueDenseUnsafe(i, j));
+ }
return ret;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/test/java/org/apache/sysml/test/integration/functions/reorg/MultipleOrderByColsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/reorg/MultipleOrderByColsTest.java b/src/test/java/org/apache/sysml/test/integration/functions/reorg/MultipleOrderByColsTest.java
new file mode 100644
index 0000000..67c6487
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/reorg/MultipleOrderByColsTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.reorg;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class MultipleOrderByColsTest extends AutomatedTestBase
+{
+ private final static String TEST_NAME1 = "OrderMultiBy";
+
+ private final static String TEST_DIR = "functions/reorg/";
+ private static final String TEST_CLASS_DIR = TEST_DIR + MultipleOrderByColsTest.class.getSimpleName() + "/";
+ private final static double eps = 1e-10;
+
+ private final static int rows = 1017;
+ private final static int cols = 736;
+ private final static double sparsity1 = 0.7;
+ private final static double sparsity2 = 0.07;
+
+ @Override
+ public void setUp() {
+ TestUtils.clearAssertionInformation();
+ addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1,new String[]{"B"}));
+ }
+
+ @Test
+ public void testOrderDenseAscDataCP() {
+ runOrderTest(TEST_NAME1, false, false, false, ExecType.CP);
+ }
+
+ @Test
+ public void testOrderDenseAscIxCP() {
+ runOrderTest(TEST_NAME1, false, false, true, ExecType.CP);
+ }
+
+ @Test
+ public void testOrderDenseDescDataCP() {
+ runOrderTest(TEST_NAME1, false, true, false, ExecType.CP);
+ }
+
+ @Test
+ public void testOrderDenseDescIxCP() {
+ runOrderTest(TEST_NAME1, false, true, true, ExecType.CP);
+ }
+
+ @Test
+ public void testOrderSparseAscDataCP() {
+ runOrderTest(TEST_NAME1, true, false, false, ExecType.CP);
+ }
+
+ @Test
+ public void testOrderSparseAscIxCP() {
+ runOrderTest(TEST_NAME1, true, false, true, ExecType.CP);
+ }
+
+ @Test
+ public void testOrderSparseDescDataCP() {
+ runOrderTest(TEST_NAME1, true, true, false, ExecType.CP);
+ }
+
+ @Test
+ public void testOrderSparseDescIxCP() {
+ runOrderTest(TEST_NAME1, true, true, true, ExecType.CP);
+ }
+
+//TODO enable together with additional spark sort runtime
+// @Test
+// public void testOrderDenseAscDataSP() {
+// runOrderTest(TEST_NAME1, false, false, false, ExecType.SPARK);
+// }
+//
+// @Test
+// public void testOrderDenseAscIxSP() {
+// runOrderTest(TEST_NAME1, false, false, true, ExecType.SPARK);
+// }
+//
+// @Test
+// public void testOrderDenseDescDataSP() {
+// runOrderTest(TEST_NAME1, false, true, false, ExecType.SPARK);
+// }
+//
+// @Test
+// public void testOrderDenseDescIxSP() {
+// runOrderTest(TEST_NAME1, false, true, true, ExecType.SPARK);
+// }
+//
+// @Test
+// public void testOrderSparseAscDataSP() {
+// runOrderTest(TEST_NAME1, true, false, false, ExecType.SPARK);
+// }
+//
+// @Test
+// public void testOrderSparseAscIxSP() {
+// runOrderTest(TEST_NAME1, true, false, true, ExecType.SPARK);
+// }
+//
+// @Test
+// public void testOrderSparseDescDataSP() {
+// runOrderTest(TEST_NAME1, true, true, false, ExecType.SPARK);
+// }
+//
+// @Test
+// public void testOrderSparseDescIxSP() {
+// runOrderTest(TEST_NAME1, true, true, true, ExecType.SPARK);
+// }
+
+ private void runOrderTest( String testname, boolean sparse, boolean desc, boolean ixret, ExecType et)
+ {
+ RUNTIME_PLATFORM platformOld = rtplatform;
+ switch( et ){
+ case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+ case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+ default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+ }
+
+ boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ if( rtplatform == RUNTIME_PLATFORM.SPARK )
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+ try
+ {
+ String TEST_NAME = testname;
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ loadTestConfiguration(config);
+
+ String HOME = SCRIPT_DIR + TEST_DIR;
+ fullDMLScriptName = HOME + TEST_NAME + ".dml";
+ programArgs = new String[]{"-explain","-args", input("A"),
+ String.valueOf(desc).toUpperCase(), String.valueOf(ixret).toUpperCase(), output("B") };
+
+ fullRScriptName = HOME + TEST_NAME + ".R";
+ rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " +
+ String.valueOf(desc).toUpperCase()+" "+String.valueOf(ixret).toUpperCase()+" "+expectedDir();
+
+ double sparsity = (sparse) ? sparsity2 : sparsity1; //with rounding for duplicates
+ double[][] A = TestUtils.round(getRandomMatrix(rows, cols, -10, 10, sparsity, 7));
+ writeInputMatrixWithMTD("A", A, true);
+
+ runTest(true, false, null, -1);
+ runRScript(true);
+
+ //compare matrices
+ HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("B");
+ HashMap<CellIndex, Double> rfile = readRMatrixFromFS("B");
+ TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
+ }
+ finally {
+ rtplatform = platformOld;
+ DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/test/scripts/functions/reorg/OrderMultiBy.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/reorg/OrderMultiBy.R b/src/test/scripts/functions/reorg/OrderMultiBy.R
new file mode 100644
index 0000000..374dad0
--- /dev/null
+++ b/src/test/scripts/functions/reorg/OrderMultiBy.R
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+args <- commandArgs(TRUE)
+options(digits=22)
+library("Matrix")
+
+# args: input dir, decreasing flag, index.return flag, output dir
+A <- readMM(paste0(args[1], "A.mtx"))
+desc <- as.logical(args[2])
+ixret <- as.logical(args[3])
+
+# multi-column ordering: column 3 is the primary key, ties are broken
+# by column 7 and then by column 14 (same columns as OrderMultiBy.dml)
+perm <- order(A[,3], A[,7], A[,14], decreasing=desc)
+
+# return the permutation itself (index.return) or the reordered rows
+if (ixret) {
+	B <- perm
+} else {
+	B <- A[perm,]
+}
+writeMM(as(B,"CsparseMatrix"), paste0(args[4], "B"))
http://git-wip-us.apache.org/repos/asf/systemml/blob/ec024661/src/test/scripts/functions/reorg/OrderMultiBy.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/reorg/OrderMultiBy.dml b/src/test/scripts/functions/reorg/OrderMultiBy.dml
new file mode 100644
index 0000000..f6d2246
--- /dev/null
+++ b/src/test/scripts/functions/reorg/OrderMultiBy.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+A = read($1);
+# order-by columns (primary key first) as a 1 x 3 row vector
+byCols = matrix("3 7 14", rows=1, cols=3)
+
+# Emulates stable single-column orders applied in reverse order-by sequence,
+# i.e., by=14, then by=7, then by=3 (index.return=FALSE only, because with
+# index.return=TRUE the intermediate B would hold indices instead of rows)
+
+B = order(target=A, by=byCols, decreasing=$2, index.return=$3);
+
+write(B, $4, format="text");
\ No newline at end of file