You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/08 02:15:47 UTC
systemml git commit: [SYSTEMML-1984] Fix robustness JMLC config mgmt,
read from input streams
Repository: systemml
Updated Branches:
refs/heads/master bc781fcef -> 26fe72f22
[SYSTEMML-1984] Fix robustness JMLC config mgmt, read from input streams
This patch fixes a special case of JMLC configuration management: a
connection is created in one thread but then used in other threads to
read data from input streams. Because the thread-local configurations
were lost in those other threads, the API call failed when trying to
read via a multi-threaded reader. We now ensure that reads have the
proper thread-local configurations, and we also avoid unnecessary
exceptions by falling back to sequential readers if necessary.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/26fe72f2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/26fe72f2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/26fe72f2
Branch: refs/heads/master
Commit: 26fe72f2283c2b3e6a7ed1f59d96c73f42e0fbef
Parents: bc781fc
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Nov 7 18:16:53 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Nov 7 18:17:03 2017 -0800
----------------------------------------------------------------------
.../org/apache/sysml/api/jmlc/Connection.java | 31 ++++++++++++--------
.../sysml/runtime/io/FrameReaderFactory.java | 6 ++--
.../sysml/runtime/io/ReaderTextCSVParallel.java | 4 ++-
.../runtime/io/ReaderTextCellParallel.java | 4 ++-
4 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index 2568977..705d784 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -131,9 +131,7 @@ public class Connection implements Closeable
//create default configuration
_dmlconf = new DMLConfig();
- //set thread-local configurations for compilation and read
- ConfigurationManager.setLocalConfig(_dmlconf);
- ConfigurationManager.setLocalConfig(_cconf);
+ setLocalConfigs();
}
/**
@@ -149,10 +147,7 @@ public class Connection implements Closeable
//set optional compiler configurations in current config
for( ConfigType configType : configs )
_cconf.set(configType, true);
-
- //set thread-local configurations for compilation and read
- ConfigurationManager.setLocalConfig(_dmlconf);
- ConfigurationManager.setLocalConfig(_cconf);
+ setLocalConfigs();
}
/**
@@ -214,13 +209,11 @@ public class Connection implements Closeable
if( invalidVars.length > 0 )
throw new LanguageException("Invalid variable names: "+Arrays.toString(invalidVars));
+ setLocalConfigs();
+
//simplified compilation chain
Program rtprog = null;
try {
- //set thread-local configurations for compilation
- ConfigurationManager.setLocalConfig(_dmlconf);
- ConfigurationManager.setLocalConfig(_cconf);
-
//parsing
ParserWrapper parser = ParserFactory.createParser(parsePyDML ? ScriptType.PYDML : ScriptType.DML);
DMLProgram prog = parser.parse(null, script, args);
@@ -368,6 +361,8 @@ public class Connection implements Closeable
public double[][] readDoubleMatrix(String fname, InputInfo iinfo, long rows, long cols, int brlen, int bclen, long nnz)
throws IOException
{
+ setLocalConfigs();
+
try {
MatrixReader reader = MatrixReaderFactory.createMatrixReader(iinfo);
MatrixBlock mb = reader.readMatrixFromHDFS(fname, rows, cols, brlen, bclen, nnz);
@@ -534,6 +529,8 @@ public class Connection implements Closeable
throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
}
+ setLocalConfigs();
+
try {
//read input matrix
InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ?
@@ -574,7 +571,7 @@ public class Connection implements Closeable
long rows = jmtd.getLong(DataExpression.READROWPARAM);
long cols = jmtd.getLong(DataExpression.READCOLPARAM);
String format = jmtd.getString(DataExpression.FORMAT_TYPE);
- InputInfo iinfo = InputInfo.stringExternalToInputInfo(format);
+ InputInfo iinfo = InputInfo.stringExternalToInputInfo(format);
//read frame file
return readStringFrame(fname, iinfo, rows, cols);
@@ -598,6 +595,8 @@ public class Connection implements Closeable
public String[][] readStringFrame(String fname, InputInfo iinfo, long rows, long cols)
throws IOException
{
+ setLocalConfigs();
+
try {
FrameReader reader = FrameReaderFactory.createFrameReader(iinfo);
FrameBlock mb = reader.readFrameFromHDFS(fname, rows, cols);
@@ -764,6 +763,8 @@ public class Connection implements Closeable
throw new IOException("Invalid input format (expected: csv, text or mm): "+format);
}
+ setLocalConfigs();
+
try {
//read input frame
InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ?
@@ -863,4 +864,10 @@ public class Connection implements Closeable
public FrameBlock readTransformMetaDataFromPath(String spec, String metapath, String colDelim) throws IOException {
return TfMetaUtils.readTransformMetaDataFromPath(spec, metapath, colDelim);
}
+
+ private void setLocalConfigs() {
+ //set thread-local configurations for compilation and read
+ ConfigurationManager.setLocalConfig(_dmlconf);
+ ConfigurationManager.setLocalConfig(_cconf);
+ }
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
index 6300b32..efb4014 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
@@ -44,7 +44,7 @@ public class FrameReaderFactory
FrameReader reader = null;
if( iinfo == InputInfo.TextCellInputInfo ) {
- if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) )
+ if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) )
reader = new FrameReaderTextCellParallel();
else
reader = new FrameReaderTextCell();
@@ -52,7 +52,7 @@ public class FrameReaderFactory
else if( iinfo == InputInfo.CSVInputInfo ) {
if( props!=null && !(props instanceof CSVFileFormatProperties) )
throw new DMLRuntimeException("Wrong type of file format properties for CSV writer.");
- if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) )
+ if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) )
reader = new FrameReaderTextCSVParallel( (CSVFileFormatProperties)props );
else
reader = new FrameReaderTextCSV( (CSVFileFormatProperties)props );
@@ -65,7 +65,7 @@ public class FrameReaderFactory
}
else {
throw new DMLRuntimeException("Failed to create frame reader for unknown input info: "
- + InputInfo.inputInfoToString(iinfo));
+ + InputInfo.inputInfoToString(iinfo));
}
return reader;
http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index eaf5d7f..f145d0a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -116,7 +116,9 @@ public class ReaderTextCSVParallel extends MatrixReader
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
throws IOException, DMLRuntimeException
{
- throw new DMLRuntimeException("Not implemented yet.");
+ //not implemented yet, fallback to sequential reader
+ return new ReaderTextCSV(_props)
+ .readMatrixFromInputStream(is, rlen, clen, brlen, bclen, estnnz);
}
private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job,
http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index b2a9608..2afbd7e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -107,7 +107,9 @@ public class ReaderTextCellParallel extends MatrixReader
public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz)
throws IOException, DMLRuntimeException
{
- throw new DMLRuntimeException("Not implemented yet.");
+ //not implemented yet, fallback to sequential reader
+ return new ReaderTextCell(_isMMFile ? InputInfo.MatrixMarketInputInfo : InputInfo.TextCellInputInfo)
+ .readMatrixFromInputStream(is, rlen, clen, brlen, bclen, estnnz);
}
private void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket )