You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/26 00:19:54 UTC
incubator-systemml git commit: [SYSTEMML-1466] Fix rdd status handling for exports and writes, tests
Repository: incubator-systemml
Updated Branches:
refs/heads/master 2d2196d84 -> eb35b2c90
[SYSTEMML-1466] Fix rdd status handling for exports and writes, tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eb35b2c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eb35b2c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eb35b2c9
Branch: refs/heads/master
Commit: eb35b2c90cbacc674793ca4aed4583273d00fa87
Parents: 2d2196d
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Apr 25 15:03:49 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Apr 25 16:50:50 2017 -0700
----------------------------------------------------------------------
.../controlprogram/caching/CacheableData.java | 11 +++--
.../controlprogram/caching/MatrixObject.java | 3 ++
.../instructions/spark/data/RDDObject.java | 11 ++++-
.../mlcontext/MLContextParforDatasetTest.java | 43 ++++++++++++++++----
4 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 904eb87..c1a024a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -204,7 +204,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
_uniqueID = (int)_seq.getNextID();
_cacheStatus = CacheStatus.EMPTY;
_numReadThreads = 0;
- _gpuObjects = new HashMap<>();
+ _gpuObjects = new HashMap<GPUContext, GPUObject>();
}
/**
@@ -835,14 +835,19 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
throw new CacheException ("Export to " + fName + " failed.", e);
}
}
- else if( getRDDHandle()!=null && //pending rdd operation
- !getRDDHandle().allowsShortCircuitRead() )
+ else if( getRDDHandle()!=null && getRDDHandle().isPending()
+ && !getRDDHandle().isHDFSFile()
+ && !getRDDHandle().allowsShortCircuitRead() )
{
//CASE 3: pending rdd operation (other than checkpoints)
try
{
+ //write matrix or frame
writeBlobFromRDDtoHDFS(getRDDHandle(), fName, outputFormat);
writeMetaData( fName, outputFormat, formatProperties );
+
+ //update rdd status
+ getRDDHandle().setPending(false);
}
catch (Exception e) {
throw new CacheException ("Export to " + fName + " failed.", e);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 4e560c8..94bdb2d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -488,8 +488,11 @@ public class MatrixObject extends CacheableData<MatrixBlock>
if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
long newnnz = SparkExecutionContext.writeRDDtoHDFS(lrdd, _hdfsFileName, iimd.getOutputInfo());
((MatrixDimensionsMetaData) _metaData).getMatrixCharacteristics().setNonZeros(newnnz);
+ ((RDDObject)rdd).setPending(false); //mark rdd as non-pending (for export)
((RDDObject)rdd).setHDFSFile(true); //mark rdd as hdfs file (for restore)
writeStatus.setValue(true); //mark for no cache-write on read
+ //note: the flag hdfsFile is actually not entirely correct because we still hold an rdd
+ //reference to the input not to an rdd of the hdfs file but the resulting behavior is correct
}
mb = readBlobFromHDFS(_hdfsFileName);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
index 67f97a3..0a52323 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
@@ -29,7 +29,8 @@ public class RDDObject extends LineageObject
private boolean _checkpointed = false; //created via checkpoint instruction
private boolean _hdfsfile = false; //created from hdfs file
private String _hdfsFname = null; //hdfs filename, if created from hdfs.
- private boolean _parRDD = false;
+ private boolean _parRDD = false; //is a parallelized rdd at driver
+ private boolean _pending = true; //is a pending rdd operation
public RDDObject( JavaPairRDD<?,?> rddvar, String varName) {
super(varName);
@@ -72,6 +73,14 @@ public class RDDObject extends LineageObject
return _parRDD;
}
+ public void setPending(boolean flag) {
+ _pending = flag;
+ }
+
+ public boolean isPending() {
+ return _pending;
+ }
+
/**
* Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
index 41b8d16..36e7990 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
@@ -80,25 +80,45 @@ public class MLContextParforDatasetTest extends AutomatedTestBase
@Test
public void testParforDatasetVector() {
- runMLContextParforDatasetTest(true, false);
+ runMLContextParforDatasetTest(true, false, false);
}
@Test
public void testParforDatasetRow() {
- runMLContextParforDatasetTest(false, false);
+ runMLContextParforDatasetTest(false, false, false);
}
@Test
public void testParforDatasetVectorUnkownDims() {
- runMLContextParforDatasetTest(true, true);
+ runMLContextParforDatasetTest(true, true, false);
}
@Test
public void testParforDatasetRowUnknownDims() {
- runMLContextParforDatasetTest(false, true);
+ runMLContextParforDatasetTest(false, true, false);
}
- private void runMLContextParforDatasetTest(boolean vector, boolean unknownDims)
+ @Test
+ public void testParforDatasetVectorMulti() {
+ runMLContextParforDatasetTest(true, false, true);
+ }
+
+ @Test
+ public void testParforDatasetRowMulti() {
+ runMLContextParforDatasetTest(false, false, true);
+ }
+
+ @Test
+ public void testParforDatasetVectorUnkownDimsMulti() {
+ runMLContextParforDatasetTest(true, true, true);
+ }
+
+ @Test
+ public void testParforDatasetRowUnknownDimsMulti() {
+ runMLContextParforDatasetTest(false, true, true);
+ }
+
+ private void runMLContextParforDatasetTest(boolean vector, boolean unknownDims, boolean multiInputs)
{
//modify memory budget to trigger fused datapartition-execute
long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
@@ -119,21 +139,30 @@ public class MLContextParforDatasetTest extends AutomatedTestBase
MatrixMetadata mm = new MatrixMetadata(vector ? MatrixFormat.DF_VECTOR_WITH_INDEX : MatrixFormat.DF_DOUBLES_WITH_INDEX);
mm.setMatrixCharacteristics(mc2);
- String s = "v = matrix(0, rows=nrow(X), cols=1)"
+ String s1 = "v = matrix(0, rows=nrow(X), cols=1)"
+ "parfor(i in 1:nrow(X), log=DEBUG) {"
+ " v[i, ] = sum(X[i, ]);"
+ "}"
+ "r = sum(v);";
+ String s2 = "v = matrix(0, rows=nrow(X), cols=1)"
+ +"Y = X;"
+ + "parfor(i in 1:nrow(X), log=DEBUG) {"
+ + " v[i, ] = sum(X[i, ]+Y[i, ]);"
+ + "}"
+ + "r = sum(v);";
+ String s = multiInputs ? s2 : s1;
+
Script script = dml(s).in("X", df, mm).out("r");
MLResults results = ml.execute(script);
//compare aggregation results
double sum1 = results.getDouble("r");
- double sum2 = mbA.sum();
+ double sum2 = mbA.sum() * (multiInputs ? 2 : 1);
TestUtils.compareScalars(sum2, sum1, 0.000001);
}
catch(Exception ex) {
+ ex.printStackTrace();
throw new RuntimeException(ex);
}
finally {