You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/08 02:15:47 UTC

systemml git commit: [SYSTEMML-1984] Fix robustness JMLC config mgmt, read from input streams

Repository: systemml
Updated Branches:
  refs/heads/master bc781fcef -> 26fe72f22


[SYSTEMML-1984] Fix robustness JMLC config mgmt, read from input streams

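This patch fixes special cases of JMLC configuration management,
specifically, the case where a connection is created in one thread but
used in other threads to read data from input streams. Because the
thread-local configurations were lost in those other threads, these API
calls failed when trying to read via a multi-threaded reader. We now
ensure that reads have the proper thread-local configurations, and we
avoid unnecessary exceptions by falling back to the sequential readers
where the parallel text readers do not support reading from input
streams. In addition, the frame reader factory now decides whether to
use the parallel text-cell and CSV frame readers based on the
PARALLEL_CP_READ_TEXTFORMATS flag instead of the
PARALLEL_CP_READ_BINARYFORMATS flag.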


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/26fe72f2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/26fe72f2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/26fe72f2

Branch: refs/heads/master
Commit: 26fe72f2283c2b3e6a7ed1f59d96c73f42e0fbef
Parents: bc781fc
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Nov 7 18:16:53 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Nov 7 18:17:03 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/api/jmlc/Connection.java   | 31 ++++++++++++--------
 .../sysml/runtime/io/FrameReaderFactory.java    |  6 ++--
 .../sysml/runtime/io/ReaderTextCSVParallel.java |  4 ++-
 .../runtime/io/ReaderTextCellParallel.java      |  4 ++-
 4 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index 2568977..705d784 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -131,9 +131,7 @@ public class Connection implements Closeable
 		//create default configuration
 		_dmlconf = new DMLConfig();
 		
-		//set thread-local configurations for compilation and read
-		ConfigurationManager.setLocalConfig(_dmlconf);
-		ConfigurationManager.setLocalConfig(_cconf);
+		setLocalConfigs();
 	}
 	
 	/**
@@ -149,10 +147,7 @@ public class Connection implements Closeable
 		//set optional compiler configurations in current config
 		for( ConfigType configType : configs )
 			_cconf.set(configType, true);
-		
-		//set thread-local configurations for compilation and read
-		ConfigurationManager.setLocalConfig(_dmlconf);
-		ConfigurationManager.setLocalConfig(_cconf);
+		setLocalConfigs();
 	}
 	
 	/**
@@ -214,13 +209,11 @@ public class Connection implements Closeable
 		if( invalidVars.length > 0 )
 			throw new LanguageException("Invalid variable names: "+Arrays.toString(invalidVars));
 		
+		setLocalConfigs();
+		
 		//simplified compilation chain
 		Program rtprog = null;
 		try {
-			//set thread-local configurations for compilation
-			ConfigurationManager.setLocalConfig(_dmlconf);
-			ConfigurationManager.setLocalConfig(_cconf);
-			
 			//parsing
 			ParserWrapper parser = ParserFactory.createParser(parsePyDML ? ScriptType.PYDML : ScriptType.DML);
 			DMLProgram prog = parser.parse(null, script, args);
@@ -368,6 +361,8 @@ public class Connection implements Closeable
 	public double[][] readDoubleMatrix(String fname, InputInfo iinfo, long rows, long cols, int brlen, int bclen, long nnz) 
 		throws IOException
 	{
+		setLocalConfigs();
+		
 		try {
 			MatrixReader reader = MatrixReaderFactory.createMatrixReader(iinfo);
 			MatrixBlock mb = reader.readMatrixFromHDFS(fname, rows, cols, brlen, bclen, nnz);
@@ -534,6 +529,8 @@ public class Connection implements Closeable
 			throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
 		}
 		
+		setLocalConfigs();
+		
 		try {
 			//read input matrix
 			InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
@@ -574,7 +571,7 @@ public class Connection implements Closeable
 			long rows = jmtd.getLong(DataExpression.READROWPARAM);
 			long cols = jmtd.getLong(DataExpression.READCOLPARAM);
 			String format = jmtd.getString(DataExpression.FORMAT_TYPE);
-			InputInfo iinfo = InputInfo.stringExternalToInputInfo(format);			
+			InputInfo iinfo = InputInfo.stringExternalToInputInfo(format);
 		
 			//read frame file
 			return readStringFrame(fname, iinfo, rows, cols);
@@ -598,6 +595,8 @@ public class Connection implements Closeable
 	public String[][] readStringFrame(String fname, InputInfo iinfo, long rows, long cols) 
 		throws IOException
 	{
+		setLocalConfigs();
+		
 		try {
 			FrameReader reader = FrameReaderFactory.createFrameReader(iinfo);
 			FrameBlock mb = reader.readFrameFromHDFS(fname, rows, cols);
@@ -764,6 +763,8 @@ public class Connection implements Closeable
 			throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
 		}
 		
+		setLocalConfigs();
+		
 		try {
 			//read input frame
 			InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
@@ -863,4 +864,10 @@ public class Connection implements Closeable
 	public FrameBlock readTransformMetaDataFromPath(String spec, String metapath, String colDelim) throws IOException {
 		return TfMetaUtils.readTransformMetaDataFromPath(spec, metapath, colDelim);
 	}
+	
+	private void setLocalConfigs() {
+		//set thread-local configurations for compilation and read
+		ConfigurationManager.setLocalConfig(_dmlconf);
+		ConfigurationManager.setLocalConfig(_cconf);
+	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
index 6300b32..efb4014 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
@@ -44,7 +44,7 @@ public class FrameReaderFactory
 		FrameReader reader = null;
 
 		if( iinfo == InputInfo.TextCellInputInfo ) {
-			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) )
+			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) )
 				reader = new FrameReaderTextCellParallel();
 			else	
 				reader = new FrameReaderTextCell();
@@ -52,7 +52,7 @@ public class FrameReaderFactory
 		else if( iinfo == InputInfo.CSVInputInfo ) {
 			if( props!=null && !(props instanceof CSVFileFormatProperties) )
 				throw new DMLRuntimeException("Wrong type of file format properties for CSV writer.");
-			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) )
+			if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) )
 				reader = new FrameReaderTextCSVParallel( (CSVFileFormatProperties)props );
 			else
 				reader = new FrameReaderTextCSV( (CSVFileFormatProperties)props );
@@ -65,7 +65,7 @@ public class FrameReaderFactory
 		}
 		else {
 			throw new DMLRuntimeException("Failed to create frame reader for unknown input info: "
-		                                   + InputInfo.inputInfoToString(iinfo));
+				+ InputInfo.inputInfoToString(iinfo));
 		}
 		
 		return reader;

http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index eaf5d7f..f145d0a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -116,7 +116,9 @@ public class ReaderTextCSVParallel extends MatrixReader
 	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
 		throws IOException, DMLRuntimeException 
 	{
-		throw new DMLRuntimeException("Not implemented yet.");
+		//not implemented yet, fallback to sequential reader
+		return new ReaderTextCSV(_props)
+			.readMatrixFromInputStream(is, rlen, clen, brlen, bclen, estnnz);
 	}
 	
 	private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job, 

http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index b2a9608..2afbd7e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -107,7 +107,9 @@ public class ReaderTextCellParallel extends MatrixReader
 	public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
 		throws IOException, DMLRuntimeException 
 	{
-		throw new DMLRuntimeException("Not implemented yet.");
+		//not implemented yet, fallback to sequential reader
+		return new ReaderTextCell(_isMMFile ? InputInfo.MatrixMarketInputInfo : InputInfo.TextCellInputInfo)
+			.readMatrixFromInputStream(is, rlen, clen, brlen, bclen, estnnz);
 	}
 	
 	private void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket )