Posted to commits@systemml.apache.org by de...@apache.org on 2017/02/09 18:13:09 UTC

incubator-systemml git commit: [SYSTEMML-1234] Migrate FrameTest to new MLContext

Repository: incubator-systemml
Updated Branches:
  refs/heads/master d3cfcafcf -> 457a97db8


[SYSTEMML-1234] Migrate FrameTest to new MLContext

Migrate FrameTest from the old MLContext API to the new MLContext API.
Fix MLContextConversionUtil frameObjectToListStringIJV and
frameObjectToListStringCSV so that null frame cells are no longer output.

Closes #380.
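
For context, the migration replaces the old registerInput/registerOutput/execute
calls with the Script-based API. A minimal sketch of that pattern, condensed from
the diff below (the class name, file paths, schema, and dimensions here are
placeholders, not part of the commit):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.sysml.api.mlcontext.*;
    import org.apache.sysml.parser.Expression.ValueType;

    public class NewMLContextSketch {
        // 'sc' is assumed to be an already-running local JavaSparkContext
        public static void run(JavaSparkContext sc) {
            MLContext ml = new MLContext(sc);

            // build the Script object from a DML file (placeholder path)
            Script script = ScriptFactory.dmlFromFile("/path/to/FrameTest.dml");

            // bind a CSV frame input with an explicit schema and dimensions
            JavaRDD<String> aIn = sc.textFile("/path/to/A.csv");
            FrameSchema schema = new FrameSchema(Arrays.asList(ValueType.STRING, ValueType.DOUBLE));
            FrameMetadata meta = new FrameMetadata(FrameFormat.CSV, schema, 2, 2);
            script.in("A", aIn, meta);

            // positional script arguments are bound as "$1", "$2", ...
            script.in("$1", "234");

            // register outputs, execute, and read one result back as CSV lines
            script.out("A", "C");
            MLResults results = ml.execute(script);
            JavaRDD<String> aOut = results.getJavaRDDStringCSV("A");
            aOut.saveAsTextFile("/path/to/out");

            ml.close();
        }
    }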


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/457a97db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/457a97db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/457a97db

Branch: refs/heads/master
Commit: 457a97db8c7b5483a3750cc143c98c77a3196db5
Parents: d3cfcaf
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Thu Feb 9 10:05:46 2017 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Thu Feb 9 10:05:46 2017 -0800

----------------------------------------------------------------------
 .../api/mlcontext/MLContextConversionUtil.java  |  20 ++-
 .../functions/mlcontext/FrameTest.java          | 167 ++++++++++++-------
 2 files changed, 115 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/457a97db/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index cca9d2c..5414e4d 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -1100,7 +1100,9 @@ public class MLContextConversionUtil {
 					if (j > 0) {
 						sb.append(delimiter);
 					}
-					sb.append(fb.get(i, j));
+					if (fb.get(i, j) != null) {
+						sb.append(fb.get(i, j));
+					}
 				}
 				list.add(sb.toString());
 			}
@@ -1185,13 +1187,15 @@ public class MLContextConversionUtil {
 			for (int i = 0; i < rows; i++) {
 				sb = new StringBuilder();
 				for (int j = 0; j < cols; j++) {
-					sb = new StringBuilder();
-					sb.append(i + 1);
-					sb.append(" ");
-					sb.append(j + 1);
-					sb.append(" ");
-					sb.append(fb.get(i, j));
-					list.add(sb.toString());
+					if (fb.get(i, j) != null) {
+						sb = new StringBuilder();
+						sb.append(i + 1);
+						sb.append(" ");
+						sb.append(j + 1);
+						sb.append(" ");
+						sb.append(fb.get(i, j));
+						list.add(sb.toString());
+					}
 				}
 			}
 

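The MLContextConversionUtil change above matters because StringBuilder.append(Object)
renders a null cell as the literal string "null"; with the fix, a null cell becomes an
empty CSV field and is dropped from the IJV listing entirely. A simplified, self-contained
illustration of the patched IJV loop (a plain String[][] stands in for FrameBlock; the
names are illustrative only):

    import java.util.ArrayList;
    import java.util.List;

    public class IJVNullSkipSketch {
        // emit "row col value" lines (1-based), skipping null cells as the fix does
        public static List<String> toIJV(String[][] cells) {
            List<String> list = new ArrayList<>();
            for (int i = 0; i < cells.length; i++) {
                for (int j = 0; j < cells[i].length; j++) {
                    if (cells[i][j] != null) {
                        list.add((i + 1) + " " + (j + 1) + " " + cells[i][j]);
                    }
                }
            }
            return list;
        }

        public static void main(String[] args) {
            String[][] cells = { { "a", null }, { null, "d" } };
            // prints [1 1 a, 2 2 d] -- the null cells at (1,2) and (2,1) are omitted
            System.out.println(toIJV(cells));
        }
    }
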
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/457a97db/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index 1b29077..e6a947f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
-import org.apache.spark.SparkContext;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -38,9 +38,13 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.api.DMLException;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.api.MLContext;
-import org.apache.sysml.api.MLContextProxy;
-import org.apache.sysml.api.MLOutput;
+import org.apache.sysml.api.mlcontext.FrameFormat;
+import org.apache.sysml.api.mlcontext.FrameMetadata;
+import org.apache.sysml.api.mlcontext.FrameSchema;
+import org.apache.sysml.api.mlcontext.MLContext;
+import org.apache.sysml.api.mlcontext.MLResults;
+import org.apache.sysml.api.mlcontext.Script;
+import org.apache.sysml.api.mlcontext.ScriptFactory;
 import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.parser.ParseException;
@@ -49,7 +53,6 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -58,7 +61,10 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 
@@ -93,7 +99,21 @@ public class FrameTest extends AutomatedTestBase
 		schemaMixedLarge = new ValueType[schemaMixedLargeList.size()];
 		schemaMixedLarge = (ValueType[]) schemaMixedLargeList.toArray(schemaMixedLarge);
 	}
-	 
+
+	private static SparkConf conf;
+	private static JavaSparkContext sc;
+	private static MLContext ml;
+
+	@BeforeClass
+	public static void setUpClass() {
+		if (conf == null)
+			conf = SparkExecutionContext.createSystemMLSparkConf()
+				.setAppName("FrameTest").setMaster("local");
+		if (sc == null)
+			sc = new JavaSparkContext(conf);
+		ml = new MLContext(sc);
+	}
+
 	@Override
 	public void setUp() {
 		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, 
@@ -154,8 +174,6 @@ public class FrameTest extends AutomatedTestBase
 		RUNTIME_PLATFORM oldRT = DMLScript.rtplatform;
 		DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
 
-		this.scriptType = ScriptType.DML;
-		
 		int rowstart = 234, rowend = 1478, colstart = 125, colend = 568;
 		int bRows = rowend-rowstart+1, bCols = colend-colstart+1;
 
@@ -186,7 +204,6 @@ public class FrameTest extends AutomatedTestBase
 		proArgs.add(Integer.toString(colstartC));
 		proArgs.add(Integer.toString(colendC));
 		proArgs.add(output("C"));
-		programArgs = proArgs.toArray(new String[proArgs.size()]);
 		
 		fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME + ".dml";
 
@@ -199,71 +216,75 @@ public class FrameTest extends AutomatedTestBase
 		rCmd = "Rscript" + " " + fullRScriptName + " " + 
 			inputDir() + " " + rowstart + " " + rowend + " " + colstart + " " + colend + " " + expectedDir()
 					  + " " + rowstartC + " " + rowendC + " " + colstartC + " " + colendC;
-		
-		double sparsity=sparsity1;//rand.nextDouble(); 
-        double[][] A = getRandomMatrix(rows, cols, min, max, sparsity, 1111 /*\\System.currentTimeMillis()*/);
-        writeInputFrameWithMTD("A", A, true, schema, oinfo);	        
-        
-        sparsity=sparsity2;//rand.nextDouble();
-        double[][] B = getRandomMatrix((int)(bRows), (int)(bCols), min, max, sparsity, 2345 /*System.currentTimeMillis()*/);
-        //Following way of creation causes serialization issue in frame processing
-        //List<ValueType> lschemaB = lschema.subList((int)colstart-1, (int)colend); 
-        ValueType[] schemaB = new ValueType[bCols];
-        for (int i = 0; i < bCols; ++i)
-        	schemaB[i] = schema[colstart-1+i];
+
+		double sparsity = sparsity1;
+		double[][] A = getRandomMatrix(rows, cols, min, max, sparsity, 1111);
+		writeInputFrameWithMTD("A", A, true, schema, oinfo);
+
+		sparsity = sparsity2;
+		double[][] B = getRandomMatrix((int) (bRows), (int) (bCols), min, max, sparsity, 2345);
+
+		ValueType[] schemaB = new ValueType[bCols];
+		for (int i = 0; i < bCols; ++i)
+			schemaB[i] = schema[colstart - 1 + i];
 		List<ValueType> lschemaB = Arrays.asList(schemaB);
-        writeInputFrameWithMTD("B", B, true, schemaB, oinfo);	        
+		writeInputFrameWithMTD("B", B, true, schemaB, oinfo);
+
+		ValueType[] schemaC = new ValueType[colendC - colstartC + 1];
+		for (int i = 0; i < cCols; ++i)
+			schemaC[i] = schema[colstartC - 1 + i];
 
-        ValueType[] schemaC = new ValueType[colendC-colstartC+1];
-        for (int i = 0; i < cCols; ++i)
-        	schemaC[i] = schema[colstartC-1+i];
-        
-		MLContext mlCtx = getMLContextForTesting();
-		SparkContext sc = mlCtx.getSparkContext();
-		JavaSparkContext jsc = new JavaSparkContext(sc);
-		
 		Dataset<Row> dfA = null, dfB = null; 
 		if(bFromDataFrame)
 		{
 			//Create DataFrame for input A 
 			SQLContext sqlContext = new SQLContext(sc);
 			StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false);
-			JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, schema);
+			JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(sc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, schema);
 			dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
 			
 			//Create DataFrame for input B 
 			StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false);
-			JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, schemaB);
+			JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(sc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, schemaB);
 			dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
 		}
 
 		try 
 		{
-			mlCtx.reset(true); // Cleanup config to ensure future MLContext testcases have correct 'cp.parallel.matrixmult'
+			Script script = ScriptFactory.dmlFromFile(fullDMLScriptName);
 			
 			String format = "csv";
 			if(oinfo == OutputInfo.TextCellOutputInfo)
 				format = "text";
 
-			if(bFromDataFrame)
-				mlCtx.registerFrameInput("A", dfA, false);
-			else {
-				JavaRDD<String> aIn =  jsc.textFile(input("A"));
-				mlCtx.registerInput("A", aIn, format, rows, cols, new CSVFileFormatProperties(), lschema);
+			if(bFromDataFrame) {
+				script.in("A", dfA);
+			} else {
+				JavaRDD<String> aIn =  sc.textFile(input("A"));
+				FrameSchema fs = new FrameSchema(lschema);
+				FrameFormat ff = (format.equals("text")) ? FrameFormat.IJV : FrameFormat.CSV;
+				FrameMetadata fm = new FrameMetadata(ff, fs, rows, cols);
+				script.in("A", aIn, fm);
 			}
 
-			if(bFromDataFrame)
-				mlCtx.registerFrameInput("B", dfB, false);
-			else {
-				JavaRDD<String> bIn =  jsc.textFile(input("B"));
-				mlCtx.registerInput("B", bIn, format, bRows, bCols, new CSVFileFormatProperties(), lschemaB);
+			if(bFromDataFrame) {
+				script.in("B", dfB);
+			} else {
+				JavaRDD<String> bIn =  sc.textFile(input("B"));
+				FrameSchema fs = new FrameSchema(lschemaB);
+				FrameFormat ff = (format.equals("text")) ? FrameFormat.IJV : FrameFormat.CSV;
+				FrameMetadata fm = new FrameMetadata(ff, fs, bRows, bCols);
+				script.in("B", bIn, fm);
 			}
 
 			// Output one frame to HDFS and get one as RDD //TODO HDFS input/output to do
-			mlCtx.registerOutput("A");
-			mlCtx.registerOutput("C");
+			script.out("A", "C");
 			
-			MLOutput out = mlCtx.execute(fullDMLScriptName, programArgs);
+			// set positional argument values
+			for (int argNum = 1; argNum <= proArgs.size(); argNum++) {
+				script.in("$" + argNum, proArgs.get(argNum-1));
+			}
+			MLResults results = ml.execute(script);
 			
 			format = "csv";
 			if(iinfo == InputInfo.TextCellInputInfo)
@@ -278,15 +299,20 @@ public class FrameTest extends AutomatedTestBase
 			
 			if(!bToDataFrame)
 			{
-				JavaRDD<String> aOut = out.getStringFrameRDD("A", format, new CSVFileFormatProperties());
-				aOut.saveAsTextFile(fName);
+				if (format.equals("text")) {
+					JavaRDD<String> javaRDDStringIJV = results.getJavaRDDStringIJV("A");
+					javaRDDStringIJV.saveAsTextFile(fName);
+				} else {
+					JavaRDD<String> javaRDDStringCSV = results.getJavaRDDStringCSV("A");
+					javaRDDStringCSV.saveAsTextFile(fName);
+				}
 			} else {
-				Dataset<Row> df = out.getDataFrameRDD("A", jsc);
+				Dataset<Row> df = results.getDataFrame("A");
 				
 				//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary 
 				MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, -1, -1, -1);
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
+						.dataFrameToBinaryBlock(sc, df, mc, bFromDataFrame)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(output("AB"), LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
 			}
@@ -299,15 +325,20 @@ public class FrameTest extends AutomatedTestBase
 			} 
 			if(!bToDataFrame)
 			{
-				JavaRDD<String> aOut = out.getStringFrameRDD("C", format, new CSVFileFormatProperties());
-				aOut.saveAsTextFile(fName);
+				if (format.equals("text")) {
+					JavaRDD<String> javaRDDStringIJV = results.getJavaRDDStringIJV("C");
+					javaRDDStringIJV.saveAsTextFile(fName);
+				} else {
+					JavaRDD<String> javaRDDStringCSV = results.getJavaRDDStringCSV("C");
+					javaRDDStringCSV.saveAsTextFile(fName);
+				}
 			} else {
-				Dataset<Row> df = out.getDataFrameRDD("C", jsc);
+				Dataset<Row> df = results.getDataFrame("C");
 				
 				//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary 
 				MatrixCharacteristics mc = new MatrixCharacteristics(cRows, cCols, -1, -1, -1);
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-						.dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
+						.dataFrameToBinaryBlock(sc, df, mc, bFromDataFrame)
 						.mapToPair(new LongFrameToLongWritableFrameFunction());
 				rddOut.saveAsHadoopFile(fName, LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
 			}
@@ -329,20 +360,11 @@ public class FrameTest extends AutomatedTestBase
 				System.out.println("File " + file +  " processed successfully.");
 			}
 
-			//cleanup mlcontext (prevent test memory leaks)
-			mlCtx.reset();
-			
 			System.out.println("Frame MLContext test completed successfully.");
 		}
 		finally {
 			DMLScript.rtplatform = oldRT;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
-
-			if (sc != null) {
-				sc.stop();
-			}
-			SparkExecutionContext.resetSparkContextStatic();
-			MLContextProxy.setActive(false);
 		}
 	}
 	
@@ -357,4 +379,21 @@ public class FrameTest extends AutomatedTestBase
 			}
 	}
 
-}
\ No newline at end of file
+	@After
+	public void tearDown() {
+		super.tearDown();
+	}
+
+	@AfterClass
+	public static void tearDownClass() {
+		// stop spark context to allow single jvm tests (otherwise the
+		// next test that tries to create a SparkContext would fail)
+		sc.stop();
+		sc = null;
+		conf = null;
+
+		// clear status mlcontext and spark exec context
+		ml.close();
+		ml = null;
+	}
+}
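
The @BeforeClass/@AfterClass pattern introduced above keeps a single local SparkContext
and MLContext per test class and tears both down so that later tests in the same JVM can
create their own SparkContext. A condensed sketch of that lifecycle (the class name is
illustrative; the real setup lives in FrameTest):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.sysml.api.mlcontext.MLContext;
    import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;

    public class MLContextLifecycleSketch {
        private static SparkConf conf;
        private static JavaSparkContext sc;
        private static MLContext ml;

        @BeforeClass
        public static void setUpClass() {
            // SystemML-tuned SparkConf, run locally for testing
            conf = SparkExecutionContext.createSystemMLSparkConf()
                .setAppName("MLContextLifecycleSketch").setMaster("local");
            sc = new JavaSparkContext(conf);
            ml = new MLContext(sc);
        }

        // @Test methods that use 'ml' go here

        @AfterClass
        public static void tearDownClass() {
            // stop Spark so the next test class in this JVM can start its own context
            sc.stop();
            sc = null;
            conf = null;

            // release the MLContext and its Spark execution context state
            ml.close();
            ml = null;
        }
    }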