You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/02/09 18:13:09 UTC
incubator-systemml git commit: [SYSTEMML-1234] Migrate FrameTest to new MLContext
Repository: incubator-systemml
Updated Branches:
refs/heads/master d3cfcafcf -> 457a97db8
[SYSTEMML-1234] Migrate FrameTest to new MLContext
Migrate FrameTest from old MLContext API to new MLContext API.
Fix MLContextConversionUtil frameObjectToListStringIJV and
frameObjectToListStringCSV to not output null values.
Closes #380.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/457a97db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/457a97db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/457a97db
Branch: refs/heads/master
Commit: 457a97db8c7b5483a3750cc143c98c77a3196db5
Parents: d3cfcaf
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Thu Feb 9 10:05:46 2017 -0800
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Thu Feb 9 10:05:46 2017 -0800
----------------------------------------------------------------------
.../api/mlcontext/MLContextConversionUtil.java | 20 ++-
.../functions/mlcontext/FrameTest.java | 167 ++++++++++++-------
2 files changed, 115 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/457a97db/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index cca9d2c..5414e4d 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -1100,7 +1100,9 @@ public class MLContextConversionUtil {
if (j > 0) {
sb.append(delimiter);
}
- sb.append(fb.get(i, j));
+ if (fb.get(i, j) != null) {
+ sb.append(fb.get(i, j));
+ }
}
list.add(sb.toString());
}
@@ -1185,13 +1187,15 @@ public class MLContextConversionUtil {
for (int i = 0; i < rows; i++) {
sb = new StringBuilder();
for (int j = 0; j < cols; j++) {
- sb = new StringBuilder();
- sb.append(i + 1);
- sb.append(" ");
- sb.append(j + 1);
- sb.append(" ");
- sb.append(fb.get(i, j));
- list.add(sb.toString());
+ if (fb.get(i, j) != null) {
+ sb = new StringBuilder();
+ sb.append(i + 1);
+ sb.append(" ");
+ sb.append(j + 1);
+ sb.append(" ");
+ sb.append(fb.get(i, j));
+ list.add(sb.toString());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/457a97db/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index 1b29077..e6a947f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
-import org.apache.spark.SparkContext;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -38,9 +38,13 @@ import org.apache.spark.sql.types.StructType;
import org.apache.sysml.api.DMLException;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.api.MLContext;
-import org.apache.sysml.api.MLContextProxy;
-import org.apache.sysml.api.MLOutput;
+import org.apache.sysml.api.mlcontext.FrameFormat;
+import org.apache.sysml.api.mlcontext.FrameMetadata;
+import org.apache.sysml.api.mlcontext.FrameSchema;
+import org.apache.sysml.api.mlcontext.MLContext;
+import org.apache.sysml.api.mlcontext.MLResults;
+import org.apache.sysml.api.mlcontext.Script;
+import org.apache.sysml.api.mlcontext.ScriptFactory;
import org.apache.sysml.parser.DataExpression;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.parser.ParseException;
@@ -49,7 +53,6 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -58,7 +61,10 @@ import org.apache.sysml.runtime.util.UtilFunctions;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.TestConfiguration;
import org.apache.sysml.test.utils.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
@@ -93,7 +99,21 @@ public class FrameTest extends AutomatedTestBase
schemaMixedLarge = new ValueType[schemaMixedLargeList.size()];
schemaMixedLarge = (ValueType[]) schemaMixedLargeList.toArray(schemaMixedLarge);
}
-
+
+ private static SparkConf conf;
+ private static JavaSparkContext sc;
+ private static MLContext ml;
+
+ @BeforeClass
+ public static void setUpClass() {
+ if (conf == null)
+ conf = SparkExecutionContext.createSystemMLSparkConf()
+ .setAppName("FrameTest").setMaster("local");
+ if (sc == null)
+ sc = new JavaSparkContext(conf);
+ ml = new MLContext(sc);
+ }
+
@Override
public void setUp() {
addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,
@@ -154,8 +174,6 @@ public class FrameTest extends AutomatedTestBase
RUNTIME_PLATFORM oldRT = DMLScript.rtplatform;
DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
- this.scriptType = ScriptType.DML;
-
int rowstart = 234, rowend = 1478, colstart = 125, colend = 568;
int bRows = rowend-rowstart+1, bCols = colend-colstart+1;
@@ -186,7 +204,6 @@ public class FrameTest extends AutomatedTestBase
proArgs.add(Integer.toString(colstartC));
proArgs.add(Integer.toString(colendC));
proArgs.add(output("C"));
- programArgs = proArgs.toArray(new String[proArgs.size()]);
fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME + ".dml";
@@ -199,71 +216,75 @@ public class FrameTest extends AutomatedTestBase
rCmd = "Rscript" + " " + fullRScriptName + " " +
inputDir() + " " + rowstart + " " + rowend + " " + colstart + " " + colend + " " + expectedDir()
+ " " + rowstartC + " " + rowendC + " " + colstartC + " " + colendC;
-
- double sparsity=sparsity1;//rand.nextDouble();
- double[][] A = getRandomMatrix(rows, cols, min, max, sparsity, 1111 /*\\System.currentTimeMillis()*/);
- writeInputFrameWithMTD("A", A, true, schema, oinfo);
-
- sparsity=sparsity2;//rand.nextDouble();
- double[][] B = getRandomMatrix((int)(bRows), (int)(bCols), min, max, sparsity, 2345 /*System.currentTimeMillis()*/);
- //Following way of creation causes serialization issue in frame processing
- //List<ValueType> lschemaB = lschema.subList((int)colstart-1, (int)colend);
- ValueType[] schemaB = new ValueType[bCols];
- for (int i = 0; i < bCols; ++i)
- schemaB[i] = schema[colstart-1+i];
+
+ double sparsity = sparsity1;
+ double[][] A = getRandomMatrix(rows, cols, min, max, sparsity, 1111);
+ writeInputFrameWithMTD("A", A, true, schema, oinfo);
+
+ sparsity = sparsity2;
+ double[][] B = getRandomMatrix((int) (bRows), (int) (bCols), min, max, sparsity, 2345);
+
+ ValueType[] schemaB = new ValueType[bCols];
+ for (int i = 0; i < bCols; ++i)
+ schemaB[i] = schema[colstart - 1 + i];
List<ValueType> lschemaB = Arrays.asList(schemaB);
- writeInputFrameWithMTD("B", B, true, schemaB, oinfo);
+ writeInputFrameWithMTD("B", B, true, schemaB, oinfo);
+
+ ValueType[] schemaC = new ValueType[colendC - colstartC + 1];
+ for (int i = 0; i < cCols; ++i)
+ schemaC[i] = schema[colstartC - 1 + i];
- ValueType[] schemaC = new ValueType[colendC-colstartC+1];
- for (int i = 0; i < cCols; ++i)
- schemaC[i] = schema[colstartC-1+i];
-
- MLContext mlCtx = getMLContextForTesting();
- SparkContext sc = mlCtx.getSparkContext();
- JavaSparkContext jsc = new JavaSparkContext(sc);
-
Dataset<Row> dfA = null, dfB = null;
if(bFromDataFrame)
{
//Create DataFrame for input A
SQLContext sqlContext = new SQLContext(sc);
StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false);
- JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, schema);
+ JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(sc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, schema);
dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
//Create DataFrame for input B
StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false);
- JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, schemaB);
+ JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(sc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, schemaB);
dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
}
try
{
- mlCtx.reset(true); // Cleanup config to ensure future MLContext testcases have correct 'cp.parallel.matrixmult'
+ Script script = ScriptFactory.dmlFromFile(fullDMLScriptName);
String format = "csv";
if(oinfo == OutputInfo.TextCellOutputInfo)
format = "text";
- if(bFromDataFrame)
- mlCtx.registerFrameInput("A", dfA, false);
- else {
- JavaRDD<String> aIn = jsc.textFile(input("A"));
- mlCtx.registerInput("A", aIn, format, rows, cols, new CSVFileFormatProperties(), lschema);
+ if(bFromDataFrame) {
+ script.in("A", dfA);
+ } else {
+ JavaRDD<String> aIn = sc.textFile(input("A"));
+ FrameSchema fs = new FrameSchema(lschema);
+ FrameFormat ff = (format.equals("text")) ? FrameFormat.IJV : FrameFormat.CSV;
+ FrameMetadata fm = new FrameMetadata(ff, fs, rows, cols);
+ script.in("A", aIn, fm);
}
- if(bFromDataFrame)
- mlCtx.registerFrameInput("B", dfB, false);
- else {
- JavaRDD<String> bIn = jsc.textFile(input("B"));
- mlCtx.registerInput("B", bIn, format, bRows, bCols, new CSVFileFormatProperties(), lschemaB);
+ if(bFromDataFrame) {
+ script.in("B", dfB);
+ } else {
+ JavaRDD<String> bIn = sc.textFile(input("B"));
+ FrameSchema fs = new FrameSchema(lschemaB);
+ FrameFormat ff = (format.equals("text")) ? FrameFormat.IJV : FrameFormat.CSV;
+ FrameMetadata fm = new FrameMetadata(ff, fs, bRows, bCols);
+ script.in("B", bIn, fm);
}
// Output one frame to HDFS and get one as RDD //TODO HDFS input/output to do
- mlCtx.registerOutput("A");
- mlCtx.registerOutput("C");
+ script.out("A", "C");
- MLOutput out = mlCtx.execute(fullDMLScriptName, programArgs);
+ // set positional argument values
+ for (int argNum = 1; argNum <= proArgs.size(); argNum++) {
+ script.in("$" + argNum, proArgs.get(argNum-1));
+ }
+ MLResults results = ml.execute(script);
format = "csv";
if(iinfo == InputInfo.TextCellInputInfo)
@@ -278,15 +299,20 @@ public class FrameTest extends AutomatedTestBase
if(!bToDataFrame)
{
- JavaRDD<String> aOut = out.getStringFrameRDD("A", format, new CSVFileFormatProperties());
- aOut.saveAsTextFile(fName);
+ if (format.equals("text")) {
+ JavaRDD<String> javaRDDStringIJV = results.getJavaRDDStringIJV("A");
+ javaRDDStringIJV.saveAsTextFile(fName);
+ } else {
+ JavaRDD<String> javaRDDStringCSV = results.getJavaRDDStringCSV("A");
+ javaRDDStringCSV.saveAsTextFile(fName);
+ }
} else {
- Dataset<Row> df = out.getDataFrameRDD("A", jsc);
+ Dataset<Row> df = results.getDataFrame("A");
//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, -1, -1, -1);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
- .dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
+ .dataFrameToBinaryBlock(sc, df, mc, bFromDataFrame)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(output("AB"), LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
}
@@ -299,15 +325,20 @@ public class FrameTest extends AutomatedTestBase
}
if(!bToDataFrame)
{
- JavaRDD<String> aOut = out.getStringFrameRDD("C", format, new CSVFileFormatProperties());
- aOut.saveAsTextFile(fName);
+ if (format.equals("text")) {
+ JavaRDD<String> javaRDDStringIJV = results.getJavaRDDStringIJV("C");
+ javaRDDStringIJV.saveAsTextFile(fName);
+ } else {
+ JavaRDD<String> javaRDDStringCSV = results.getJavaRDDStringCSV("C");
+ javaRDDStringCSV.saveAsTextFile(fName);
+ }
} else {
- Dataset<Row> df = out.getDataFrameRDD("C", jsc);
+ Dataset<Row> df = results.getDataFrame("C");
//Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
MatrixCharacteristics mc = new MatrixCharacteristics(cRows, cCols, -1, -1, -1);
JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
- .dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
+ .dataFrameToBinaryBlock(sc, df, mc, bFromDataFrame)
.mapToPair(new LongFrameToLongWritableFrameFunction());
rddOut.saveAsHadoopFile(fName, LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
}
@@ -329,20 +360,11 @@ public class FrameTest extends AutomatedTestBase
System.out.println("File " + file + " processed successfully.");
}
- //cleanup mlcontext (prevent test memory leaks)
- mlCtx.reset();
-
System.out.println("Frame MLContext test completed successfully.");
}
finally {
DMLScript.rtplatform = oldRT;
DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
-
- if (sc != null) {
- sc.stop();
- }
- SparkExecutionContext.resetSparkContextStatic();
- MLContextProxy.setActive(false);
}
}
@@ -357,4 +379,21 @@ public class FrameTest extends AutomatedTestBase
}
}
-}
\ No newline at end of file
+ @After
+ public void tearDown() {
+ super.tearDown();
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ // stop spark context to allow single jvm tests (otherwise the
+ // next test that tries to create a SparkContext would fail)
+ sc.stop();
+ sc = null;
+ conf = null;
+
+ // clear status mlcontext and spark exec context
+ ml.close();
+ ml = null;
+ }
+}