You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/01 04:32:31 UTC
[1/4] systemml git commit: [SYSTEMML-2223] Repartition ultra-sparse
matrices to preferred #parts
Repository: systemml
Updated Branches:
refs/heads/master 696e79218 -> addd6e121
[SYSTEMML-2223] Repartition ultra-sparse matrices to preferred #parts
This patch improves the Spark checkpointing (i.e., distributed caching)
logic by repartitioning ultra-sparse matrices to the preferred number of
partitions as a multiple of the default parallelism.
Furthermore, this patch makes a minor improvement to empty-block
handling when aggregating ultra-sparse matrices, which avoids unnecessary
GC overhead.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/015b2731
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/015b2731
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/015b2731
Branch: refs/heads/master
Commit: 015b2731893b5f630a0bdfb8cb0efdf86e84fd05
Parents: 696e792
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 14:54:28 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 14:54:28 2018 -0700
----------------------------------------------------------------------
.../spark/CheckpointSPInstruction.java | 25 ++++++++++++++------
.../spark/utils/RDDAggregateUtils.java | 10 ++++----
.../runtime/matrix/MatrixCharacteristics.java | 7 ++++++
3 files changed, 31 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index ccd7319..33dd494 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -41,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.SparseBlock;
import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.util.UtilFunctions;
public class CheckpointSPInstruction extends UnarySPInstruction {
// default storage level
@@ -100,19 +101,28 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
JavaPairRDD<?,?> out = null;
if( !in.getStorageLevel().equals( _level ) )
{
- //(trigger coalesce if intended number of partitions exceeded by 20%
- //and not hash partitioned to avoid losing the existing partitioner)
+ //determine need for coalesce or repartition, and csr conversion
int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions()
&& !SparkUtils.isHashPartitioned(in) && in.getNumPartitions()
> SparkExecutionContext.getDefaultParallelism(true));
+ boolean repartition = mcIn.dimsKnown(true) && mcIn.isUltraSparse()
+ && numPartitions > in.getNumPartitions();
+ boolean mcsr2csr = input1.getDataType()==DataType.MATRIX
+ && OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
+ && !_level.equals(Checkpoint.SER_STORAGE_LEVEL);
//checkpoint pre-processing rdd operations
if( coalesce ) {
//merge partitions without shuffle if too many partitions
out = in.coalesce( numPartitions );
}
- else {
+ else if( repartition ) {
+ //repartition to preferred size as multiple of default parallelism
+ out = in.repartition(UtilFunctions.roundToNext(numPartitions,
+ SparkExecutionContext.getDefaultParallelism(true)));
+ }
+ else if( !mcsr2csr ) {
//since persist is an in-place marker for a storage level, we
//apply a narrow shallow copy to allow for short-circuit collects
if( input1.getDataType() == DataType.MATRIX )
@@ -120,13 +130,14 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
(JavaPairRDD<MatrixIndexes,MatrixBlock>)in, false);
else if( input1.getDataType() == DataType.FRAME)
out = ((JavaPairRDD<Long,FrameBlock>)in)
- .mapValues(new CopyFrameBlockFunction(false));
+ .mapValues(new CopyFrameBlockFunction(false));
+ }
+ else {
+ out = in;
}
//convert mcsr into memory-efficient csr if potentially sparse
- if( input1.getDataType()==DataType.MATRIX
- && OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
- && !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) {
+ if( mcsr2csr ) {
out = ((JavaPairRDD<MatrixIndexes,MatrixBlock>)out)
.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 476e93f..0101e26 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -256,26 +256,28 @@ public class RDDAggregateUtils
{
private static final long serialVersionUID = 3703543699467085539L;
- private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);
+ private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);
@Override
public CorrMatrixBlock call(CorrMatrixBlock arg0, MatrixBlock arg1)
throws Exception
{
+ if( arg1.isEmptyBlock(false) )
+ return arg0;
+
//get current block and correction
MatrixBlock value = arg0.getValue();
MatrixBlock corr = arg0.getCorrection();
//correction block allocation on demand
- if( corr == null ){
+ if( corr == null )
corr = new MatrixBlock(value.getNumRows(), value.getNumColumns(), false);
- }
//aggregate other input and maintain corrections
//(existing value and corr are used in place)
OperationsOnMatrixValues.incrementalAggregation(value, corr, arg1, _op, false);
return arg0.set(value, corr);
- }
+ }
}
private static class MergeSumBlockCombinerFunction implements Function2<CorrMatrixBlock, CorrMatrixBlock, CorrMatrixBlock>
http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index 1443a8c..91d70f8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
+import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.lops.MMTSJ.MMTSJType;
import org.apache.sysml.runtime.instructions.mr.AggregateBinaryInstruction;
import org.apache.sysml.runtime.instructions.mr.AggregateInstruction;
@@ -61,6 +62,7 @@ import org.apache.sysml.runtime.instructions.mr.UaggOuterChainInstruction;
import org.apache.sysml.runtime.instructions.mr.UnaryInstruction;
import org.apache.sysml.runtime.instructions.mr.UnaryMRInstructionBase;
import org.apache.sysml.runtime.instructions.mr.ZeroOutInstruction;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
@@ -225,6 +227,11 @@ public class MatrixCharacteristics implements Serializable
return ( !ubNnz && nonZero >= 0 );
}
+ public boolean isUltraSparse() {
+ return dimsKnown(true) && OptimizerUtils.getSparsity(this)
+ < MatrixBlock.ULTRA_SPARSITY_TURN_POINT;
+ }
+
public boolean mightHaveEmptyBlocks() {
long singleBlk = Math.max(Math.min(numRows, numRowsPerBlock),1)
* Math.max(Math.min(numColumns, numColumnsPerBlock),1);
[4/4] systemml git commit: [SYSTEMML-2225] Fix reblock ultra-sparse,
incl mem efficiency read
Posted by mb...@apache.org.
[SYSTEMML-2225] Fix reblock ultra-sparse, incl mem efficiency read
This patch improves the robustness of reblocking ultra-sparse matrices by
hardening the CSR index lookups and by better handling empty blocks during
reblock. Furthermore, it avoids unnecessary CSR block creation for empty
blocks on the initial read.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/addd6e12
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/addd6e12
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/addd6e12
Branch: refs/heads/master
Commit: addd6e121ac8d81af0f90859666b9ac1ec1e5009
Parents: c516145
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 19:06:19 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 21:27:34 2018 -0700
----------------------------------------------------------------------
.../spark/functions/CopyBlockPairFunction.java | 2 +-
.../functions/ExtractBlockForBinaryReblock.java | 31 ++++++++++----------
.../sysml/runtime/matrix/data/MatrixBlock.java | 8 +++--
.../runtime/matrix/data/SparseBlockCSR.java | 7 +++--
4 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
index 5423a8f..8ff6c2a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CopyBlockPairFunction.java
@@ -71,7 +71,7 @@ public class CopyBlockPairFunction implements PairFlatMapFunction<Iterator<Tuple
MatrixIndexes ix = new MatrixIndexes(arg._1());
MatrixBlock block = null;
//always create deep copies in more memory-efficient CSR representation
- //if block is already in sparse format
+ //if block is already in sparse format
if( Checkpoint.CHECKPOINT_SPARSE_CSR && arg._2.isInSparseFormat() )
block = new MatrixBlock(arg._2, SparseBlock.Type.CSR, true);
else
http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
index a2a1ce0..66a2271 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
@@ -70,37 +70,36 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<
long endRowGlobalCellIndex = getEndGlobalIndex(ixIn.getRowIndex(), true, true);
long startColGlobalCellIndex = UtilFunctions.computeCellIndex(ixIn.getColumnIndex(), in_bclen, 0);
long endColGlobalCellIndex = getEndGlobalIndex(ixIn.getColumnIndex(), true, false);
- assert(startRowGlobalCellIndex <= endRowGlobalCellIndex && startColGlobalCellIndex <= endColGlobalCellIndex);
long out_startRowBlockIndex = UtilFunctions.computeBlockIndex(startRowGlobalCellIndex, out_brlen);
long out_endRowBlockIndex = UtilFunctions.computeBlockIndex(endRowGlobalCellIndex, out_brlen);
long out_startColBlockIndex = UtilFunctions.computeBlockIndex(startColGlobalCellIndex, out_bclen);
long out_endColBlockIndex = UtilFunctions.computeBlockIndex(endColGlobalCellIndex, out_bclen);
- assert(out_startRowBlockIndex <= out_endRowBlockIndex && out_startColBlockIndex <= out_endColBlockIndex);
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> retVal = new ArrayList<>();
for(long i = out_startRowBlockIndex; i <= out_endRowBlockIndex; i++) {
for(long j = out_startColBlockIndex; j <= out_endColBlockIndex; j++) {
MatrixIndexes indx = new MatrixIndexes(i, j);
- long rowLower = Math.max(UtilFunctions.computeCellIndex(i, out_brlen, 0), startRowGlobalCellIndex);
- long rowUpper = Math.min(getEndGlobalIndex(i, false, true), endRowGlobalCellIndex);
- long colLower = Math.max(UtilFunctions.computeCellIndex(j, out_bclen, 0), startColGlobalCellIndex);
- long colUpper = Math.min(getEndGlobalIndex(j, false, false), endColGlobalCellIndex);
-
int new_lrlen = UtilFunctions.computeBlockSize(rlen, i, out_brlen);
int new_lclen = UtilFunctions.computeBlockSize(clen, j, out_bclen);
MatrixBlock blk = new MatrixBlock(new_lrlen, new_lclen, true);
- int in_i1 = UtilFunctions.computeCellInBlock(rowLower, in_brlen);
- int out_i1 = UtilFunctions.computeCellInBlock(rowLower, out_brlen);
-
- for(long i1 = rowLower; i1 <= rowUpper; i1++, in_i1++, out_i1++) {
- int in_j1 = UtilFunctions.computeCellInBlock(colLower, in_bclen);
- int out_j1 = UtilFunctions.computeCellInBlock(colLower, out_bclen);
- for(long j1 = colLower; j1 <= colUpper; j1++, in_j1++, out_j1++) {
- double val = in.getValue(in_i1, in_j1);
- blk.appendValue(out_i1, out_j1, val);
+ if( !in.isEmptyBlock(false) ) {
+ long rowLower = Math.max(UtilFunctions.computeCellIndex(i, out_brlen, 0), startRowGlobalCellIndex);
+ long rowUpper = Math.min(getEndGlobalIndex(i, false, true), endRowGlobalCellIndex);
+ long colLower = Math.max(UtilFunctions.computeCellIndex(j, out_bclen, 0), startColGlobalCellIndex);
+ long colUpper = Math.min(getEndGlobalIndex(j, false, false), endColGlobalCellIndex);
+ int in_i1 = UtilFunctions.computeCellInBlock(rowLower, in_brlen);
+ int out_i1 = UtilFunctions.computeCellInBlock(rowLower, out_brlen);
+
+ for(long i1 = rowLower; i1 <= rowUpper; i1++, in_i1++, out_i1++) {
+ int in_j1 = UtilFunctions.computeCellInBlock(colLower, in_bclen);
+ int out_j1 = UtilFunctions.computeCellInBlock(colLower, out_bclen);
+ for(long j1 = colLower; j1 <= colUpper; j1++, in_j1++, out_j1++) {
+ double val = in.quickGetValue(in_i1, in_j1);
+ blk.appendValue(out_i1, out_j1, val);
+ }
}
}
retVal.add(new Tuple2<>(indx, blk));
http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index f19fbe5..efe2365 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -184,10 +184,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
throw new RuntimeException("Sparse matrix block expected.");
//deep copy and change sparse block type
- nonZeros = that.nonZeros;
- estimatedNNzsPerRow = that.estimatedNNzsPerRow;
- sparseBlock = SparseBlockFactory
+ if( !that.isEmptyBlock(false) ) {
+ nonZeros = that.nonZeros;
+ estimatedNNzsPerRow = that.estimatedNNzsPerRow;
+ sparseBlock = SparseBlockFactory
.copySparseBlock(stype, that.sparseBlock, deep);
+ }
}
////////
http://git-wip-us.apache.org/repos/asf/systemml/blob/addd6e12/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
index de0c34b..6bbc81d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
@@ -734,16 +734,20 @@ public class SparseBlockCSR extends SparseBlock
@Override
public double get(int r, int c) {
+ if( isEmpty(r) )
+ return 0;
int pos = pos(r);
int len = size(r);
//search for existing col index in [pos,pos+len)
- int index = Arrays.binarySearch(_indexes, pos, pos+len, c);
+ int index = Arrays.binarySearch(_indexes, pos, pos+len, c);
return (index >= 0) ? _values[index] : 0;
}
@Override
public SparseRow get(int r) {
+ if( isEmpty(r) )
+ return new SparseRowScalar();
int pos = pos(r);
int len = size(r);
@@ -751,7 +755,6 @@ public class SparseBlockCSR extends SparseBlock
System.arraycopy(_indexes, pos, row.indexes(), 0, len);
System.arraycopy(_values, pos, row.values(), 0, len);
row.setSize(len);
-
return row;
}
[2/4] systemml git commit: [HOTFIX][SYSTEMML-2219] Fix
ultra-sparse/ultra-sparse matrix multiply
Posted by mb...@apache.org.
[HOTFIX][SYSTEMML-2219] Fix ultra-sparse/ultra-sparse matrix multiply
This patch fixes the improved ultra-sparse matrix multiply for the special
case of ultra-sparse x ultra-sparse matrix multiplication where the rhs
has entirely empty rows.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2b3aefe7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2b3aefe7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2b3aefe7
Branch: refs/heads/master
Commit: 2b3aefe79446b3b1ab13640566a37e11f620ee96
Parents: 015b273
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 15:05:59 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 15:05:59 2018 -0700
----------------------------------------------------------------------
.../runtime/matrix/data/LibMatrixMult.java | 7 +-
...FullMatrixMultiplicationUltraSparseTest.java | 75 ++++++++++++--------
2 files changed, 51 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/2b3aefe7/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index 8919ab6..ef273f6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -1512,17 +1512,18 @@ public class LibMatrixMult
if( alen==1 ) {
//row selection (now aggregation) with potential scaling
int aix = aixs[apos];
+ int lnnz = 0;
if( rightSparse ) { //sparse right matrix (full row copy)
if( !m2.sparseBlock.isEmpty(aix) ) {
ret.rlen=m;
ret.allocateSparseRowsBlock(false); //allocation on demand
boolean ldeep = (m2.sparseBlock instanceof SparseBlockMCSR);
ret.sparseBlock.set(i, m2.sparseBlock.get(aix), ldeep);
- ret.nonZeros += ret.sparseBlock.size(i);
+ ret.nonZeros += (lnnz = ret.sparseBlock.size(i));
}
}
else { //dense right matrix (append all values)
- int lnnz = (int)m2.recomputeNonZeros(aix, aix, 0, n-1);
+ lnnz = (int)m2.recomputeNonZeros(aix, aix, 0, n-1);
if( lnnz > 0 ) {
c.allocate(i, lnnz); //allocate once
double[] bvals = m2.getDenseBlock().values(aix);
@@ -1532,7 +1533,7 @@ public class LibMatrixMult
}
}
//optional scaling if not pure selection
- if( avals[apos] != 1 )
+ if( avals[apos] != 1 && lnnz > 0 )
vectMultiplyInPlace(avals[apos], c.values(i), c.pos(i), c.size(i));
}
else //GENERAL CASE
http://git-wip-us.apache.org/repos/asf/systemml/blob/2b3aefe7/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java b/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
index 2a621fb..538ac1b 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix_full_other/FullMatrixMultiplicationUltraSparseTest.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-
+import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.lops.LopProperties.ExecType;
import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
@@ -34,7 +34,6 @@ import org.apache.sysml.test.utils.TestUtils;
public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase
{
-
private final static String TEST_NAME = "FullMatrixMultiplication";
private final static String TEST_DIR = "functions/binary/matrix_full_other/";
private final static String TEST_CLASS_DIR = TEST_DIR + FullMatrixMultiplicationUltraSparseTest.class.getSimpleName() + "/";
@@ -83,62 +82,77 @@ public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase
}
@Test
- public void testMMDenseUltraSparseCP()
- {
+ public void testMMDenseUltraSparseCP() {
runMatrixMatrixMultiplicationTest(SparsityType.DENSE, SparsityType.ULTRA_SPARSE, ExecType.CP);
}
@Test
- public void testMMSparseUltraSparseCP()
- {
+ public void testMMSparseUltraSparseCP() {
runMatrixMatrixMultiplicationTest(SparsityType.SPARSE, SparsityType.ULTRA_SPARSE, ExecType.CP);
}
@Test
- public void testMMUltraSparseDenseCP()
- {
+ public void testMMUltraSparseDenseCP() {
runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.DENSE, ExecType.CP);
}
@Test
- public void testMMUltraSparseSparseCP()
- {
+ public void testMMUltraSparseSparseCP() {
runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.SPARSE, ExecType.CP);
}
@Test
- public void testMMUltraSparseUltraSparseCP()
- {
+ public void testMMUltraSparseUltraSparseCP() {
runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.ULTRA_SPARSE, ExecType.CP);
}
@Test
- public void testMMDenseUltraSparseMR()
- {
+ public void testMMDenseUltraSparseSP() {
+ runMatrixMatrixMultiplicationTest(SparsityType.DENSE, SparsityType.ULTRA_SPARSE, ExecType.SPARK);
+ }
+
+ @Test
+ public void testMMSparseUltraSparseSP() {
+ runMatrixMatrixMultiplicationTest(SparsityType.SPARSE, SparsityType.ULTRA_SPARSE, ExecType.SPARK);
+ }
+
+ @Test
+ public void testMMUltraSparseDenseSP() {
+ runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.DENSE, ExecType.SPARK);
+ }
+
+ @Test
+ public void testMMUltraSparseSparseSP() {
+ runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.SPARSE, ExecType.SPARK);
+ }
+
+ @Test
+ public void testMMUltraSparseUltraSparseSP() {
+ runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.ULTRA_SPARSE, ExecType.SPARK);
+ }
+
+ @Test
+ public void testMMDenseUltraSparseMR() {
runMatrixMatrixMultiplicationTest(SparsityType.DENSE, SparsityType.ULTRA_SPARSE, ExecType.MR);
}
@Test
- public void testMMSparseUltraSparseMR()
- {
+ public void testMMSparseUltraSparseMR() {
runMatrixMatrixMultiplicationTest(SparsityType.SPARSE, SparsityType.ULTRA_SPARSE, ExecType.MR);
}
@Test
- public void testMMUltraSparseDenseMR()
- {
+ public void testMMUltraSparseDenseMR() {
runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.DENSE, ExecType.MR);
}
@Test
- public void testMMUltraSparseSparseMR()
- {
+ public void testMMUltraSparseSparseMR() {
runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.SPARSE, ExecType.MR);
}
@Test
- public void testMMUltraSparseUltraSparseMR()
- {
+ public void testMMUltraSparseUltraSparseMR() {
runMatrixMatrixMultiplicationTest(SparsityType.ULTRA_SPARSE, SparsityType.ULTRA_SPARSE, ExecType.MR);
}
@@ -150,12 +164,17 @@ public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase
*/
private void runMatrixMatrixMultiplicationTest( SparsityType sparseM1, SparsityType sparseM2, ExecType instType)
{
- //setup exec type, rows, cols
-
- //rtplatform for MR
RUNTIME_PLATFORM platformOld = rtplatform;
- rtplatform = (instType==ExecType.MR) ? RUNTIME_PLATFORM.HADOOP : RUNTIME_PLATFORM.HYBRID;
+ switch( instType ){
+ case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+ case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+ default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+ }
+ boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+ if( rtplatform == RUNTIME_PLATFORM.SPARK )
+ DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
try
{
TestConfiguration config = getTestConfiguration(TEST_NAME);
@@ -194,8 +213,8 @@ public class FullMatrixMultiplicationUltraSparseTest extends AutomatedTestBase
HashMap<CellIndex, Double> rfile = readRMatrixFromFS("C");
TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
}
- finally
- {
+ finally {
+ DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
rtplatform = platformOld;
}
}
[3/4] systemml git commit: [SYSTEMML-2224] Remove unnecessary fields
from matrix blocks
Posted by mb...@apache.org.
[SYSTEMML-2224] Remove unnecessary fields from matrix blocks
This patch moves metadata such as diag (i.e., whether a matrix only
has non-zeros on the diagonal) from the matrix block to the matrix
object to make the core block more compact. This is especially important
for large distributed ultra-sparse matrices, where these fields cause
unnecessary GC overhead when there is a large fraction of empty blocks
that cannot be filtered out because they are needed to guarantee correctness.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c5161456
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c5161456
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c5161456
Branch: refs/heads/master
Commit: c5161456fe5d1a4e9cad7a2ce93dbdec291c2db1
Parents: 2b3aefe
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Mar 31 16:23:46 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Mar 31 16:23:46 2018 -0700
----------------------------------------------------------------------
.../controlprogram/caching/MatrixObject.java | 19 +++++++--
.../instructions/cp/ReorgCPInstruction.java | 2 +
.../apache/sysml/runtime/io/MatrixWriter.java | 33 +++++++-------
.../sysml/runtime/io/WriterBinaryBlock.java | 4 +-
.../sysml/runtime/io/WriterBinaryCell.java | 3 +-
.../sysml/runtime/io/WriterMatrixMarket.java | 2 +-
.../apache/sysml/runtime/io/WriterTextCSV.java | 2 +-
.../apache/sysml/runtime/io/WriterTextCell.java | 2 +-
.../runtime/matrix/data/LibMatrixReorg.java | 4 +-
.../sysml/runtime/matrix/data/MatrixBlock.java | 45 +++++---------------
.../sysml/runtime/util/DataConverter.java | 21 ++++-----
11 files changed, 58 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index de49222..9714a19 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -71,14 +71,15 @@ public class MatrixObject extends CacheableData<MatrixBlock>
//additional matrix-specific flags
private UpdateType _updateType = UpdateType.COPY;
-
+ private boolean _diag = false;
+
//information relevant to partitioned matrices.
private boolean _partitioned = false; //indicates if obj partitioned
private PDataPartitionFormat _partitionFormat = null; //indicates how obj partitioned
private int _partitionSize = -1; //indicates n for BLOCKWISE_N
private String _partitionCacheName = null; //name of cache block
private MatrixBlock _partitionInMemory = null;
-
+
/**
* Constructor that takes the value type and the HDFS filename.
*
@@ -119,6 +120,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
metaOld.getOutputInfo(), metaOld.getInputInfo());
_updateType = mo._updateType;
+ _diag = mo._diag;
_partitioned = mo._partitioned;
_partitionFormat = mo._partitionFormat;
_partitionSize = mo._partitionSize;
@@ -133,6 +135,14 @@ public class MatrixObject extends CacheableData<MatrixBlock>
return _updateType;
}
+ public boolean isDiag() {
+ return _diag;
+ }
+
+ public void setDiag(boolean diag) {
+ _diag = diag;
+ }
+
@Override
public void updateMatrixCharacteristics (MatrixCharacteristics mc) {
_metaData.getMatrixCharacteristics().set(mc);
@@ -531,10 +541,11 @@ public class MatrixObject extends CacheableData<MatrixBlock>
if ( oinfo == OutputInfo.BinaryBlockOutputInfo && DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE &&
(mc.getRowsPerBlock() != ConfigurationManager.getBlocksize() || mc.getColsPerBlock() != ConfigurationManager.getBlocksize()) )
{
- DataConverter.writeMatrixToHDFS(_data, fname, oinfo, new MatrixCharacteristics(mc.getRows(), mc.getCols(), ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros()), rep, fprop);
+ DataConverter.writeMatrixToHDFS(_data, fname, oinfo, new MatrixCharacteristics(mc.getRows(), mc.getCols(),
+ ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros()), rep, fprop, _diag);
}
else {
- DataConverter.writeMatrixToHDFS(_data, fname, oinfo, mc, rep, fprop);
+ DataConverter.writeMatrixToHDFS(_data, fname, oinfo, mc, rep, fprop, _diag);
}
if( LOG.isTraceEnabled() )
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
index af4c12e..89d99b7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java
@@ -144,5 +144,7 @@ public class ReorgCPInstruction extends UnaryCPInstruction {
ec.releaseMatrixInput(_col.getName());
ec.releaseMatrixInput(input1.getName(), getExtendedOpcode());
ec.setMatrixOutput(output.getName(), soresBlock, getExtendedOpcode());
+ if( r_op.fn instanceof DiagIndex && soresBlock.getNumColumns()>1 ) //diagV2M
+ ec.getMatrixObject(output.getName()).setDiag(true);
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java b/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
index 45ba2d9..126a772 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java
@@ -21,7 +21,6 @@ package org.apache.sysml.runtime.io;
import java.io.IOException;
-import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
/**
@@ -33,9 +32,15 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
*/
public abstract class MatrixWriter
{
+ public void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz )
+ throws IOException
+ {
+ writeMatrixToHDFS(src, fname, rlen, clen, brlen, bclen, nnz, false);
+ }
- public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz )
- throws IOException, DMLRuntimeException;
+ public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag )
+ throws IOException;
+
/**
* Writes a minimal entry to represent an empty matrix on hdfs.
@@ -50,33 +55,28 @@ public abstract class MatrixWriter
public abstract void writeEmptyMatrixToHDFS( String fname, long rlen, long clen, int brlen, int bclen )
throws IOException;
- public static MatrixBlock[] createMatrixBlocksForReuse( long rlen, long clen, int brlen, int bclen, boolean sparse, long nonZeros )
- {
+ public static MatrixBlock[] createMatrixBlocksForReuse( long rlen, long clen, int brlen, int bclen, boolean sparse, long nonZeros ) {
MatrixBlock[] blocks = new MatrixBlock[4];
double sparsity = ((double)nonZeros)/(rlen*clen);
long estNNZ = -1;
//full block
- if( rlen >= brlen && clen >= bclen )
- {
+ if( rlen >= brlen && clen >= bclen ) {
estNNZ = (long) (brlen*bclen*sparsity);
blocks[0] = new MatrixBlock( brlen, bclen, sparse, (int)estNNZ );
}
//partial col block
- if( rlen >= brlen && clen%bclen!=0 )
- {
+ if( rlen >= brlen && clen%bclen!=0 ) {
estNNZ = (long) (brlen*(clen%bclen)*sparsity);
blocks[1] = new MatrixBlock( brlen, (int)(clen%bclen), sparse, (int)estNNZ );
}
//partial row block
- if( rlen%brlen!=0 && clen>=bclen )
- {
+ if( rlen%brlen!=0 && clen>=bclen ) {
estNNZ = (long) ((rlen%brlen)*bclen*sparsity);
blocks[2] = new MatrixBlock( (int)(rlen%brlen), bclen, sparse, (int)estNNZ );
}
//partial row/col block
- if( rlen%brlen!=0 && clen%bclen!=0 )
- {
+ if( rlen%brlen!=0 && clen%bclen!=0 ) {
estNNZ = (long) ((rlen%brlen)*(clen%bclen)*sparsity);
blocks[3] = new MatrixBlock( (int)(rlen%brlen), (int)(clen%bclen), sparse, (int)estNNZ );
}
@@ -85,16 +85,14 @@ public abstract class MatrixWriter
for( MatrixBlock b : blocks )
if( b != null )
if( !sparse )
- b.allocateDenseBlockUnsafe(b.getNumRows(), b.getNumColumns());
+ b.allocateDenseBlockUnsafe(b.getNumRows(), b.getNumColumns());
//NOTE: no preallocation for sparse (preallocate sparserows with estnnz) in order to reduce memory footprint
return blocks;
}
- public static MatrixBlock getMatrixBlockForReuse( MatrixBlock[] blocks, int rows, int cols, int brlen, int bclen )
- {
+ public static MatrixBlock getMatrixBlockForReuse( MatrixBlock[] blocks, int rows, int cols, int brlen, int bclen ) {
int index = -1;
-
if( rows==brlen && cols==bclen )
index = 0;
else if( rows==brlen && cols<bclen )
@@ -103,7 +101,6 @@ public abstract class MatrixWriter
index = 2;
else //if( rows<brlen && cols<bclen )
index = 3;
-
return blocks[ index ];
}
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
index 59fe573..777828c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
@@ -45,7 +45,7 @@ public class WriterBinaryBlock extends MatrixWriter
}
@Override
- public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz)
+ public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag)
throws IOException, DMLRuntimeException
{
//prepare file access
@@ -61,7 +61,7 @@ public class WriterBinaryBlock extends MatrixWriter
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
//core write sequential/parallel
- if( src.isDiag() )
+ if( diag )
writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen);
else
writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen);
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
index a072a6b..b4fe69f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
@@ -36,9 +36,8 @@ import org.apache.sysml.runtime.util.MapReduceTool;
public class WriterBinaryCell extends MatrixWriter
{
-
@Override
- public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz)
+ public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag)
throws IOException, DMLRuntimeException
{
//prepare file access
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
index c9f42bf..2115a19 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
@@ -42,7 +42,7 @@ import org.apache.sysml.runtime.util.MapReduceTool;
public class WriterMatrixMarket extends MatrixWriter
{
@Override
- public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz)
+ public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag)
throws IOException, DMLRuntimeException
{
//validity check matrix dimensions
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
index a3015f2..3e89a52 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
@@ -54,7 +54,7 @@ public class WriterTextCSV extends MatrixWriter
}
@Override
- public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz)
+ public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag)
throws IOException, DMLRuntimeException
{
//validity check matrix dimensions
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
index 0438f46..413a165 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
@@ -37,7 +37,7 @@ import org.apache.sysml.runtime.util.MapReduceTool;
public class WriterTextCell extends MatrixWriter
{
@Override
- public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz)
+ public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag)
throws IOException, DMLRuntimeException
{
//validity check matrix dimensions
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 8f7bda9..42c56c4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -290,10 +290,8 @@ public class LibMatrixReorg
int rlen = in.rlen;
int clen = in.clen;
- if( clen == 1 ){ //diagV2M
+ if( clen == 1 ) //diagV2M
diagV2M( in, out );
- out.setDiag();
- }
else if ( rlen == clen ) //diagM2V
diagM2V( in, out );
else
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 9807340..f19fbe5 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -131,13 +131,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
//sparse-block-specific attributes (allocation only)
protected int estimatedNNzsPerRow = -1;
- //grpaggregate-specific attributes (optional)
- protected int numGroups = -1;
-
- //diag-specific attributes (optional)
- protected boolean diag = false;
-
-
////////
// Matrix Constructors
//
@@ -253,16 +246,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
(int)Math.ceil((double)estnnz/(double)rlen);
//reset sparse/dense blocks
- if( sparse ) {
+ if( sparse )
resetSparse();
- }
- else {
+ else
resetDense(val);
- }
-
- //reset operation-specific attributes
- numGroups = -1;
- diag = false;
}
private void resetSparse() {
@@ -505,15 +492,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
}
return ret;
}
-
- public void setDiag() {
- diag = true;
- }
-
- public boolean isDiag() {
- return diag;
- }
-
+
////////
// Data handling
@@ -4723,35 +4702,31 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
if( this.getNumRows() != target.getNumRows() && this.getNumRows() !=Math.max(target.getNumRows(),target.getNumColumns()) || (weights != null && this.getNumRows() != weights.getNumRows()) )
throw new DMLRuntimeException("groupedAggregate can only operate on matrices with equal dimensions.");
- // obtain numGroups from instruction, if provided
- if (ngroups > 0)
- numGroups = ngroups;
-
// Determine the number of groups
- if( numGroups <= 0 ) { //reuse if available
+ if( ngroups <= 0 ) { //not provided by instruction: infer from max group value
double min = this.min();
double max = this.max();
if ( min <= 0 )
throw new DMLRuntimeException("Invalid value (" + min + ") encountered in 'groups' while computing groupedAggregate");
if ( max <= 0 )
throw new DMLRuntimeException("Invalid value (" + max + ") encountered in 'groups' while computing groupedAggregate.");
- numGroups = (int) max;
+ ngroups = (int) max;
}
// Allocate result matrix
boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1);
MatrixBlock result = checkType(ret);
- boolean result_sparsity = estimateSparsityOnGroupedAgg(rlen, numGroups);
+ boolean result_sparsity = estimateSparsityOnGroupedAgg(rlen, ngroups);
if(result==null)
- result=new MatrixBlock(numGroups, rowVector?1:target.getNumColumns(), result_sparsity);
+ result=new MatrixBlock(ngroups, rowVector?1:target.getNumColumns(), result_sparsity);
else
- result.reset(numGroups, rowVector?1:target.getNumColumns(), result_sparsity);
+ result.reset(ngroups, rowVector?1:target.getNumColumns(), result_sparsity);
//execute grouped aggregate operation
if( k > 1 )
- LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op, k);
+ LibMatrixAgg.groupedAggregate(this, target, weights, result, ngroups, op, k);
else
- LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op);
+ LibMatrixAgg.groupedAggregate(this, target, weights, result, ngroups, op);
return result;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 82eee9d..8748de2 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -67,22 +67,19 @@ public class DataConverter
///////
public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc )
- throws IOException
- {
+ throws IOException {
writeMatrixToHDFS(mat, dir, outputinfo, mc, -1, null);
}
public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc, int replication, FileFormatProperties formatProperties)
- throws IOException
- {
- try {
- MatrixWriter writer = MatrixWriterFactory.createMatrixWriter( outputinfo, replication, formatProperties );
- writer.writeMatrixToHDFS(mat, dir, mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros());
- }
- catch(Exception e)
- {
- throw new IOException(e);
- }
+ throws IOException {
+ writeMatrixToHDFS(mat, dir, outputinfo, mc, replication, formatProperties, false); //forward caller's replication/format props; non-diag by default
+ }
+
+ public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc, int replication, FileFormatProperties formatProperties, boolean diag)
+ throws IOException {
+ MatrixWriter writer = MatrixWriterFactory.createMatrixWriter( outputinfo, replication, formatProperties );
+ writer.writeMatrixToHDFS(mat, dir, mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros(), diag);
}
public static MatrixBlock readMatrixFromHDFS(String dir, InputInfo inputinfo, long rlen, long clen, int brlen, int bclen, boolean localFS)