You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/20 06:45:17 UTC
incubator-systemml git commit: [SYSTEMML-1044] Generalized spark
mapleftindex (left/right broadcasts)
Repository: incubator-systemml
Updated Branches:
refs/heads/master 7d3a50d26 -> 9d1816131
[SYSTEMML-1044] Generalized spark mapleftindex (left/right broadcasts)
This patch generalizes our existing Spark mapleftindex to broadcast not just
the rhs but, under special conditions (block-aligned inputs where the lhs is
estimated to be smaller than the rhs), also the lhs. This can significantly
reduce the memory pressure for sparse-dense left indexing: although the lhs
always has larger dimensions than the rhs, a sparse lhs matrix might be much
smaller in absolute (in-memory) size.
This fixes OOMs of our data generator for descriptive statistics
(SYSTEMML-1044).
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9d181613
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9d181613
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9d181613
Branch: refs/heads/master
Commit: 9d18161318ffea55cb5e700f29eb719be803b5fc
Parents: 7d3a50d
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sun Feb 19 22:05:26 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun Feb 19 22:05:26 2017 -0800
----------------------------------------------------------------------
.../org/apache/sysml/hops/LeftIndexingOp.java | 52 +++-
.../java/org/apache/sysml/lops/LeftIndex.java | 49 ++--
.../spark/IndexingSPInstruction.java | 6 +-
.../spark/MatrixIndexingSPInstruction.java | 278 ++++++++++---------
.../functions/frame/FrameIndexingDistTest.java | 2 +-
.../indexing/LeftIndexingSparseDenseTest.java | 32 ++-
.../indexing/LeftIndexingSparseSparseTest.java | 10 +-
.../functions/indexing/LeftIndexingTest.java | 2 +-
8 files changed, 256 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java b/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
index 93d4da4..c82d63a 100644
--- a/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
+++ b/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
@@ -23,6 +23,7 @@ import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.lops.Binary;
import org.apache.sysml.lops.Group;
import org.apache.sysml.lops.LeftIndex;
+import org.apache.sysml.lops.LeftIndex.LixCacheType;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.LopsException;
import org.apache.sysml.lops.RangeBasedReIndex;
@@ -39,8 +40,9 @@ public class LeftIndexingOp extends Hop
public static LeftIndexingMethod FORCED_LEFT_INDEXING = null;
public enum LeftIndexingMethod {
- SP_GLEFTINDEX, // general case
- SP_MLEFTINDEX //map-only left index where we broadcast right hand side matrix
+ SP_GLEFTINDEX, //general case
+ SP_MLEFTINDEX_R, //map-only left index, broadcast rhs
+ SP_MLEFTINDEX_L, //map-only left index, broadcast lhs
}
public static String OPSTRING = "lix"; //"LeftIndexing";
@@ -186,9 +188,9 @@ public class LeftIndexingOp extends Hop
Hop left = getInput().get(0);
Hop right = getInput().get(1);
- LeftIndexingMethod method = getOptMethodLeftIndexingMethod( right.getDim1(), right.getDim2(),
- right.getRowsInBlock(), right.getColsInBlock(), right.getNnz(), getDataType()==DataType.SCALAR );
- boolean isBroadcast = (method == LeftIndexingMethod.SP_MLEFTINDEX);
+ LeftIndexingMethod method = getOptMethodLeftIndexingMethod(
+ left.getDim1(), left.getDim2(), left.getRowsInBlock(), left.getColsInBlock(), left.getNnz(),
+ right.getDim1(), right.getDim2(), right.getNnz(), right.getDataType() );
//insert cast to matrix if necessary (for reuse broadcast runtime)
Lop rightInput = right.constructLops();
@@ -203,7 +205,7 @@ public class LeftIndexingOp extends Hop
left.constructLops(), rightInput,
getInput().get(2).constructLops(), getInput().get(3).constructLops(),
getInput().get(4).constructLops(), getInput().get(5).constructLops(),
- getDataType(), getValueType(), et, isBroadcast);
+ getDataType(), getValueType(), et, getSpLixCacheType(method));
setOutputDimensions(leftIndexLop);
setLineNumbers(leftIndexLop);
@@ -240,6 +242,14 @@ public class LeftIndexingOp extends Hop
return (rightHandSide.getDataType() == DataType.SCALAR);
}
+ private LixCacheType getSpLixCacheType(LeftIndexingMethod method) { //map optimizer decision to the broadcast side used by the lop
+ switch( method ) {
+ case SP_MLEFTINDEX_L: return LixCacheType.LEFT;
+ case SP_MLEFTINDEX_R: return LixCacheType.RIGHT;
+ default: return LixCacheType.NONE; //SP_GLEFTINDEX: no broadcast
+ }
+ }
+
@Override
public String getOpString() {
String s = new String("");
@@ -398,19 +408,33 @@ public class LeftIndexingOp extends Hop
return _etype;
}
- private LeftIndexingMethod getOptMethodLeftIndexingMethod( long m2_dim1, long m2_dim2,
- long m2_rpb, long m2_cpb, long m2_nnz, boolean isScalar)
+ private static LeftIndexingMethod getOptMethodLeftIndexingMethod(
+ long m1_dim1, long m1_dim2, long m1_rpb, long m1_cpb, long m1_nnz,
+ long m2_dim1, long m2_dim2, long m2_nnz, DataType rhsDt)
{
if(FORCED_LEFT_INDEXING != null) {
return FORCED_LEFT_INDEXING;
}
- // broadcast-based left indexing has memory constraints but is more efficient
- // since it does not require shuffle
- if( isScalar || m2_dim1 >= 1 && m2_dim2 >= 1 // rhs dims known
- && OptimizerUtils.checkSparkBroadcastMemoryBudget(m2_dim1, m2_dim2, m2_rpb, m2_cpb, m2_nnz) )
- {
- return LeftIndexingMethod.SP_MLEFTINDEX;
+ // broadcast-based left indexing w/o shuffle for scalar rhs
+ if( rhsDt == DataType.SCALAR ) {
+ return LeftIndexingMethod.SP_MLEFTINDEX_R;
+ }
+
+ // broadcast-based left indexing w/o shuffle for small left/right inputs (both lhs and rhs dims must be known)
+ if( m1_dim1 >= 1 && m1_dim2 >= 1 && m2_dim1 >= 1 && m2_dim2 >= 1 ) { //lhs/rhs dims known
+ boolean isAligned = (rhsDt == DataType.MATRIX) &&
+ ((m1_dim1 == m2_dim1 && m1_dim2 <= m1_cpb) || (m1_dim2 == m2_dim2 && m1_dim1 <= m1_rpb));
+ boolean broadcastRhs = OptimizerUtils.checkSparkBroadcastMemoryBudget(m2_dim1, m2_dim2, m1_rpb, m1_cpb, m2_nnz);
+ double m1SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_dim1, m1_dim2, m1_rpb, m1_cpb, m1_nnz);
+ double m2SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_dim1, m2_dim2, m1_rpb, m1_cpb, m2_nnz);
+
+ if( broadcastRhs ) {
+ if( isAligned && m1SizeP<m2SizeP ) //e.g., sparse-dense lix: lhs smaller than rhs in absolute size
+ return LeftIndexingMethod.SP_MLEFTINDEX_L;
+ else //all other cases, where rhs smaller than lhs
+ return LeftIndexingMethod.SP_MLEFTINDEX_R;
+ }
}
// default general case
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/main/java/org/apache/sysml/lops/LeftIndex.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/LeftIndex.java b/src/main/java/org/apache/sysml/lops/LeftIndex.java
index 71e4882..7c6117a 100644
--- a/src/main/java/org/apache/sysml/lops/LeftIndex.java
+++ b/src/main/java/org/apache/sysml/lops/LeftIndex.java
@@ -28,9 +28,32 @@ import org.apache.sysml.parser.Expression.ValueType;
public class LeftIndex extends Lop
{
+ public enum LixCacheType { //which lix input (if any) is broadcast in Spark
+ RIGHT, //rhs broadcast, lhs processed as RDD (opcode mapLeftIndex)
+ LEFT, //lhs broadcast, rhs processed as RDD (opcode mapLeftIndex)
+ NONE //no broadcast (opcode leftIndex)
+ }
+
+ private LixCacheType _type;
+
+ public LeftIndex(
+ Lop lhsInput, Lop rhsInput, Lop rowL, Lop rowU, Lop colL, Lop colU, DataType dt, ValueType vt, ExecType et)
+ throws LopsException {
+ super(Lop.Type.LeftIndex, dt, vt);
+ _type = LixCacheType.NONE;
+ init(lhsInput, rhsInput, rowL, rowU, colL, colU, et);
+ }
+
+ public LeftIndex(
+ Lop lhsInput, Lop rhsInput, Lop rowL, Lop rowU, Lop colL, Lop colU, DataType dt, ValueType vt, ExecType et, LixCacheType type)
+ throws LopsException {
+ super(Lop.Type.LeftIndex, dt, vt);
+ _type = type;
+ init(lhsInput, rhsInput, rowL, rowU, colL, colU, et);
+ }
/**
- * Constructor to setup a LeftIndexing operation.
+ * Setup a LeftIndexing operation.
* Example: A[i:j, k:l] = B;
*
* @param lhsMatrix left matrix lop
@@ -77,24 +100,8 @@ public class LeftIndex extends Lop
}
}
- public LeftIndex(
- Lop lhsInput, Lop rhsInput, Lop rowL, Lop rowU, Lop colL, Lop colU, DataType dt, ValueType vt, ExecType et)
- throws LopsException {
- super(Lop.Type.LeftIndex, dt, vt);
- init(lhsInput, rhsInput, rowL, rowU, colL, colU, et);
- }
-
- boolean isBroadcast = false;
- public LeftIndex(
- Lop lhsInput, Lop rhsInput, Lop rowL, Lop rowU, Lop colL, Lop colU, DataType dt, ValueType vt, ExecType et, boolean isBroadcast)
- throws LopsException {
- super(Lop.Type.LeftIndex, dt, vt);
- this.isBroadcast = isBroadcast;
- init(lhsInput, rhsInput, rowL, rowU, colL, colU, et);
- }
-
private String getOpcode() {
- if(isBroadcast)
+ if( _type != LixCacheType.NONE )
return "mapLeftIndex";
else
return "leftIndex";
@@ -135,6 +142,11 @@ public class LeftIndex extends Lop
sb.append( OPERAND_DELIMITOR );
sb.append( this.prepOutputOperand(output));
+
+ if( getExecType() == ExecType.SPARK ) {
+ sb.append( OPERAND_DELIMITOR );
+ sb.append(_type.toString());
+ }
return sb.toString();
}
@@ -143,5 +155,4 @@ public class LeftIndex extends Lop
public String toString() {
return "leftIndex";
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
index 04433dc..8d20a05 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/IndexingSPInstruction.java
@@ -20,6 +20,7 @@
package org.apache.sysml.runtime.instructions.spark;
import org.apache.sysml.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysml.lops.LeftIndex.LixCacheType;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -94,7 +95,7 @@ public abstract class IndexingSPInstruction extends UnarySPInstruction
}
}
else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex")) {
- if ( parts.length == 8 ) {
+ if ( parts.length == 9 ) {
// Example: leftIndex:mVar1:mvar2:Var3:Var4:Var5:Var6:mVar7
CPOperand lhsInput = new CPOperand(parts[1]);
CPOperand rhsInput = new CPOperand(parts[2]);
@@ -103,8 +104,9 @@ public abstract class IndexingSPInstruction extends UnarySPInstruction
CPOperand cl = new CPOperand(parts[5]);
CPOperand cu = new CPOperand(parts[6]);
CPOperand out = new CPOperand(parts[7]);
+ LixCacheType lixtype = LixCacheType.valueOf(parts[8]);
if( lhsInput.getDataType()==DataType.MATRIX )
- return new MatrixIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, str);
+ return new MatrixIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, ru, cl, cu, out, lixtype, opcode, str);
else
return new FrameIndexingSPInstruction(new SimpleOperator(null), lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, str);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 71bb5ee..0d4d3c4 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -37,6 +37,7 @@ import scala.reflect.ClassManifestFactory;
import scala.runtime.AbstractFunction1;
import org.apache.sysml.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysml.lops.LeftIndex.LixCacheType;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -57,109 +58,35 @@ import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.util.IndexRange;
import org.apache.sysml.runtime.util.UtilFunctions;
+/**
+ * This class implements the matrix indexing functionality inside Spark.
+ * Example instructions:
+ * rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6
+ * input=mVar1, output=mVar6,
+ * bounds = (Var2,Var3,Var4,Var5)
+ * rowindex_lower: Var2, rowindex_upper: Var3
+ * colindex_lower: Var4, colindex_upper: Var5
+ * leftIndex:mVar1:mVar2:Var3:Var4:Var5:Var6:mVar7:LixCacheType
+ * triggered by "mVar1[Var3:Var4, Var5:Var6] = mVar2"
+ * the result is stored in mVar7
+ *
+ */
public class MatrixIndexingSPInstruction extends IndexingSPInstruction
{
- /*
- * This class implements the matrix indexing functionality inside CP.
- * Example instructions:
- * rangeReIndex:mVar1:Var2:Var3:Var4:Var5:mVar6
- * input=mVar1, output=mVar6,
- * bounds = (Var2,Var3,Var4,Var5)
- * rowindex_lower: Var2, rowindex_upper: Var3
- * colindex_lower: Var4, colindex_upper: Var5
- * leftIndex:mVar1:mVar2:Var3:Var4:Var5:Var6:mVar7
- * triggered by "mVar1[Var3:Var4, Var5:Var6] = mVar2"
- * the result is stored in mVar7
- *
- */
+ private final LixCacheType _type;
public MatrixIndexingSPInstruction(Operator op, CPOperand in, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
CPOperand out, SparkAggType aggtype, String opcode, String istr)
{
super(op, in, rl, ru, cl, cu, out, aggtype, opcode, istr);
+ _type = LixCacheType.NONE;
}
public MatrixIndexingSPInstruction(Operator op, CPOperand lhsInput, CPOperand rhsInput, CPOperand rl, CPOperand ru, CPOperand cl, CPOperand cu,
- CPOperand out, String opcode, String istr)
+ CPOperand out, LixCacheType type, String opcode, String istr)
{
super(op, lhsInput, rhsInput, rl, ru, cl, cu, out, opcode, istr);
- }
-
- public static MatrixBlock inmemoryIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
- MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
- if( isSingleBlockLookup(mcIn, ixrange) ) {
- return singleBlockIndexing(in1, mcIn, mcOut, ixrange);
- }
- else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) {
- return multiBlockIndexing(in1, mcIn, mcOut, ixrange);
- }
- else
- throw new DMLRuntimeException("Incorrect usage of inmemoryIndexing");
- }
-
- private static MatrixBlock multiBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
- MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
- //create list of all required matrix indexes
- List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>();
- long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
- long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock());
- long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
- long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock());
- for( long r=rlix; r<=ruix; r++ )
- for( long c=clix; c<=cuix; c++ )
- filter.add( new MatrixIndexes(r,c) );
-
- //wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter);
- out = out.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) //filter unnecessary blocks
- .mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks
-
- //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
- MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(),
- (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
- return mbout;
- }
-
- private static MatrixBlock singleBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
- MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
- //single block output via lookup (on partitioned inputs, this allows for single partition
- //access to avoid a full scan of the input; note that this is especially important for
- //out-of-core datasets as entire partitions are read, not just keys as in the in-memory setting.
- long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
- long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
- List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, cix));
- if( list.size() != 1 )
- throw new DMLRuntimeException("Block lookup returned "+list.size()+" blocks (expected 1).");
-
- MatrixBlock tmp = list.get(0);
- MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ?
- tmp : tmp.sliceOperations( //reference full block or slice out sub-block
- UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()),
- UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()),
- UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()),
- UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock());
- return mbout;
- }
-
- public static JavaPairRDD<MatrixIndexes,MatrixBlock> generalCaseRightIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
- MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange, SparkAggType aggType) {
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-
- if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) {
- out = in1.mapPartitionsToPair(
- new SliceBlockPartitionFunction(ixrange, mcOut), true);
- }
- else if( aggType == SparkAggType.NONE
- || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
- out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
- .mapToPair(new SliceSingleBlock(ixrange, mcOut));
- }
- else {
- out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
- .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut));
- out = RDDAggregateUtils.mergeByKey(out);
- }
- return out;
+ _type = type;
}
@Override
@@ -205,7 +132,9 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
//left indexing
else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex"))
{
- JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+ String rddVar = (_type==LixCacheType.LEFT) ? input2.getName() : input1.getName();
+ String bcVar = (_type==LixCacheType.LEFT) ? input1.getName() : input2.getName();
+ JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( rddVar );
PartitionedBroadcast<MatrixBlock> broadcastIn2 = null;
JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null;
JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
@@ -229,11 +158,11 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
if(opcode.equalsIgnoreCase("mapLeftIndex"))
{
- broadcastIn2 = sec.getBroadcastForVariable( input2.getName() );
+ broadcastIn2 = sec.getBroadcastForVariable( bcVar );
//partitioning-preserving mappartitions (key access required for broadcast loopkup)
out = in1.mapPartitionsToPair(
- new LeftIndexPartitionFunction(broadcastIn2, ixrange, mcOut), true);
+ new LeftIndexPartitionFunction(broadcastIn2, ixrange, _type, mcOut), true);
}
else { //general case
// zero-out lhs
@@ -246,9 +175,9 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
}
sec.setRDDHandleForVariable(output.getName(), out);
- sec.addLineageRDD(output.getName(), input1.getName());
+ sec.addLineageRDD(output.getName(), rddVar);
if( broadcastIn2 != null)
- sec.addLineageBroadcast(output.getName(), input2.getName());
+ sec.addLineageBroadcast(output.getName(), bcVar);
if(in2 != null)
sec.addLineageRDD(output.getName(), input2.getName());
}
@@ -256,6 +185,84 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
throw new DMLRuntimeException("Invalid opcode (" + opcode +") encountered in MatrixIndexingSPInstruction.");
}
+
+ public static MatrixBlock inmemoryIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+ if( isSingleBlockLookup(mcIn, ixrange) ) {
+ return singleBlockIndexing(in1, mcIn, mcOut, ixrange);
+ }
+ else if( isMultiBlockLookup(in1, mcIn, mcOut, ixrange) ) {
+ return multiBlockIndexing(in1, mcIn, mcOut, ixrange);
+ }
+ else
+ throw new DMLRuntimeException("Incorrect usage of inmemoryIndexing");
+ }
+
+ private static MatrixBlock multiBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+ //create list of all required matrix indexes
+ List<MatrixIndexes> filter = new ArrayList<MatrixIndexes>();
+ long rlix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
+ long ruix = UtilFunctions.computeBlockIndex(ixrange.rowEnd, mcIn.getRowsPerBlock());
+ long clix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
+ long cuix = UtilFunctions.computeBlockIndex(ixrange.colEnd, mcIn.getColsPerBlock());
+ for( long r=rlix; r<=ruix; r++ )
+ for( long c=clix; c<=cuix; c++ )
+ filter.add( new MatrixIndexes(r,c) );
+
+ //wrap PartitionPruningRDD around input to exploit pruning for out-of-core datasets
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out = createPartitionPruningRDD(in1, filter);
+ out = out.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut)) //filter unnecessary blocks
+ .mapToPair(new SliceBlock2(ixrange, mcOut)); //slice relevant blocks
+
+ //collect output without shuffle to avoid side-effects with custom PartitionPruningRDD
+ MatrixBlock mbout = SparkExecutionContext.toMatrixBlock(out, (int)mcOut.getRows(),
+ (int)mcOut.getCols(), mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+ return mbout;
+ }
+
+ private static MatrixBlock singleBlockIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange) throws DMLRuntimeException {
+ //single block output via lookup (on partitioned inputs, this allows for single partition
+ //access to avoid a full scan of the input; note that this is especially important for
+ //out-of-core datasets as entire partitions are read, not just keys as in the in-memory setting.
+ long rix = UtilFunctions.computeBlockIndex(ixrange.rowStart, mcIn.getRowsPerBlock());
+ long cix = UtilFunctions.computeBlockIndex(ixrange.colStart, mcIn.getColsPerBlock());
+ List<MatrixBlock> list = in1.lookup(new MatrixIndexes(rix, cix));
+ if( list.size() != 1 )
+ throw new DMLRuntimeException("Block lookup returned "+list.size()+" blocks (expected 1).");
+
+ MatrixBlock tmp = list.get(0);
+ MatrixBlock mbout = (tmp.getNumRows()==mcOut.getRows() && tmp.getNumColumns()==mcOut.getCols()) ?
+ tmp : tmp.sliceOperations( //reference full block or slice out sub-block
+ UtilFunctions.computeCellInBlock(ixrange.rowStart, mcIn.getRowsPerBlock()),
+ UtilFunctions.computeCellInBlock(ixrange.rowEnd, mcIn.getRowsPerBlock()),
+ UtilFunctions.computeCellInBlock(ixrange.colStart, mcIn.getColsPerBlock()),
+ UtilFunctions.computeCellInBlock(ixrange.colEnd, mcIn.getColsPerBlock()), new MatrixBlock());
+ return mbout;
+ }
+
+ private static JavaPairRDD<MatrixIndexes,MatrixBlock> generalCaseRightIndexing(JavaPairRDD<MatrixIndexes,MatrixBlock> in1,
+ MatrixCharacteristics mcIn, MatrixCharacteristics mcOut, IndexRange ixrange, SparkAggType aggType) {
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
+
+ if( isPartitioningPreservingRightIndexing(mcIn, ixrange) ) {
+ out = in1.mapPartitionsToPair(
+ new SliceBlockPartitionFunction(ixrange, mcOut), true);
+ }
+ else if( aggType == SparkAggType.NONE
+ || OptimizerUtils.isIndexingRangeBlockAligned(ixrange, mcIn) ) {
+ out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+ .mapToPair(new SliceSingleBlock(ixrange, mcOut));
+ }
+ else {
+ out = in1.filter(new IsBlockInRange(ixrange.rowStart, ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
+ .flatMapToPair(new SliceMultipleBlocks(ixrange, mcOut));
+ out = RDDAggregateUtils.mergeByKey(out);
+ }
+ return out;
+ }
+
private static void checkValidOutputDimensions(MatrixCharacteristics mcOut)
throws DMLRuntimeException
{
@@ -373,15 +380,17 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
{
private static final long serialVersionUID = 1757075506076838258L;
- private PartitionedBroadcast<MatrixBlock> _binput;
- private IndexRange _ixrange = null;
- private int _brlen = -1;
- private int _bclen = -1;
+ private final PartitionedBroadcast<MatrixBlock> _binput;
+ private final IndexRange _ixrange;
+ private final LixCacheType _type;
+ private final int _brlen;
+ private final int _bclen;
- public LeftIndexPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, IndexRange ixrange, MatrixCharacteristics mc)
+ public LeftIndexPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, IndexRange ixrange, LixCacheType type, MatrixCharacteristics mc)
{
_binput = binput;
_ixrange = ixrange;
+ _type = type;
_brlen = mc.getRowsPerBlock();
_bclen = mc.getColsPerBlock();
}
@@ -403,32 +412,53 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction
protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
throws Exception
{
- if(!UtilFunctions.isInBlockRange(arg._1(), _brlen, _bclen, _ixrange)) {
+ if(_type==LixCacheType.RIGHT && !UtilFunctions.isInBlockRange(arg._1(), _brlen, _bclen, _ixrange)) {
return arg;
}
- // Calculate global index of left hand side block
- long lhs_rl = Math.max(_ixrange.rowStart, (arg._1.getRowIndex()-1)*_brlen + 1);
- long lhs_ru = Math.min(_ixrange.rowEnd, arg._1.getRowIndex()*_brlen);
- long lhs_cl = Math.max(_ixrange.colStart, (arg._1.getColumnIndex()-1)*_bclen + 1);
- long lhs_cu = Math.min(_ixrange.colEnd, arg._1.getColumnIndex()*_bclen);
-
- // Calculate global index of right hand side block
- long rhs_rl = lhs_rl - _ixrange.rowStart + 1;
- long rhs_ru = rhs_rl + (lhs_ru - lhs_rl);
- long rhs_cl = lhs_cl - _ixrange.colStart + 1;
- long rhs_cu = rhs_cl + (lhs_cu - lhs_cl);
-
- // Provide global zero-based index to sliceOperations
- MatrixBlock slicedRHSMatBlock = _binput.sliceOperations(rhs_rl, rhs_ru, rhs_cl, rhs_cu, new MatrixBlock());
-
- // Provide local zero-based index to leftIndexingOperations
- int lhs_lrl = UtilFunctions.computeCellInBlock(lhs_rl, _brlen);
- int lhs_lru = UtilFunctions.computeCellInBlock(lhs_ru, _brlen);
- int lhs_lcl = UtilFunctions.computeCellInBlock(lhs_cl, _bclen);
- int lhs_lcu = UtilFunctions.computeCellInBlock(lhs_cu, _bclen);
- MatrixBlock ret = arg._2.leftIndexingOperations(slicedRHSMatBlock, lhs_lrl, lhs_lru, lhs_lcl, lhs_lcu, new MatrixBlock(), UpdateType.COPY);
- return new Tuple2<MatrixIndexes, MatrixBlock>(arg._1, ret);
+ if( _type == LixCacheType.LEFT )
+ {
+ // LixCacheType.LEFT guarantees aligned blocks, so for each rhs input block
+ // fetch the corresponding broadcast lhs block and perform blockwise left indexing
+ MatrixIndexes ix = arg._1();
+ MatrixBlock right = arg._2();
+
+ int rl = UtilFunctions.computeCellInBlock(_ixrange.rowStart, _brlen);
+ int ru = (int)Math.min(_ixrange.rowEnd, rl+right.getNumRows())-1;
+ int cl = UtilFunctions.computeCellInBlock(_ixrange.colStart, _bclen); //column offset uses column block size
+ int cu = (int)Math.min(_ixrange.colEnd, cl+right.getNumColumns())-1;
+
+ MatrixBlock left = _binput.getBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex());
+ MatrixBlock tmp = left.leftIndexingOperations(right,
+ rl, ru, cl, cu, new MatrixBlock(), UpdateType.COPY);
+
+ return new Tuple2<MatrixIndexes, MatrixBlock>(ix, tmp);
+ }
+ else //LixCacheType.RIGHT
+ {
+ // Calculate global index of left hand side block
+ long lhs_rl = Math.max(_ixrange.rowStart, (arg._1.getRowIndex()-1)*_brlen + 1);
+ long lhs_ru = Math.min(_ixrange.rowEnd, arg._1.getRowIndex()*_brlen);
+ long lhs_cl = Math.max(_ixrange.colStart, (arg._1.getColumnIndex()-1)*_bclen + 1);
+ long lhs_cu = Math.min(_ixrange.colEnd, arg._1.getColumnIndex()*_bclen);
+
+ // Calculate global index of right hand side block
+ long rhs_rl = lhs_rl - _ixrange.rowStart + 1;
+ long rhs_ru = rhs_rl + (lhs_ru - lhs_rl);
+ long rhs_cl = lhs_cl - _ixrange.colStart + 1;
+ long rhs_cu = rhs_cl + (lhs_cu - lhs_cl);
+
+ // Provide global zero-based index to sliceOperations
+ MatrixBlock slicedRHSMatBlock = _binput.sliceOperations(rhs_rl, rhs_ru, rhs_cl, rhs_cu, new MatrixBlock());
+
+ // Provide local zero-based index to leftIndexingOperations
+ int lhs_lrl = UtilFunctions.computeCellInBlock(lhs_rl, _brlen);
+ int lhs_lru = UtilFunctions.computeCellInBlock(lhs_ru, _brlen);
+ int lhs_lcl = UtilFunctions.computeCellInBlock(lhs_cl, _bclen);
+ int lhs_lcu = UtilFunctions.computeCellInBlock(lhs_cu, _bclen);
+ MatrixBlock ret = arg._2.leftIndexingOperations(slicedRHSMatBlock, lhs_lrl, lhs_lru, lhs_lcl, lhs_lcu, new MatrixBlock(), UpdateType.COPY);
+ return new Tuple2<MatrixIndexes, MatrixBlock>(arg._1, ret);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
index c1f12e7..1d435c4 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
@@ -98,7 +98,7 @@ public class FrameIndexingDistTest extends AutomatedTestBase
// Left Indexing Spark test cases
@Test
public void testMapLeftIndexingSP() throws DMLRuntimeException, IOException {
- runTestLeftIndexing(ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX, schemaMixedLarge, IXType.LIX, true);
+ runTestLeftIndexing(ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R, schemaMixedLarge, IXType.LIX, true);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseDenseTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseDenseTest.java b/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseDenseTest.java
index b55b737..8f48cbe 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseDenseTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseDenseTest.java
@@ -56,6 +56,7 @@ public class LeftIndexingSparseDenseTest extends AutomatedTestBase
RIGHT_ALIGNED,
RIGHT2_ALIGNED,
CENTERED,
+ SINGLE_BLOCK,
}
@Override
@@ -84,27 +85,32 @@ public class LeftIndexingSparseDenseTest extends AutomatedTestBase
@Test
public void testSparseMapLeftIndexingLeftAlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.LEFT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.LEFT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingLeft2AlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.LEFT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.LEFT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingRightAlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.RIGHT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.RIGHT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingRight2AlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.RIGHT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.RIGHT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingCenteredSP() {
- runLeftIndexingSparseSparseTest(LixType.CENTERED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.CENTERED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
+ }
+
+ @Test
+ public void testSparseMapLeftIndexingSingleBlockSP() {
+ runLeftIndexingSparseSparseTest(LixType.SINGLE_BLOCK, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
// ----
@@ -134,6 +140,11 @@ public class LeftIndexingSparseDenseTest extends AutomatedTestBase
}
@Test
+ public void testSparseLeftIndexingSingleBlockSP() {
+ runLeftIndexingSparseSparseTest(LixType.SINGLE_BLOCK, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_L);
+ }
+
+ @Test
public void testSparseLeftIndexingLeftAligned() {
runLeftIndexingSparseSparseTest(LixType.LEFT_ALIGNED, ExecType.MR, null);
}
@@ -165,12 +176,14 @@ public class LeftIndexingSparseDenseTest extends AutomatedTestBase
public void runLeftIndexingSparseSparseTest(LixType type, ExecType et, LeftIndexingOp.LeftIndexingMethod indexingMethod)
{
int cl = -1;
+ int lcols1 = cols1;
switch( type ){
case LEFT_ALIGNED: cl = 1; break;
case LEFT2_ALIGNED: cl = 2; break;
case RIGHT_ALIGNED: cl = cols1-cols2+1; break;
case RIGHT2_ALIGNED: cl = cols1-cols2; break;
case CENTERED: cl = (cols1-cols2)/2; break;
+ case SINGLE_BLOCK: cl = 3; lcols1 = cols2+7; break; //narrow lhs with cols2+7 columns
}
int cu = cl+cols2-1;
@@ -215,10 +229,10 @@ public class LeftIndexingSparseDenseTest extends AutomatedTestBase
inputDir() + " " + cl + " " + cu + " " + expectedDir();
//generate input data sets
- double[][] A = getRandomMatrix(rows1, cols1, -1, 1, sparsity1, 1234);
- writeInputMatrixWithMTD("A", A, true);
+ double[][] A = getRandomMatrix(rows1, lcols1, -1, 1, sparsity1, 1234);
+ writeInputMatrixWithMTD("A", A, true);
double[][] B = getRandomMatrix(rows2, cols2, -1, 1, sparsity2, 5678);
- writeInputMatrixWithMTD("B", B, true);
+ writeInputMatrixWithMTD("B", B, true);
runTest(true, false, null, 1); //REBLOCK
runRScript(true);
@@ -226,7 +240,7 @@ public class LeftIndexingSparseDenseTest extends AutomatedTestBase
HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
HashMap<CellIndex, Double> rfile = readRMatrixFromFS("R");
TestUtils.compareMatrices(dmlfile, rfile, 0, "DML", "R");
- checkDMLMetaDataFile("R", new MatrixCharacteristics(rows1,cols1,1,1));
+ checkDMLMetaDataFile("R", new MatrixCharacteristics(rows1,lcols1,1,1));
}
finally {
rtplatform = oldRTP;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseSparseTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseSparseTest.java b/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseSparseTest.java
index 80b941d..0abdb07 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseSparseTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingSparseSparseTest.java
@@ -111,27 +111,27 @@ public class LeftIndexingSparseSparseTest extends AutomatedTestBase
// ----
@Test
public void testSparseMapLeftIndexingLeftAlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.LEFT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.LEFT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingLeft2AlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.LEFT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.LEFT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingRightAlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.RIGHT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.RIGHT_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingRight2AlignedSP() {
- runLeftIndexingSparseSparseTest(LixType.RIGHT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.RIGHT2_ALIGNED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
public void testSparseMapLeftIndexingCenteredSP() {
- runLeftIndexingSparseSparseTest(LixType.CENTERED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runLeftIndexingSparseSparseTest(LixType.CENTERED, ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9d181613/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingTest.java
index b498186..82781ce 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/indexing/LeftIndexingTest.java
@@ -58,7 +58,7 @@ public class LeftIndexingTest extends AutomatedTestBase
@Test
public void testMapLeftIndexingSP() {
- runTestLeftIndexing(ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX);
+ runTestLeftIndexing(ExecType.SPARK, LeftIndexingMethod.SP_MLEFTINDEX_R);
}
@Test