You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/26 00:19:54 UTC

incubator-systemml git commit: [SYSTEMML-1466] Fix rdd status handling for exports and writes, tests

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2d2196d84 -> eb35b2c90


[SYSTEMML-1466] Fix rdd status handling for exports and writes, tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eb35b2c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eb35b2c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eb35b2c9

Branch: refs/heads/master
Commit: eb35b2c90cbacc674793ca4aed4583273d00fa87
Parents: 2d2196d
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Apr 25 15:03:49 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Apr 25 16:50:50 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheableData.java   | 11 +++--
 .../controlprogram/caching/MatrixObject.java    |  3 ++
 .../instructions/spark/data/RDDObject.java      | 11 ++++-
 .../mlcontext/MLContextParforDatasetTest.java   | 43 ++++++++++++++++----
 4 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 904eb87..c1a024a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -204,7 +204,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		_uniqueID = (int)_seq.getNextID();		
 		_cacheStatus = CacheStatus.EMPTY;
 		_numReadThreads = 0;
-		_gpuObjects = new HashMap<>();
+		_gpuObjects = new HashMap<GPUContext, GPUObject>();
 	}
 	
 	/**
@@ -835,14 +835,19 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 				throw new CacheException ("Export to " + fName + " failed.", e);
 			}
 		}
-		else if( getRDDHandle()!=null && //pending rdd operation
-				!getRDDHandle().allowsShortCircuitRead() )
+		else if( getRDDHandle()!=null && getRDDHandle().isPending()
+			&& !getRDDHandle().isHDFSFile() 
+			&& !getRDDHandle().allowsShortCircuitRead() )
 		{
 			//CASE 3: pending rdd operation (other than checkpoints)
 			try
 			{
+				//write matrix or frame
 				writeBlobFromRDDtoHDFS(getRDDHandle(), fName, outputFormat);
 				writeMetaData( fName, outputFormat, formatProperties );
+
+				//update rdd status
+				getRDDHandle().setPending(false);
 			}
 			catch (Exception e) {
 				throw new CacheException ("Export to " + fName + " failed.", e);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 4e560c8..94bdb2d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -488,8 +488,11 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 				if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
 					long newnnz = SparkExecutionContext.writeRDDtoHDFS(lrdd, _hdfsFileName, iimd.getOutputInfo());
 					((MatrixDimensionsMetaData) _metaData).getMatrixCharacteristics().setNonZeros(newnnz);
+					((RDDObject)rdd).setPending(false); //mark rdd as non-pending (for export)
 					((RDDObject)rdd).setHDFSFile(true); //mark rdd as hdfs file (for restore)
 					writeStatus.setValue(true);         //mark for no cache-write on read
+					//note: the flag hdfsFile is actually not entirely correct because we still hold an rdd 
+					//reference to the input not to an rdd of the hdfs file but the resulting behavior is correct
 				}
 				mb = readBlobFromHDFS(_hdfsFileName);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
index 67f97a3..0a52323 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
@@ -29,7 +29,8 @@ public class RDDObject extends LineageObject
 	private boolean _checkpointed = false; //created via checkpoint instruction
 	private boolean _hdfsfile = false;     //created from hdfs file
 	private String  _hdfsFname = null;     //hdfs filename, if created from hdfs.  
-	private boolean _parRDD = false;
+	private boolean _parRDD = false;       //is a parallelized rdd at driver
+	private boolean _pending = true;       //is a pending rdd operation
 	
 	public RDDObject( JavaPairRDD<?,?> rddvar, String varName) {
 		super(varName);
@@ -72,6 +73,14 @@ public class RDDObject extends LineageObject
 		return _parRDD; 
 	}
 	
+	public void setPending(boolean flag) {
+		_pending = flag;
+	}
+	
+	public boolean isPending() {
+		return _pending;
+	}
+	
 
 	/**
 	 * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
index 41b8d16..36e7990 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
@@ -80,25 +80,45 @@ public class MLContextParforDatasetTest extends AutomatedTestBase
 
 	@Test
 	public void testParforDatasetVector() {
-		runMLContextParforDatasetTest(true, false);
+		runMLContextParforDatasetTest(true, false, false);
 	}
 	
 	@Test
 	public void testParforDatasetRow() {
-		runMLContextParforDatasetTest(false, false);
+		runMLContextParforDatasetTest(false, false, false);
 	}
 	
 	@Test
 	public void testParforDatasetVectorUnkownDims() {
-		runMLContextParforDatasetTest(true, true);
+		runMLContextParforDatasetTest(true, true, false);
 	}
 	
 	@Test
 	public void testParforDatasetRowUnknownDims() {
-		runMLContextParforDatasetTest(false, true);
+		runMLContextParforDatasetTest(false, true, false);
 	}
 	
-	private void runMLContextParforDatasetTest(boolean vector, boolean unknownDims) 
+	@Test
+	public void testParforDatasetVectorMulti() {
+		runMLContextParforDatasetTest(true, false, true);
+	}
+	
+	@Test
+	public void testParforDatasetRowMulti() {
+		runMLContextParforDatasetTest(false, false, true);
+	}
+	
+	@Test
+	public void testParforDatasetVectorUnkownDimsMulti() {
+		runMLContextParforDatasetTest(true, true, true);
+	}
+	
+	@Test
+	public void testParforDatasetRowUnknownDimsMulti() {
+		runMLContextParforDatasetTest(false, true, true);
+	}
+	
+	private void runMLContextParforDatasetTest(boolean vector, boolean unknownDims, boolean multiInputs) 
 	{
 		//modify memory budget to trigger fused datapartition-execute
 		long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
@@ -119,21 +139,30 @@ public class MLContextParforDatasetTest extends AutomatedTestBase
 			MatrixMetadata mm = new MatrixMetadata(vector ? MatrixFormat.DF_VECTOR_WITH_INDEX : MatrixFormat.DF_DOUBLES_WITH_INDEX);
 			mm.setMatrixCharacteristics(mc2);
 			
-			String s = "v = matrix(0, rows=nrow(X), cols=1)"
+			String s1 = "v = matrix(0, rows=nrow(X), cols=1)"
 					+ "parfor(i in 1:nrow(X), log=DEBUG) {"
 					+ "   v[i, ] = sum(X[i, ]);"
 					+ "}"
 					+ "r = sum(v);";
+			String s2 = "v = matrix(0, rows=nrow(X), cols=1)"
+					+"Y = X;"
+					+ "parfor(i in 1:nrow(X), log=DEBUG) {"
+					+ "   v[i, ] = sum(X[i, ]+Y[i, ]);"
+					+ "}"
+					+ "r = sum(v);";
+			String s = multiInputs ? s2 : s1;
+			
 			Script script = dml(s).in("X", df, mm).out("r");
 			MLResults results = ml.execute(script);
 			
 			//compare aggregation results
 			double sum1 = results.getDouble("r");
-			double sum2 = mbA.sum();
+			double sum2 = mbA.sum() * (multiInputs ? 2 : 1);
 			
 			TestUtils.compareScalars(sum2, sum1, 0.000001);
 		}
 		catch(Exception ex) {
+			ex.printStackTrace();
 			throw new RuntimeException(ex);
 		}
 		finally {