You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/08/29 15:42:03 UTC

incubator-systemml git commit: [SYSTEMML-568] Fix various issues w/ mlcontext frame support

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 02a9f2770 -> 1be911cc5


[SYSTEMML-568] Fix various issues w/ mlcontext frame support

The commit 02a9f277000bd144c729311dac6c04bcb520180f introduces a couple
of major and minor issues. This patch resolves the following:

(1) Removed spark references from general-purpose util functions. These
references would create classnotfound exceptions in non-spark
environments.

(2) Missing delimiter handling (other than comma) in custom csv frame
conversions. 

(3) Logging instead of prints to stdout which are inconsistent with the
rest of the system and would make debugging much harder.

(4) Removed redundant code for handling of scratch space and blocksize
configuration from mlcontext utils.

(5) Removed unnecessary rdd operations from mlcontext result handling.

(6) Fix handling of csv properties in mlcontext csv frame conversion
(which only used hard-coded defaults so far) as well as handling of
quoted strings.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1be911cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1be911cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1be911cc

Branch: refs/heads/master
Commit: 1be911cc52b4217308faa074d2b0ac4d9b86c540
Parents: 02a9f27
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Aug 29 17:41:25 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Mon Aug 29 17:41:25 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    | 21 ++---
 .../java/org/apache/sysml/api/MLOutput.java     | 16 +---
 .../sysml/api/mlcontext/MLContextUtil.java      |  8 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 73 ++++++++++++++++-
 .../sysml/runtime/util/UtilFunctions.java       | 84 --------------------
 .../functions/frame/FrameConverterTest.java     |  4 +-
 .../functions/mlcontext/FrameTest.java          |  8 +-
 7 files changed, 96 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index 8f6e95f..297d3a5 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -524,16 +524,16 @@ public class MLContext {
 		
 		MatrixObject mo;
 		if( format.equals("csv") ) {
-			//TODO replace default block size
-			MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, nnz);
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz);
 			mo = new MatrixObject(ValueType.DOUBLE, null, new MatrixFormatMetaData(mc, OutputInfo.CSVOutputInfo, InputInfo.CSVInputInfo));
 		}
 		else if( format.equals("text") ) {
 			if(rlen == -1 || clen == -1) {
 				throw new DMLRuntimeException("The metadata is required in registerInput for format:" + format);
 			}
-			//TODO replace default block size
-			MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, nnz);
+			int blksz = ConfigurationManager.getBlocksize();
+			MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz);
 			mo = new MatrixObject(ValueType.DOUBLE, null, new MatrixFormatMetaData(mc, OutputInfo.TextCellOutputInfo, InputInfo.TextCellInputInfo));
 		}
 		else if( format.equals("mm") ) {
@@ -583,21 +583,24 @@ public class MLContext {
 		if(_inVarnames == null)
 			_inVarnames = new ArrayList<String>();
 		
+		//FIXME: MB why does the register input for frames implicitly convert the data to binary block,
+		//while the register input for matrices does not? FIXME
+		
 		JavaPairRDD<LongWritable, Text> rddText = rddIn.mapToPair(new ConvertStringToLongTextPair());
 		
-		MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, nnz);
+		int blksz = ConfigurationManager.getBlocksize();
+		MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz);
 		FrameObject fo = new FrameObject(null, new MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo));
 		JavaPairRDD<Long, FrameBlock> rdd = null; 
 		if( format.equals("csv") ) {
-			//TODO replace default block size
-			
-			rdd = FrameRDDConverterUtils.csvToBinaryBlock(new JavaSparkContext(getSparkContext()), rddText, mc, false, ",", false, -1);
+			CSVFileFormatProperties csvprops = (props!=null) ? (CSVFileFormatProperties)props: new CSVFileFormatProperties();
+			rdd = FrameRDDConverterUtils.csvToBinaryBlock(new JavaSparkContext(getSparkContext()), 
+					rddText, mc, csvprops.hasHeader(), csvprops.getDelim(), csvprops.isFill(), csvprops.getFillValue());
 		}
 		else if( format.equals("text") ) {
 			if(rlen == -1 || clen == -1) {
 				throw new DMLRuntimeException("The metadata is required in registerInput for format:" + format);
 			}
-			//TODO replace default block size
 			rdd = FrameRDDConverterUtils.textCellToBinaryBlock(new JavaSparkContext(getSparkContext()), rddText, mc, schema);
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index 916a652..a18ab60 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -79,25 +79,17 @@ public class MLOutput {
 	@SuppressWarnings("unchecked")
 	public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockedRDD(String varName) throws DMLRuntimeException {
 		if(_outputs.containsKey(varName)) {
-			JavaPairRDD<?,?> tmp = _outputs.get(varName);
-			if (tmp.first()._2() instanceof MatrixBlock)
-				return (JavaPairRDD<MatrixIndexes,MatrixBlock>)tmp;
-			else
-				return null;
+			return (JavaPairRDD<MatrixIndexes,MatrixBlock>) _outputs.get(varName);
 		}
-		throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
+		throw new DMLRuntimeException("Variable " + varName + " not found in the outputs.");
 	}
 	
 	@SuppressWarnings("unchecked")
 	public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockedRDD(String varName) throws DMLRuntimeException {
 		if(_outputs.containsKey(varName)) {
-			JavaPairRDD<?,?> tmp = _outputs.get(varName);
-			if (tmp.first()._2() instanceof FrameBlock)
-				return (JavaPairRDD<Long,FrameBlock>)tmp;
-			else
-				return null;
+			return (JavaPairRDD<Long,FrameBlock>)_outputs.get(varName);
 		}
-		throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
+		throw new DMLRuntimeException("Variable " + varName + " not found in the outputs.");
 	}
 	
 	public MatrixCharacteristics getMatrixCharacteristics(String varName) throws DMLRuntimeException {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index 120df32..6da6468 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -515,9 +515,7 @@ public final class MLContextUtil {
 	 * @return the default matrix block size
 	 */
 	public static int defaultBlockSize() {
-		DMLConfig conf = ConfigurationManager.getDMLConfig();
-		int blockSize = conf.getIntValue(DMLConfig.DEFAULT_BLOCK_SIZE);
-		return blockSize;
+		return ConfigurationManager.getBlocksize();
 	}
 
 	/**
@@ -526,9 +524,7 @@ public final class MLContextUtil {
 	 * @return the lcoation of the scratch space directory
 	 */
 	public static String scratchSpace() {
-		DMLConfig conf = ConfigurationManager.getDMLConfig();
-		String scratchSpace = conf.getTextValue(DMLConfig.SCRATCH_SPACE);
-		return scratchSpace;
+		return ConfigurationManager.getScratchSpace(); 
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3bd163c..fa8c48f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -27,6 +27,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -69,6 +71,8 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class FrameRDDConverterUtils 
 {
+	private static final Log LOG = LogFactory.getLog(FrameRDDConverterUtils.class.getName());
+	
 	//=====================================
 	// CSV <--> Binary block
 
@@ -363,12 +367,79 @@ public class FrameRDDConverterUtils
 		JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction());
 				
 		SQLContext sqlContext = new SQLContext(sc);
-		StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
+		StructType dfSchema = convertFrameSchemaToDFSchema(schema);
 		DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
 	
 		return df;
 	}
 	
+	
+	/*
+	 * This function will convert Frame schema into DataFrame schema 
+	 * 
+	 *  @param	schema
+	 *  		Frame schema in the form of List<ValueType>
+	 *  @return
+	 *  		Returns the DataFrame schema (StructType)
+	 */
+	public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema)
+	{
+		// Generate the schema based on the string of schema
+		List<StructField> fields = new ArrayList<StructField>();
+		
+		int i = 1;
+		for (ValueType schema : lschema) {
+			org.apache.spark.sql.types.DataType dataType = DataTypes.StringType;
+			switch(schema) {
+				case STRING:  dataType = DataTypes.StringType; break;
+				case DOUBLE:  dataType = DataTypes.DoubleType; break;
+				case INT:     dataType = DataTypes.LongType; break;
+				case BOOLEAN: dataType = DataTypes.BooleanType; break;
+				default:
+					LOG.warn("Using default type String for " + schema.toString());
+			}
+			fields.add(DataTypes.createStructField("C"+i++, dataType, true));
+		}
+		
+		return DataTypes.createStructType(fields);
+	}
+	
+	/* 
+	 * It will return JavaRDD<Row> based on csv data input file.
+	 */
+	public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema)
+	{
+		// Load a text file and convert each line to a java rdd.
+		JavaRDD<String> dataRdd = sc.textFile(fnameIn);
+		return dataRdd.map(new RowGenerator(schema, delim));
+	}
+	
+	/* 
+	 * Row Generator class based on individual line in CSV file.
+	 */
+	private static class RowGenerator implements Function<String,Row> 
+	{
+		private static final long serialVersionUID = -6736256507697511070L;
+
+		private List<ValueType> _schema = null;
+		private String _delim = null; 
+		
+		public RowGenerator(List<ValueType> schema, String delim) {
+			_schema = schema;
+			_delim = delim;
+		}		
+		 
+		@Override
+		public Row call(String record) throws Exception {
+		      String[] fields = IOUtilFunctions.splitCSV(record, _delim);
+		      Object[] objects = new Object[fields.length]; 
+		      for (int i=0; i<fields.length; i++) {
+			      objects[i] = UtilFunctions.stringToObject(_schema.get(i), fields[i]);
+		      }
+		      return RowFactory.create(objects);
+		}
+	}
+	
 
 	/////////////////////////////////
 	// CSV-SPECIFIC FUNCTIONS

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 4b98f88..fa17fcd 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,14 +23,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -623,80 +615,4 @@ public class UtilFunctions
 	
 		return in1.getDataType();
 	}
-	
-	/*
-	 * This function will convert Frame schema into DataFrame schema 
-	 * 
-	 *  @param	schema
-	 *  		Frame schema in the form of List<ValueType>
-	 *  @return
-	 *  		Returns the DataFrame schema (StructType)
-	 */
-	public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema)
-	{
-		// Generate the schema based on the string of schema
-		List<StructField> fields = new ArrayList<StructField>();
-		
-		int i = 1;
-		for (ValueType schema : lschema)
-		{
-			org.apache.spark.sql.types.DataType dataType = DataTypes.StringType;
-			switch(schema)
-			{
-				case STRING:
-					dataType = DataTypes.StringType;
-					break;
-				case DOUBLE:
-					dataType = DataTypes.DoubleType;
-					break;
-				case INT:
-					dataType = DataTypes.LongType;
-					break;
-				case BOOLEAN:
-					dataType = DataTypes.BooleanType;
-					break;
-				default:
-					System.out.println("Default schema type is String.");
-			}
-			fields.add(DataTypes.createStructField("C"+i++, dataType, true));
-		}
-		
-		return DataTypes.createStructType(fields);
-	}
-	
-	/* 
-	 * It will return JavaRDD<Row> based on csv data input file.
-	 */
-	public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String separator, List<ValueType> schema)
-	{
-		// Load a text file and convert each line to a java rdd.
-		JavaRDD<String> dataRdd = sc.textFile(fnameIn);
-		return dataRdd.map(new RowGenerator(schema));
-	}
-	
-	/* 
-	 * Row Generator class based on individual line in CSV file.
-	 */
-	private static class RowGenerator implements Function<String,Row> 
-	{
-		private static final long serialVersionUID = -6736256507697511070L;
-
-		List<ValueType> _schema = null;
-		 
-		public RowGenerator(List<ValueType> schema)
-		{
-			_schema = schema;
-		}		
-		 
-		@Override
-		public Row call(String record) throws Exception {
-		      String[] fields = record.split(",");
-		      Object[] objects = new Object[fields.length]; 
-		      for (int i=0; i<fields.length; i++) {
-			      objects[i] = UtilFunctions.stringToObject(_schema.get(i), fields[i]);
-		      }
-		      return RowFactory.create(objects);
-		}
-	}
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index fb076bd..107dee3 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -519,8 +519,8 @@ public class FrameConverterTest extends AutomatedTestBase
 
 				//Create DataFrame 
 				SQLContext sqlContext = new SQLContext(sc);
-				StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
-				JavaRDD<Row> rowRDD = UtilFunctions.getRowRDD(sc, fnameIn, separator, schema);
+				StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema);
+				JavaRDD<Row> rowRDD = FrameRDDConverterUtils.getRowRDD(sc, fnameIn, separator, schema);
 				DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
 				
 				JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index b6184cf..b3324df 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -224,13 +224,13 @@ public class FrameTest extends AutomatedTestBase
 		{
 			//Create DataFrame for input A 
 			SQLContext sqlContext = new SQLContext(sc);
-			StructType dfSchemaA = UtilFunctions.convertFrameSchemaToDFSchema(lschema);
-			JavaRDD<Row> rowRDDA = UtilFunctions.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
+			StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema);
+			JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
 			dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
 			
 			//Create DataFrame for input B 
-			StructType dfSchemaB = UtilFunctions.convertFrameSchemaToDFSchema(lschemaB);
-			JavaRDD<Row> rowRDDB = UtilFunctions.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
+			StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
+			JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
 			dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
 		}