You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/05/26 06:52:15 UTC
[4/4] incubator-systemml git commit: [SYSTEMML-1303] Remove
deprecated old MLContext API
[SYSTEMML-1303] Remove deprecated old MLContext API
Remove deprecated old MLContext API, scheduled to be removed in version 1.0.0.
Closes #511.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/7ba17c7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/7ba17c7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/7ba17c7f
Branch: refs/heads/master
Commit: 7ba17c7f6604171b6c569f33a3823cf660d536d9
Parents: 0a89676
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Thu May 25 23:45:41 2017 -0700
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Thu May 25 23:45:41 2017 -0700
----------------------------------------------------------------------
src/main/java/org/apache/sysml/api/MLBlock.java | 280 ---
.../java/org/apache/sysml/api/MLContext.java | 1608 ------------------
.../org/apache/sysml/api/MLContextProxy.java | 50 +-
.../java/org/apache/sysml/api/MLMatrix.java | 428 -----
.../java/org/apache/sysml/api/MLOutput.java | 267 ---
.../org/apache/sysml/api/python/SystemML.py | 232 ---
.../context/SparkExecutionContext.java | 694 ++++----
.../spark/functions/GetMLBlock.java | 43 -
.../spark/utils/RDDConverterUtilsExt.java | 166 +-
.../test/integration/AutomatedTestBase.java | 433 +++--
10 files changed, 622 insertions(+), 3579 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/api/MLBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLBlock.java b/src/main/java/org/apache/sysml/api/MLBlock.java
deleted file mode 100644
index 69dc5fc..0000000
--- a/src/main/java/org/apache/sysml/api/MLBlock.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sysml.api;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.collection.mutable.Buffer;
-
-/**
- * @deprecated This will be removed in SystemML 1.0. Please migrate to {@link org.apache.sysml.api.mlcontext.MLContext}
- */
-@Deprecated
-public class MLBlock implements Row {
-
-    private static final long serialVersionUID = -770986277854643424L;
-
-    /** Exposed as field 0 of this row. */
-    public MatrixIndexes indexes;
-    /** Exposed as field 1 of this row. */
-    public MatrixBlock block;
-
-    /**
-     * Wraps an (indexes, block) pair as a two-field Spark {@link Row}.
-     *
-     * @param indexes value returned for field 0
-     * @param block value returned for field 1
-     */
-    public MLBlock(MatrixIndexes indexes, MatrixBlock block) {
-        this.indexes = indexes;
-        this.block = block;
-    }
-
-    /** Shared lookup for {@link #apply(int)} and {@link #get(int)}. */
-    private Object fieldAt(int position) {
-        if(position == 0) {
-            return indexes;
-        }
-        else if(position == 1) {
-            return block;
-        }
-        // TODO: For now not supporting any operations
-        return 0;
-    }
-
-    /** Collects both fields, in row order, into a new mutable Java list. */
-    private ArrayList<Object> fieldsAsJavaList() {
-        ArrayList<Object> fields = new ArrayList<Object>(2);
-        fields.add(indexes);
-        fields.add(block);
-        return fields;
-    }
-
-    /** Converts both fields, in row order, into a Scala sequence. */
-    @SuppressWarnings("rawtypes")
-    private Seq fieldsAsScalaSeq() {
-        Buffer scBuf = JavaConversions.asScalaBuffer(fieldsAsJavaList());
-        return scBuf.toSeq();
-    }
-
-    @Override
-    public boolean anyNull() {
-        // TODO
-        return false;
-    }
-
-    /** Returns {@code indexes} for 0, {@code block} for 1, and 0 for any other position. */
-    @Override
-    public Object apply(int arg0) {
-        return fieldAt(arg0);
-    }
-
-    /** Returns a new MLBlock built from new MatrixIndexes/MatrixBlock instances constructed from the current ones. */
-    @Override
-    public Row copy() {
-        return new MLBlock(new MatrixIndexes(indexes), new MatrixBlock(block));
-    }
-
-    /** Returns {@code indexes} for 0, {@code block} for 1, and 0 for any other position. */
-    @Override
-    public Object get(int arg0) {
-        return fieldAt(arg0);
-    }
-
-    @Override
-    public <T> T getAs(int arg0) {
-        // TODO
-        return null;
-    }
-
-    @Override
-    public <T> T getAs(String arg0) {
-        // TODO
-        return null;
-    }
-
-    @Override
-    public boolean getBoolean(int arg0) {
-        // TODO
-        return false;
-    }
-
-    @Override
-    public byte getByte(int arg0) {
-        // TODO
-        return 0;
-    }
-
-    @Override
-    public Date getDate(int arg0) {
-        // TODO
-        return null;
-    }
-
-    @Override
-    public BigDecimal getDecimal(int arg0) {
-        // TODO
-        return null;
-    }
-
-    @Override
-    public double getDouble(int arg0) {
-        // TODO
-        return 0;
-    }
-
-    @Override
-    public float getFloat(int arg0) {
-        // TODO
-        return 0;
-    }
-
-    @Override
-    public int getInt(int arg0) {
-        // TODO
-        return 0;
-    }
-
-    @Override
-    public <K, V> Map<K, V> getJavaMap(int arg0) {
-        return null;
-    }
-
-    /**
-     * Returns both fields (indexes, then block) as a Java list; the position argument is ignored.
-     * <p>
-     * The previous implementation converted the list to a Scala immutable List and cast that to
-     * java.util.List, which fails with a ClassCastException at runtime. The Java list is now
-     * returned directly.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T> List<T> getList(int arg0) {
-        ArrayList<T> retVal = new ArrayList<T>(2);
-        // Unchecked by necessity: the row holds heterogeneous values, so T is caller-chosen.
-        retVal.add((T) indexes);
-        retVal.add((T) block);
-        return retVal;
-    }
-
-    @Override
-    public long getLong(int arg0) {
-        // TODO
-        return 0;
-    }
-
-    @Override
-    public int fieldIndex(String arg0) {
-        // TODO
-        return 0;
-    }
-
-    @Override
-    public <K, V> scala.collection.Map<K, V> getMap(int arg0) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public <T> scala.collection.immutable.Map<String, T> getValuesMap(Seq<String> arg0) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /** Returns both fields (indexes, then block) as a Scala sequence; the position argument is ignored. */
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T> Seq<T> getSeq(int arg0) {
-        return fieldsAsScalaSeq();
-    }
-
-    @Override
-    public short getShort(int arg0) {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public String getString(int arg0) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /** Returns this row itself, regardless of the requested position. */
-    @Override
-    public Row getStruct(int arg0) {
-        return this;
-    }
-
-    @Override
-    public boolean isNullAt(int arg0) {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    /** Always 2: the indexes and the block. */
-    @Override
-    public int length() {
-        return 2;
-    }
-
-    @Override
-    public String mkString() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public String mkString(String arg0) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public String mkString(String arg0, String arg1, String arg2) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /** Returns the placeholder schema from {@link #getDefaultSchemaForBinaryBlock()}. */
-    @Override
-    public StructType schema() {
-        return getDefaultSchemaForBinaryBlock();
-    }
-
-    /** Always 2: the indexes and the block. */
-    @Override
-    public int size() {
-        return 2;
-    }
-
-    /** Returns both fields (indexes, then block) as a Scala sequence. */
-    @SuppressWarnings("unchecked")
-    @Override
-    public Seq<Object> toSeq() {
-        return fieldsAsScalaSeq();
-    }
-
-    /**
-     * Builds a two-field placeholder schema ("IgnoreSchema", "IgnoreSchema1"); both fields are
-     * declared nullable and are given null metadata.
-     *
-     * @return the placeholder schema
-     */
-    public static StructType getDefaultSchemaForBinaryBlock() {
-        // TODO:
-        // NOTE(review): confirm that DataType.fromJson accepts "DoubleType" and that a null
-        // metadata argument is tolerated by the Spark version in use.
-        StructField[] fields = new StructField[2];
-        fields[0] = new StructField("IgnoreSchema", DataType.fromJson("DoubleType"), true, null);
-        fields[1] = new StructField("IgnoreSchema1", DataType.fromJson("DoubleType"), true, null);
-        return new StructType(fields);
-    }
-
-    // required for Spark 1.6+
-    public Timestamp getTimestamp(int position) {
-        // position 0 = MatrixIndexes and position 1 = MatrixBlock,
-        // so return null since neither is of date type
-        return null;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
deleted file mode 100644
index b3102e9..0000000
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ /dev/null
@@ -1,1608 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.api;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Scanner;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SparkSession;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.api.jmlc.JMLCUtils;
-import org.apache.sysml.api.mlcontext.ScriptType;
-import org.apache.sysml.conf.CompilerConfig;
-import org.apache.sysml.conf.CompilerConfig.ConfigType;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.hops.OptimizerUtils.OptimizationLevel;
-import org.apache.sysml.hops.globalopt.GlobalOptimizerWrapper;
-import org.apache.sysml.hops.rewrite.ProgramRewriter;
-import org.apache.sysml.hops.rewrite.RewriteRemovePersistentReadWrite;
-import org.apache.sysml.parser.DMLProgram;
-import org.apache.sysml.parser.DMLTranslator;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.parser.Expression;
-import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.parser.IntIdentifier;
-import org.apache.sysml.parser.LanguageException;
-import org.apache.sysml.parser.ParseException;
-import org.apache.sysml.parser.ParserFactory;
-import org.apache.sysml.parser.ParserWrapper;
-import org.apache.sysml.parser.StringIdentifier;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.Program;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
-import org.apache.sysml.runtime.instructions.spark.functions.ConvertStringToLongTextPair;
-import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
-import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.utils.Explain;
-import org.apache.sysml.utils.Explain.ExplainCounts;
-import org.apache.sysml.utils.Statistics;
-
-/**
- * MLContext is useful for passing RDDs as input/output to SystemML. This API avoids the need to read/write
- * from HDFS (which is another way to pass inputs to SystemML).
- * <p>
- * Typical usage for MLContext is as follows:
- * <pre><code>
- * scala> import org.apache.sysml.api.MLContext
- * </code></pre>
- * <p>
- * Create input DataFrame from CSV file and potentially perform some feature transformation
- * <pre><code>
- * scala> val W = sparkSession.load("com.databricks.spark.csv", Map("path" -> "W.csv", "header" -> "false"))
- * scala> val H = sparkSession.load("com.databricks.spark.csv", Map("path" -> "H.csv", "header" -> "false"))
- * scala> val V = sparkSession.load("com.databricks.spark.csv", Map("path" -> "V.csv", "header" -> "false"))
- * </code></pre>
- * <p>
- * Create MLContext
- * <pre><code>
- * scala> val ml = new MLContext(sc)
- * </code></pre>
- * <p>
- * Register input and output DataFrame/RDD
- * Supported format:
- * <ol>
- * <li> DataFrame
- * <li> CSV/Text (as JavaRDD<String> or JavaPairRDD<LongWritable, Text>)
- * <li> Binary blocked RDD (JavaPairRDD<MatrixIndexes,MatrixBlock>)
- * </ol>
- * Also overloaded to support metadata information such as format, rlen, clen, ...
- * Please note the variable names given below in quotes correspond to the variables in DML script.
- * These variables need to have corresponding read/write associated in DML script.
- * Currently, only matrix variables are supported through registerInput/registerOutput interface.
- * To pass scalar variables, use named/positional arguments (described later) or wrap them into matrix variable.
- * <pre><code>
- * scala> ml.registerInput("V", V)
- * scala> ml.registerInput("W", W)
- * scala> ml.registerInput("H", H)
- * scala> ml.registerOutput("H")
- * scala> ml.registerOutput("W")
- * </code></pre>
- * <p>
- * Call script with default arguments:
- * <pre><code>
- * scala> val outputs = ml.execute("GNMF.dml")
- * </code></pre>
- * <p>
- * Also supported: calling script with positional arguments (args) and named arguments (nargs):
- * <pre><code>
- * scala> val args = Array("V.mtx", "W.mtx", "H.mtx", "2000", "1500", "50", "1", "WOut.mtx", "HOut.mtx")
- * scala> val nargs = Map("maxIter"->"1", "V" -> "")
- * scala> val outputs = ml.execute("GNMF.dml", args) # or ml.execute("GNMF_namedArgs.dml", nargs)
- * </code></pre>
- * <p>
- * To run the script again using different (or even same arguments), but using same registered input/outputs:
- * <pre><code>
- * scala> val new_outputs = ml.execute("GNMF.dml", new_args)
- * </code></pre>
- * <p>
- * However, to register new input/outputs, you need to first reset MLContext
- * <pre><code>
- * scala> ml.reset()
- * scala> ml.registerInput("V", newV)
- * </code></pre>
- * <p>
- * Experimental API:
- * To monitor performance (only supported for Spark 1.4.0 or higher),
- * <pre><code>
- * scala> val ml = new MLContext(sc, true)
- * </code></pre>
- * <p>
- * If monitoring performance is enabled,
- * <pre><code>
- * scala> print(ml.getMonitoringUtil().getExplainOutput())
- * scala> ml.getMonitoringUtil().getRuntimeInfoInHTML("runtime.html")
- * </code></pre>
- * <p>
- * Note: The execute(...) methods do not support parallel calls from the same or different MLContext instances.
- * This is because current SystemML engine does not allow multiple invocation in same JVM.
- * So, if you plan to create a system which potentially creates multiple MLContext,
- * it is recommended to guard the execute(...) call using
- * <pre><code>
- * synchronized(MLContext.class) { ml.execute(...); }
- * </code></pre>
- *
- * @deprecated This will be removed in SystemML 1.0. Please migrate to {@link org.apache.sysml.api.mlcontext.MLContext}
- */
-@Deprecated
-public class MLContext {
-
- // ----------------------------------------------------
- // TODO: To make MLContext multi-threaded, track getCurrentMLContext and also all singletons and
- // static variables in SystemML codebase.
- private static MLContext _activeMLContext = null;
-
- // Package protected so as to maintain a clean public API for MLContext.
- // Use MLContextProxy.getActiveMLContext() if necessary
- static MLContext getActiveMLContext() {
-     // Plain read of the static holder; null when no context has been stored in it.
-     return MLContext._activeMLContext;
- }
- // ----------------------------------------------------
-
- private SparkContext _sc = null; // Read while creating SystemML's spark context
- public SparkContext getSparkContext() {
-     // Hand back the stored context when there is one; otherwise fail loudly.
-     if(_sc != null) {
-         return _sc;
-     }
-     throw new RuntimeException("No spark context set in MLContext");
- }
- // Created on demand by the private registerInput(...) when it is still null.
- private ArrayList<String> _inVarnames = null;
- // NOTE(review): presumably the registered output variable names - confirm against registerOutput.
- private ArrayList<String> _outVarnames = null;
- // Created on demand by the private registerInput(...) when it is still null.
- private LocalVariableMap _variables = null; // temporary symbol table
- // NOTE(review): presumably the compiled runtime program - its use is not visible in this part of the file.
- private Program _rtprog = null;
-
- // Custom name/value settings collected through setConfig(paramName, paramVal).
- private Map<String, String> _additionalConfigs = new HashMap<String, String>();
-
- /**
- * Create an associated MLContext for given spark session.
- * @param sc SparkContext
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public MLContext(SparkContext sc) throws DMLRuntimeException {
-     // All set-up is delegated; both boolean options are passed as false.
-     this.initializeSpark(sc, false, false);
- }
-
- /**
- * Create an associated MLContext for given spark session.
- * @param sc JavaSparkContext
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public MLContext(JavaSparkContext sc) throws DMLRuntimeException {
-     // Unwrap the Java-friendly wrapper and initialize exactly like the SparkContext constructor.
-     final SparkContext underlying = sc.sc();
-     initializeSpark(underlying, false, false);
- }
-
- /**
- * Allow users to provide custom named-value configuration.
- * @param paramName parameter name
- * @param paramVal parameter value
- */
- public void setConfig(String paramName, String paramVal) {
-     // A later call with the same name replaces the earlier value (Map.put semantics).
-     this._additionalConfigs.put(paramName, paramVal);
- }
-
- // ====================================================================================
- // Register input APIs
- // 1. DataFrame
-
- /**
- * Register DataFrame as input. DataFrame is assumed to be in row format and each cell can be converted into double
- * through Double.parseDouble(cell.toString()). This is suitable for passing dense matrices. For sparse matrices,
- * consider passing through text format (using JavaRDD<String>, format="text")
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param df the DataFrame
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, Dataset<Row> df) throws DMLRuntimeException {
-     // Convenience overload: forwards with containsID = false.
-     final boolean containsID = false;
-     registerInput(varName, df, containsID);
- }
-
- /**
- * Register DataFrame as input. DataFrame is assumed to be in row format and each cell can be converted into
- * SystemML frame row. Each column could be of type, Double, Float, Long, Integer, String or Boolean.
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param df the DataFrame
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerFrameInput(String varName, Dataset<Row> df) throws DMLRuntimeException {
-     // Convenience overload: forwards with containsID = false.
-     final boolean containsID = false;
-     registerFrameInput(varName, df, containsID);
- }
-
- /**
- * Register DataFrame as input.
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param df the DataFrame
- * @param containsID true if the DataFrame has an ID column which denotes the row ID, false otherwise.
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, Dataset<Row> df, boolean containsID) throws DMLRuntimeException {
-     final int blockSize = ConfigurationManager.getBlocksize();
-     // Row and column counts start out as -1; the same characteristics object is handed to the
-     // converter and then registered (presumably the converter fills in the dimensions - confirm).
-     final MatrixCharacteristics characteristics = new MatrixCharacteristics(-1, -1, blockSize, blockSize);
-     final JavaSparkContext javaContext = new JavaSparkContext(_sc);
-     final JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlocks =
-         RDDConverterUtils.dataFrameToBinaryBlock(javaContext, df, characteristics, containsID, false);
-     registerInput(varName, binaryBlocks, characteristics);
- }
-
- /**
- * Register DataFrame as input. DataFrame is assumed to be in row format and each cell can be converted into
- * SystemML frame row. Each column could be of type, Double, Float, Long, Integer, String or Boolean.
- * <p>
- * @param varName variable name
- * @param df the DataFrame
- * @param containsID true if the DataFrame has an ID column which denotes the row ID, false otherwise.
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerFrameInput(String varName, Dataset<Row> df, boolean containsID) throws DMLRuntimeException {
-     final int blockSize = ConfigurationManager.getBlocksize();
-     // Row and column counts start out as -1 and are read back only after the conversion call.
-     final MatrixCharacteristics characteristics = new MatrixCharacteristics(-1, -1, blockSize, blockSize);
-     final JavaSparkContext javaContext = new JavaSparkContext(_sc);
-     final JavaPairRDD<Long, FrameBlock> frameBlocks =
-         FrameRDDConverterUtils.dataFrameToBinaryBlock(javaContext, df, characteristics, containsID);
-     final long rows = characteristics.getRows();
-     final long cols = characteristics.getCols();
-     registerInput(varName, frameBlocks, rows, cols, null);
- }
-
- /**
- * Experimental API. Not supported in Python MLContext API.
- * @param varName variable name
- * @param df the DataFrame
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, MLMatrix df) throws DMLRuntimeException {
- // Delegates to another registerInput overload, passing the result of MLMatrix.getRDDLazily(df)
- // together with the matrix's own mc field.
- registerInput(varName, MLMatrix.getRDDLazily(df), df.mc);
- }
-
- // ------------------------------------------------------------------------------------
- // 2. CSV/Text: Usually JavaRDD<String>, but also supports JavaPairRDD<LongWritable, Text>
- /**
- * Register CSV/Text as inputs: Method for supplying csv file format properties, but without dimensions or nnz
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the RDD
- * @param format the format
- * @param hasHeader is there a header
- * @param delim the delimiter
- * @param fill if true, fill, otherwise don't fill
- * @param fillValue the fill value
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, JavaRDD<String> rdd, String format, boolean hasHeader,
-         String delim, boolean fill, double fillValue) throws DMLRuntimeException {
-     // No dimensions or nnz supplied by the caller: pass -1 for all three.
-     final long unspecified = -1;
-     registerInput(varName, rdd, format, hasHeader, delim, fill, fillValue,
-         unspecified, unspecified, unspecified);
- }
-
- /**
- * Register CSV/Text as inputs: Method for supplying csv file format properties, but without dimensions or nnz
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the RDD
- * @param format the format
- * @param hasHeader is there a header
- * @param delim the delimiter
- * @param fill if true, fill, otherwise don't fill
- * @param fillValue the fill value
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, RDD<String> rdd, String format, boolean hasHeader,
-         String delim, boolean fill, double fillValue) throws DMLRuntimeException {
-     // Convert to the Java API type, then forward with -1 for rows, columns and nnz.
-     final JavaRDD<String> javaRdd = rdd.toJavaRDD();
-     final long unspecified = -1;
-     registerInput(varName, javaRdd, format, hasHeader, delim, fill, fillValue,
-         unspecified, unspecified, unspecified);
- }
-
- /**
- * Register CSV/Text as inputs: Method for supplying csv file format properties along with dimensions and nnz
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the RDD
- * @param format the format
- * @param hasHeader is there a header
- * @param delim the delimiter
- * @param fill if true, fill, otherwise don't fill
- * @param fillValue the fill value
- * @param rlen rows
- * @param clen columns
- * @param nnz non-zeros
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, RDD<String> rdd, String format, boolean hasHeader,
-         String delim, boolean fill, double fillValue, long rlen, long clen, long nnz) throws DMLRuntimeException {
-     // Forward the caller-supplied dimensions and nnz. This overload previously passed
-     // -1, -1, -1 and so silently discarded them, unlike the JavaRDD overload with the
-     // same parameters.
-     registerInput(varName, rdd.toJavaRDD(), format, hasHeader, delim, fill, fillValue, rlen, clen, nnz);
- }
-
- /**
- * Register CSV/Text as inputs: Method for supplying csv file format properties along with dimensions and nnz
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the JavaRDD
- * @param format the format
- * @param hasHeader is there a header
- * @param delim the delimiter
- * @param fill if true, fill, otherwise don't fill
- * @param fillValue the fill value
- * @param rlen rows
- * @param clen columns
- * @param nnz non-zeros
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, JavaRDD<String> rdd, String format, boolean hasHeader,
-         String delim, boolean fill, double fillValue, long rlen, long clen, long nnz) throws DMLRuntimeException {
-     // Bundle the CSV options, pair up the lines, and hand both to the shared private overload.
-     final CSVFileFormatProperties csvProps = new CSVFileFormatProperties(hasHeader, delim, fill, fillValue, "");
-     final JavaPairRDD<LongWritable, Text> pairedLines = rdd.mapToPair(new ConvertStringToLongTextPair());
-     registerInput(varName, pairedLines, format, rlen, clen, nnz, csvProps);
- }
-
- /**
- * Register CSV/Text as inputs: Convenience method without dimensions and nnz. It uses default file properties (example: delim, fill, ..)
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the RDD
- * @param format the format
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, RDD<String> rdd, String format) throws DMLRuntimeException {
-     // No dimensions, no nnz, no file-format properties: -1 / -1 / -1 / null.
-     final JavaPairRDD<LongWritable, Text> pairedLines =
-         rdd.toJavaRDD().mapToPair(new ConvertStringToLongTextPair());
-     registerInput(varName, pairedLines, format, -1, -1, -1, null);
- }
-
- /**
- * Register CSV/Text as inputs: Convenience method without dimensions and nnz. It uses default file properties (example: delim, fill, ..)
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the JavaRDD
- * @param format the format
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, JavaRDD<String> rdd, String format) throws DMLRuntimeException {
-     // No dimensions, no nnz, no file-format properties: -1 / -1 / -1 / null.
-     final JavaPairRDD<LongWritable, Text> pairedLines = rdd.mapToPair(new ConvertStringToLongTextPair());
-     registerInput(varName, pairedLines, format, -1, -1, -1, null);
- }
-
- /**
- * Register CSV/Text as inputs: Convenience method with dimensions but no nnz. It uses default file properties (example: delim, fill, ..)
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the JavaRDD
- * @param format the format
- * @param rlen rows
- * @param clen columns
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, JavaRDD<String> rdd, String format, long rlen, long clen) throws DMLRuntimeException {
-     // Dimensions come from the caller; nnz is passed as -1 and no file-format properties are given.
-     final JavaPairRDD<LongWritable, Text> pairedLines = rdd.mapToPair(new ConvertStringToLongTextPair());
-     registerInput(varName, pairedLines, format, rlen, clen, -1, null);
- }
-
- /**
- * Register CSV/Text as inputs: Convenience method with dimensions but no nnz. It uses default file properties (example: delim, fill, ..)
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the RDD
- * @param format the format
- * @param rlen rows
- * @param clen columns
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, RDD<String> rdd, String format, long rlen, long clen) throws DMLRuntimeException {
-     // Dimensions come from the caller; nnz is passed as -1 and no file-format properties are given.
-     final JavaPairRDD<LongWritable, Text> pairedLines =
-         rdd.toJavaRDD().mapToPair(new ConvertStringToLongTextPair());
-     registerInput(varName, pairedLines, format, rlen, clen, -1, null);
- }
-
- /**
- * Register CSV/Text as inputs: with dimensions and nnz. It uses default file properties (example: delim, fill, ..)
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the JavaRDD
- * @param format the format
- * @param rlen rows
- * @param clen columns
- * @param nnz non-zeros
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, JavaRDD<String> rdd, String format, long rlen, long clen, long nnz) throws DMLRuntimeException {
-     // Dimensions and nnz come from the caller; no file-format properties are given.
-     final JavaPairRDD<LongWritable, Text> pairedLines = rdd.mapToPair(new ConvertStringToLongTextPair());
-     registerInput(varName, pairedLines, format, rlen, clen, nnz, null);
- }
-
- /**
- * Register CSV/Text as inputs: with dimensions and nnz. It uses default file properties (example: delim, fill, ..)
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the RDD
- * @param format the format
- * @param rlen rows
- * @param clen columns
- * @param nnz non-zeros
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, RDD<String> rdd, String format, long rlen, long clen, long nnz) throws DMLRuntimeException {
-     // Dimensions and nnz come from the caller; no file-format properties are given.
-     final JavaPairRDD<LongWritable, Text> pairedLines =
-         rdd.toJavaRDD().mapToPair(new ConvertStringToLongTextPair());
-     registerInput(varName, pairedLines, format, rlen, clen, nnz, null);
- }
-
- // Common entry point of all CSV/text registerInput overloads; it takes dimensions, nnz and file format properties.
- // Supported formats are "csv" and "text" (the latter requires known dimensions); "mm" and anything else is rejected.
- private void registerInput(String varName, JavaPairRDD<LongWritable, Text> textOrCsv_rdd, String format, long rlen, long clen, long nnz, FileFormatProperties props) throws DMLRuntimeException {
-     boolean sparkRuntime = DMLScript.rtplatform == RUNTIME_PLATFORM.SPARK || DMLScript.rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK;
-     if( !sparkRuntime ) {
-         throw new DMLRuntimeException("The registerInput functionality only supported for spark runtime. Please use MLContext(sc) instead of default constructor.");
-     }
-
-     // lazily create the registration state
-     if( _variables == null ) {
-         _variables = new LocalVariableMap();
-     }
-     if( _inVarnames == null ) {
-         _inVarnames = new ArrayList<String>();
-     }
-
-     // pick the output/input info for the format; invalid requests fail before any object is created
-     OutputInfo oinfo;
-     InputInfo iinfo;
-     if( format.equals("csv") ) {
-         oinfo = OutputInfo.CSVOutputInfo;
-         iinfo = InputInfo.CSVInputInfo;
-     }
-     else if( format.equals("text") ) {
-         if( rlen == -1 || clen == -1 ) {
-             throw new DMLRuntimeException("The metadata is required in registerInput for format:" + format);
-         }
-         oinfo = OutputInfo.TextCellOutputInfo;
-         iinfo = InputInfo.TextCellInputInfo;
-     }
-     else if( format.equals("mm") ) {
-         // TODO: Handle matrix market
-         throw new DMLRuntimeException("Matrixmarket format is not yet implemented in registerInput: " + format);
-     }
-     else {
-         throw new DMLRuntimeException("Incorrect format in registerInput: " + format);
-     }
-
-     int blksz = ConfigurationManager.getBlocksize();
-     MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz);
-     MatrixObject matrix = new MatrixObject(ValueType.DOUBLE, OptimizerUtils.getUniqueTempFileName(), new MatrixFormatMetaData(mc, oinfo, iinfo));
-
-     JavaPairRDD<LongWritable, Text> copiedRdd = textOrCsv_rdd.mapToPair(new CopyTextInputFunction());
-     if( props != null ) {
-         matrix.setFileFormatProperties(props);
-     }
-     matrix.setRDDHandle(new RDDObject(copiedRdd, varName));
-
-     _variables.put(varName, matrix);
-     _inVarnames.add(varName);
-     checkIfRegisteringInputAllowed();
- }
-
- /**
-  * Registers a frame given as CSV/text lines as input, with dimensions.
-  * File properties (example: delim, fill, ..) can be specified through props; for "csv" default
-  * CSV properties are used when props is null.
-  * <p>
-  * Marks the variable in the DML script as input variable.
-  * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
-  * would have been created by reading a HDFS file.
-  * @param varName variable name
-  * @param rddIn the JavaRDD holding the lines
-  * @param format the format, either "csv" or "text" ("text" requires known dimensions)
-  * @param rlen rows
-  * @param clen columns
-  * @param props file format properties, may be null; must be CSV properties when format is "csv"
-  * @param schema list of column types; currently not used, the schema is set to "String" (see TODO below)
-  * @throws DMLRuntimeException if the runtime is not Spark, the format is unsupported, required metadata
-  *         is missing, or props is not of the type required by the format
-  */
- public void registerInput(String varName, JavaRDD<String> rddIn, String format, long rlen, long clen, FileFormatProperties props,
-         List<ValueType> schema) throws DMLRuntimeException {
-     if(!(DMLScript.rtplatform == RUNTIME_PLATFORM.SPARK || DMLScript.rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)) {
-         throw new DMLRuntimeException("The registerInput functionality only supported for spark runtime. Please use MLContext(sc) instead of default constructor.");
-     }
-
-     if(_variables == null)
-         _variables = new LocalVariableMap();
-     if(_inVarnames == null)
-         _inVarnames = new ArrayList<String>();
-
-     JavaPairRDD<LongWritable, Text> rddText = rddIn.mapToPair(new ConvertStringToLongTextPair());
-
-     int blksz = ConfigurationManager.getBlocksize();
-     long nnz = -1; // number of non-zeros is not provided by this overload
-     MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz);
-     FrameObject fo;
-     if( format.equals("csv") ) {
-         // fail with a descriptive checked exception instead of a ClassCastException
-         if( props != null && !(props instanceof CSVFileFormatProperties) ) {
-             throw new DMLRuntimeException("CSV file format properties are required in registerInput for format:" + format
-                 + ", but found: " + props.getClass().getName());
-         }
-         CSVFileFormatProperties csvprops = (props != null) ? (CSVFileFormatProperties) props : new CSVFileFormatProperties();
-         fo = new FrameObject(OptimizerUtils.getUniqueTempFileName(), new MatrixFormatMetaData(mc, OutputInfo.CSVOutputInfo, InputInfo.CSVInputInfo));
-         fo.setFileFormatProperties(csvprops);
-     }
-     else if( format.equals("text") ) {
-         if(rlen == -1 || clen == -1) {
-             throw new DMLRuntimeException("The metadata is required in registerInput for format:" + format);
-         }
-         fo = new FrameObject(OptimizerUtils.getUniqueTempFileName(), new MatrixFormatMetaData(mc, OutputInfo.TextCellOutputInfo, InputInfo.TextCellInputInfo));
-         if(props != null)
-             fo.setFileFormatProperties(props);
-     }
-     else {
-         throw new DMLRuntimeException("Incorrect format in registerInput: " + format);
-     }
-
-     fo.setRDDHandle(new RDDObject(rddText, varName));
-     fo.setSchema("String"); //TODO fix schema
-     _variables.put(varName, fo);
-     _inVarnames.add(varName);
-     checkIfRegisteringInputAllowed();
- }
-
- // Registers a binary-block frame RDD as input; nnz is set to -1 and the configured block size is used.
- private void registerInput(String varName, JavaPairRDD<Long, FrameBlock> rdd, long rlen, long clen, FileFormatProperties props) throws DMLRuntimeException {
-     boolean sparkRuntime = DMLScript.rtplatform == RUNTIME_PLATFORM.SPARK || DMLScript.rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK;
-     if( !sparkRuntime ) {
-         throw new DMLRuntimeException("The registerInput functionality only supported for spark runtime. Please use MLContext(sc) instead of default constructor.");
-     }
-
-     // lazily create the registration state
-     if( _variables == null ) {
-         _variables = new LocalVariableMap();
-     }
-     if( _inVarnames == null ) {
-         _inVarnames = new ArrayList<String>();
-     }
-
-     int blksz = ConfigurationManager.getBlocksize();
-     MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, -1);
-     MatrixFormatMetaData meta = new MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo);
-     FrameObject frame = new FrameObject(OptimizerUtils.getUniqueTempFileName(), meta);
-
-     if( props != null ) {
-         frame.setFileFormatProperties(props);
-     }
-     frame.setRDDHandle(new RDDObject(rdd, varName));
-
-     _variables.put(varName, frame);
-     _inVarnames.add(varName);
-     checkIfRegisteringInputAllowed();
- }
-
- // ------------------------------------------------------------------------------------
-
- // 3. Binary blocked RDD: Support JavaPairRDD<MatrixIndexes,MatrixBlock>
-
- /**
- * Register binary blocked RDD with given dimensions, default block sizes and no nnz
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the JavaPairRDD
- * @param rlen rows
- * @param clen columns
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, long rlen, long clen) throws DMLRuntimeException {
- //TODO replace default blocksize
- registerInput(varName, rdd, rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE);
- }
-
- /**
-  * Registers a binary blocked RDD with the given dimensions and block sizes, and no nnz
-  * (nnz is passed on as -1).
-  * <p>
-  * The variable is marked as an input variable of the DML script. The script is expected to contain a
-  * "varName = read(...)" statement which, through non-MLContext invocation, would have read a HDFS file.
-  * @param varName variable name
-  * @param rdd the JavaPairRDD of matrix indexes and matrix blocks
-  * @param rlen number of rows
-  * @param clen number of columns
-  * @param brlen number of rows per block
-  * @param bclen number of columns per block
-  * @throws DMLRuntimeException if DMLRuntimeException occurs
-  */
- public void registerInput(String varName, JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, long rlen, long clen, int brlen, int bclen) throws DMLRuntimeException {
-     final long nnz = -1;
-     registerInput(varName, rdd, rlen, clen, brlen, bclen, nnz);
- }
-
-
- /**
- * Register binary blocked RDD with given dimensions, given block sizes and given nnz (preferred).
- * <p>
- * Marks the variable in the DML script as input variable.
- * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
- * would have been created by reading a HDFS file.
- * @param varName variable name
- * @param rdd the JavaPairRDD
- * @param rlen rows
- * @param clen columns
- * @param brlen block rows
- * @param bclen block columns
- * @param nnz non-zeros
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerInput(String varName, JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, long rlen, long clen, int brlen, int bclen, long nnz) throws DMLRuntimeException {
- if(rlen == -1 || clen == -1) {
- throw new DMLRuntimeException("The metadata is required in registerInput for binary format");
- }
-
- MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, brlen, bclen, nnz);
- registerInput(varName, rdd, mc);
- }
-
- /**
-  * Registers a binary blocked RDD described by the given matrix characteristics.
-  * All binary blocked registerInput overloads end up here.
-  * @param varName variable name
-  * @param rdd the JavaPairRDD of matrix indexes and matrix blocks
-  * @param mc matrix characteristics of the input
-  * @throws DMLRuntimeException if DMLRuntimeException occurs
-  */
- public void registerInput(String varName, JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, MatrixCharacteristics mc) throws DMLRuntimeException {
-     // lazily create the registration state
-     if( _variables == null ) {
-         _variables = new LocalVariableMap();
-     }
-     if( _inVarnames == null ) {
-         _inVarnames = new ArrayList<String>();
-     }
-
-     // Register a copy: a bug in Spark is messing up blocks and indexes due to too eager reuse of data structures
-     JavaPairRDD<MatrixIndexes, MatrixBlock> blocksCopy = SparkUtils.copyBinaryBlockMatrix(rdd);
-
-     MatrixFormatMetaData meta = new MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo);
-     MatrixObject matrix = new MatrixObject(ValueType.DOUBLE, OptimizerUtils.getUniqueTempFileName(), meta);
-     matrix.setRDDHandle(new RDDObject(blocksCopy, varName));
-
-     _variables.put(varName, matrix);
-     _inVarnames.add(varName);
-     checkIfRegisteringInputAllowed();
- }
-
- /**
-  * Registers an in-memory matrix block as input. The matrix characteristics are taken from the
-  * block itself (rows, columns, non-zeros) together with the configured block size.
-  * @param varName variable name
-  * @param mb the matrix block
-  * @throws DMLRuntimeException if DMLRuntimeException occurs
-  */
- public void registerInput(String varName, MatrixBlock mb) throws DMLRuntimeException {
-     int blksz = ConfigurationManager.getBlocksize();
-     registerInput(varName, mb,
-         new MatrixCharacteristics(mb.getNumRows(), mb.getNumColumns(), blksz, blksz, mb.getNonZeros()));
- }
-
- /**
-  * Registers an in-memory matrix block as input, described by the given matrix characteristics.
-  * @param varName variable name
-  * @param mb the matrix block
-  * @param mc matrix characteristics of the input
-  * @throws DMLRuntimeException if DMLRuntimeException occurs
-  */
- public void registerInput(String varName, MatrixBlock mb, MatrixCharacteristics mc) throws DMLRuntimeException {
-     // lazily create the registration state
-     if( _variables == null ) {
-         _variables = new LocalVariableMap();
-     }
-     if( _inVarnames == null ) {
-         _inVarnames = new ArrayList<String>();
-     }
-
-     MatrixFormatMetaData meta = new MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo);
-     MatrixObject matrix = new MatrixObject(ValueType.DOUBLE, OptimizerUtils.getUniqueTempFileName(), meta);
-     // hand the block over to the matrix object
-     matrix.acquireModify(mb);
-     matrix.release();
-
-     _variables.put(varName, matrix);
-     _inVarnames.add(varName);
-     checkIfRegisteringInputAllowed();
- }
-
- // =============================================================================================
-
- /**
- * Marks the variable in the DML script as output variable.
- * Note that this expects a "write(varName, ...)" statement in the DML script which through non-MLContext invocation
- * would have written the matrix to HDFS.
- * @param varName variable name
- * @throws DMLRuntimeException if DMLRuntimeException occurs
- */
- public void registerOutput(String varName) throws DMLRuntimeException {
- if(!(DMLScript.rtplatform == RUNTIME_PLATFORM.SPARK || DMLScript.rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)) {
- throw new DMLRuntimeException("The registerOutput functionality only supported for spark runtime. Please use MLContext(sc) instead of default constructor.");
- }
- if(_outVarnames == null)
- _outVarnames = new ArrayList<String>();
- _outVarnames.add(varName);
- if(_variables == null)
- _variables = new LocalVariableMap();
- }
-
- // =============================================================================================
-
- /**
- * Execute DML script by passing named arguments using specified config file.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param namedArgs named arguments
- * @param parsePyDML true if pydml, false otherwise
- * @param configFilePath path to config file
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, Map<String, String> namedArgs, boolean parsePyDML, String configFilePath) throws IOException, DMLException, ParseException {
- String [] args = new String[namedArgs.size()];
- int i = 0;
- for(Entry<String, String> entry : namedArgs.entrySet()) {
- if(entry.getValue().trim().isEmpty())
- args[i] = entry.getKey() + "=\"" + entry.getValue() + "\"";
- else
- args[i] = entry.getKey() + "=" + entry.getValue();
- i++;
- }
- return compileAndExecuteScript(dmlScriptFilePath, args, true, parsePyDML ? ScriptType.PYDML : ScriptType.DML, configFilePath);
- }
-
- /**
-  * Executes a DML script with named arguments, using the specified config file.
-  * The script is not parsed as PyDML.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param namedArgs named arguments
-  * @param configFilePath path to config file
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, Map<String, String> namedArgs, String configFilePath) throws IOException, DMLException, ParseException {
-     final boolean parsePyDML = false;
-     return execute(dmlScriptFilePath, namedArgs, parsePyDML, configFilePath);
- }
-
- /**
-  * Executes a DML script with named arguments, using the default configuration (no config file).
-  * The script is not parsed as PyDML.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param namedArgs named arguments
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, Map<String, String> namedArgs) throws IOException, DMLException, ParseException {
-     final boolean parsePyDML = false;
-     return execute(dmlScriptFilePath, namedArgs, parsePyDML, null);
- }
-
- /**
- * Execute DML script by passing named arguments.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param namedArgs named arguments
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, scala.collection.immutable.Map<String, String> namedArgs) throws IOException, DMLException, ParseException {
- return execute(dmlScriptFilePath, new HashMap<String, String>(scala.collection.JavaConversions.mapAsJavaMap(namedArgs)));
- }
-
- /**
- * Experimental: Execute PyDML script by passing named arguments if parsePyDML=true.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param namedArgs named arguments
- * @param parsePyDML true if pydml, false otherwise
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, Map<String, String> namedArgs, boolean parsePyDML) throws IOException, DMLException, ParseException {
- return execute(dmlScriptFilePath, namedArgs, parsePyDML, null);
- }
-
- /**
- * Experimental: Execute PyDML script by passing named arguments if parsePyDML=true.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param namedArgs named arguments
- * @param parsePyDML true if pydml, false otherwise
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, scala.collection.immutable.Map<String, String> namedArgs, boolean parsePyDML) throws IOException, DMLException, ParseException {
- return execute(dmlScriptFilePath, new HashMap<String, String>(scala.collection.JavaConversions.mapAsJavaMap(namedArgs)), parsePyDML);
- }
-
- /**
-  * Executes a DML script with positional arguments, using the specified config file.
-  * The script is not parsed as PyDML.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param args arguments
-  * @param configFilePath path to config file
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, String [] args, String configFilePath) throws IOException, DMLException, ParseException {
-     final boolean parsePyDML = false;
-     return execute(dmlScriptFilePath, args, parsePyDML, configFilePath);
- }
-
- /**
-  * Executes a DML script with positional arguments given as a list, using the specified config file.
-  * This method is implemented for compatibility with Python MLContext.
-  * Java/Scala users should use 'MLOutput execute(String dmlScriptFilePath, String [] args, String configFilePath)' instead as
-  * equivalent scala collections (Seq/ArrayBuffer) is not implemented.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param args arguments
-  * @param configFilePath path to config file
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, ArrayList<String> args, String configFilePath) throws IOException, DMLException, ParseException {
-     // copy the list into an exactly sized array
-     String[] argsArr = args.toArray(new String[args.size()]);
-     return execute(dmlScriptFilePath, argsArr, false, configFilePath);
- }
-
- /**
-  * Executes a DML script with positional arguments, using the default configuration (no config file).
-  * The script is not parsed as PyDML.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param args arguments
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, String [] args) throws IOException, DMLException, ParseException {
-     final boolean parsePyDML = false;
-     return execute(dmlScriptFilePath, args, parsePyDML, null);
- }
-
- /**
-  * Executes a DML script with positional arguments given as a list, using the default configuration.
-  * This method is implemented for compatibility with Python MLContext.
-  * Java/Scala users should use 'MLOutput execute(String dmlScriptFilePath, String [] args)' instead as
-  * equivalent scala collections (Seq/ArrayBuffer) is not implemented.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param args arguments
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, ArrayList<String> args) throws IOException, DMLException, ParseException {
-     // copy the list into an exactly sized array
-     String[] argsArr = args.toArray(new String[args.size()]);
-     return execute(dmlScriptFilePath, argsArr, false, null);
- }
-
- /**
-  * Experimental: executes a script with positional arguments given as a list, parsed as PyDML if
-  * parsePyDML=true, using the default configuration.
-  * This method is implemented for compatibility with Python MLContext.
-  * Java/Scala users should use 'MLOutput execute(String dmlScriptFilePath, String [] args, boolean parsePyDML)' instead as
-  * equivalent scala collections (Seq/ArrayBuffer) is not implemented.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param args arguments
-  * @param parsePyDML true if pydml, false otherwise
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, ArrayList<String> args, boolean parsePyDML) throws IOException, DMLException, ParseException {
-     // copy the list into an exactly sized array
-     String[] argsArr = args.toArray(new String[args.size()]);
-     return execute(dmlScriptFilePath, argsArr, parsePyDML, null);
- }
-
- /**
-  * Experimental: executes a script with positional arguments given as a list, parsed as PyDML if
-  * parsePyDML=true, using the specified config file.
-  * This method is implemented for compatibility with Python MLContext.
-  * Java/Scala users should use 'MLOutput execute(String dmlScriptFilePath, String [] args, boolean parsePyDML, String configFilePath)' instead as
-  * equivalent scala collections (Seq/ArrayBuffer) is not implemented.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param args arguments
-  * @param parsePyDML true if pydml, false otherwise
-  * @param configFilePath path to config file
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, ArrayList<String> args, boolean parsePyDML, String configFilePath) throws IOException, DMLException, ParseException {
-     // copy the list into an exactly sized array
-     String[] argsArr = args.toArray(new String[args.size()]);
-     return execute(dmlScriptFilePath, argsArr, parsePyDML, configFilePath);
- }
-
- /*
-  * NOTE: when calling from SparkR, passing a Map from R to Java does not work,
-  * hence the named arguments are passed as two lists, one holding the keys
-  * and one holding the values.
-  */
- /**
-  * Executes a DML script with named arguments given as parallel lists of names and values,
-  * using the specified config file. The i-th name is paired with the i-th value; if a name
-  * occurs more than once, the last value wins.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param argsName argument names
-  * @param argsValues argument values, must have the same size as argsName
-  * @param configFilePath path to config file
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if the two lists differ in size, or if DMLException occurs during execution
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, ArrayList<String> argsName,
-         ArrayList<String> argsValues, String configFilePath)
-         throws IOException, DMLException, ParseException {
-     if (argsName.size() != argsValues.size()) {
-         // report both sizes so the mismatch can be diagnosed from the message alone
-         throw new DMLException("size of argsName " + argsName.size() +
-             " is diff than size of argsValues " + argsValues.size());
-     }
-     HashMap<String, String> newNamedArgs = new HashMap<String, String>();
-     for (int i = 0; i < argsName.size(); i++) {
-         newNamedArgs.put(argsName.get(i), argsValues.get(i));
-     }
-     return execute(dmlScriptFilePath, newNamedArgs, configFilePath);
- }
- /**
-  * Executes a DML script with named arguments given as parallel lists of names and values,
-  * using the default configuration (no config file).
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param argsName argument names
-  * @param argsValues argument values
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, ArrayList<String> argsName,
-         ArrayList<String> argsValues)
-         throws IOException, DMLException, ParseException {
-     MLOutput result = execute(dmlScriptFilePath, argsName, argsValues, null);
-     return result;
- }
-
- /**
- * Experimental: Execute DML script by passing positional arguments if parsePyDML=true, using specified config file.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param args arguments
- * @param parsePyDML true if pydml, false otherwise
- * @param configFilePath path to config file
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, String [] args, boolean parsePyDML, String configFilePath) throws IOException, DMLException, ParseException {
- return compileAndExecuteScript(dmlScriptFilePath, args, false, parsePyDML ? ScriptType.PYDML : ScriptType.DML, configFilePath);
- }
-
- /**
- * Experimental: Execute DML script by passing positional arguments if parsePyDML=true, using default configuration.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param args arguments
- * @param parsePyDML true if pydml, false otherwise
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, String [] args, boolean parsePyDML) throws IOException, DMLException, ParseException {
- return execute(dmlScriptFilePath, args, parsePyDML, null);
- }
-
- /**
-  * Executes a DML script without any arguments, using the specified config file.
-  * The script is not parsed as PyDML.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @param configFilePath path to config file
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath, String configFilePath) throws IOException, DMLException, ParseException {
-     final boolean parsePyDML = false;
-     return execute(dmlScriptFilePath, parsePyDML, configFilePath);
- }
-
- /**
-  * Executes a DML script without any arguments, using the default configuration (no config file).
-  * The script is not parsed as PyDML.
-  * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
-  * @return output as MLOutput
-  * @throws IOException if IOException occurs
-  * @throws DMLException if DMLException occurs
-  * @throws ParseException if ParseException occurs
-  */
- public MLOutput execute(String dmlScriptFilePath) throws IOException, DMLException, ParseException {
-     final boolean parsePyDML = false;
-     return execute(dmlScriptFilePath, parsePyDML, null);
- }
-
- /**
- * Experimental: Execute DML script without any arguments if parsePyDML=true, using specified config path.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param parsePyDML true if pydml, false otherwise
- * @param configFilePath path to config file
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, boolean parsePyDML, String configFilePath) throws IOException, DMLException, ParseException {
- return compileAndExecuteScript(dmlScriptFilePath, null, false, parsePyDML ? ScriptType.PYDML : ScriptType.DML, configFilePath);
- }
-
- /**
- * Experimental: Execute DML script without any arguments if parsePyDML=true, using default configuration.
- * @param dmlScriptFilePath the dml script can be in local filesystem or in HDFS
- * @param parsePyDML true if pydml, false otherwise
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput execute(String dmlScriptFilePath, boolean parsePyDML) throws IOException, DMLException, ParseException {
- return execute(dmlScriptFilePath, parsePyDML, null);
- }
-
- // -------------------------------- Utility methods begin ----------------------------------------------------------
-
-
- /**
-  * Call this method if you want to clear any RDDs set via registerInput, registerOutput.
-  * This is required if ml.execute(..) has been called earlier and you want to call a new DML script.
-  * Note: By default this does not clean up the configuration set using the setConfig method.
-  * To clean the configuration along with the registered inputs/outputs, please use reset(true).
-  * @throws DMLRuntimeException if DMLRuntimeException occurs
-  */
- public void reset()
-         throws DMLRuntimeException
- {
-     final boolean cleanupConfig = false;
-     reset(cleanupConfig);
- }
-
- /**
-  * Clears the registered inputs, outputs and variables, and optionally the additional configuration.
-  * @param cleanupConfig if true, the additional configs are cleared as well
-  * @throws DMLRuntimeException if DMLRuntimeException occurs
-  */
- public void reset(boolean cleanupConfig)
-         throws DMLRuntimeException
- {
-     // Clean up variables from the bufferpool, incl evicted files;
-     // otherwise there is a memory leak because the bufferpool holds references.
-     CacheableData.cleanupCacheDir();
-
-     // drop the mlcontext registration state
-     _variables = null;
-     _inVarnames = null;
-     _outVarnames = null;
-
-     if( cleanupConfig ) {
-         _additionalConfigs.clear();
-     }
- }
-
- /**
-  * Used internally. If the target is registered as input and the source is a read expression,
-  * the metadata check of the read is disabled and its parameters (dimensions, data type,
-  * value type, format, ...) are filled in from the registered matrix or frame object.
-  * Nothing is done otherwise.
-  * @param source the expression
-  * @param target the target
-  * @throws LanguageException if the registered object has an unsupported format, or if looking up
-  *         the registered object fails with a DMLRuntimeException
-  */
- void setAppropriateVarsForRead(Expression source, String target)
-         throws LanguageException
- {
-     boolean registered = isRegisteredAsInput(target);
-     boolean readExpr = (source instanceof DataExpression && ((DataExpression) source).isRead());
-     if( !registered || !readExpr ) {
-         return;
-     }
-
-     DataExpression read = (DataExpression) source;
-     // Do not check metadata file for registered reads
-     read.setCheckMetadata(false);
-
-     if( read.getDataType() == Expression.DataType.MATRIX ) {
-         try {
-             MatrixObject matrix = getMatrixObject(target);
-             int blp = source.getBeginLine();
-             int bcp = source.getBeginColumn();
-             int elp = source.getEndLine();
-             int ecp = source.getEndColumn();
-             read.addVarParam(DataExpression.READROWPARAM, new IntIdentifier(matrix.getNumRows(), source.getFilename(), blp, bcp, elp, ecp));
-             read.addVarParam(DataExpression.READCOLPARAM, new IntIdentifier(matrix.getNumColumns(), source.getFilename(), blp, bcp, elp, ecp));
-             read.addVarParam(DataExpression.READNUMNONZEROPARAM, new IntIdentifier(matrix.getNnz(), source.getFilename(), blp, bcp, elp, ecp));
-             read.addVarParam(DataExpression.DATATYPEPARAM, new StringIdentifier("matrix", source.getFilename(), blp, bcp, elp, ecp));
-             read.addVarParam(DataExpression.VALUETYPEPARAM, new StringIdentifier("double", source.getFilename(), blp, bcp, elp, ecp));
-
-             if( matrix.getMetaData() instanceof MatrixFormatMetaData ) {
-                 OutputInfo oinfo = ((MatrixFormatMetaData) matrix.getMetaData()).getOutputInfo();
-                 if( oinfo == OutputInfo.CSVOutputInfo ) {
-                     read.addVarParam(DataExpression.FORMAT_TYPE, new StringIdentifier(DataExpression.FORMAT_TYPE_VALUE_CSV, source.getFilename(), blp, bcp, elp, ecp));
-                 }
-                 else if( oinfo == OutputInfo.TextCellOutputInfo ) {
-                     read.addVarParam(DataExpression.FORMAT_TYPE, new StringIdentifier(DataExpression.FORMAT_TYPE_VALUE_TEXT, source.getFilename(), blp, bcp, elp, ecp));
-                 }
-                 else if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
-                     // binary block reads additionally carry the block sizes
-                     read.addVarParam(DataExpression.ROWBLOCKCOUNTPARAM, new IntIdentifier(matrix.getNumRowsPerBlock(), source.getFilename(), blp, bcp, elp, ecp));
-                     read.addVarParam(DataExpression.COLUMNBLOCKCOUNTPARAM, new IntIdentifier(matrix.getNumColumnsPerBlock(), source.getFilename(), blp, bcp, elp, ecp));
-                     read.addVarParam(DataExpression.FORMAT_TYPE, new StringIdentifier(DataExpression.FORMAT_TYPE_VALUE_BINARY, source.getFilename(), blp, bcp, elp, ecp));
-                 }
-                 else {
-                     throw new LanguageException("Unsupported format through MLContext");
-                 }
-             }
-         } catch (DMLRuntimeException e) {
-             throw new LanguageException(e);
-         }
-     }
-     else if( read.getDataType() == Expression.DataType.FRAME ) {
-         try {
-             FrameObject frame = getFrameObject(target);
-             int blp = source.getBeginLine();
-             int bcp = source.getBeginColumn();
-             int elp = source.getEndLine();
-             int ecp = source.getEndColumn();
-             read.addVarParam(DataExpression.READROWPARAM, new IntIdentifier(frame.getNumRows(), source.getFilename(), blp, bcp, elp, ecp));
-             read.addVarParam(DataExpression.READCOLPARAM, new IntIdentifier(frame.getNumColumns(), source.getFilename(), blp, bcp, elp, ecp));
-             read.addVarParam(DataExpression.DATATYPEPARAM, new StringIdentifier("frame", source.getFilename(), blp, bcp, elp, ecp));
-             read.addVarParam(DataExpression.VALUETYPEPARAM, new StringIdentifier("double", source.getFilename(), blp, bcp, elp, ecp)); //TODO change to schema
-
-             if( frame.getMetaData() instanceof MatrixFormatMetaData ) {
-                 OutputInfo oinfo = ((MatrixFormatMetaData) frame.getMetaData()).getOutputInfo();
-                 if( oinfo == OutputInfo.CSVOutputInfo ) {
-                     read.addVarParam(DataExpression.FORMAT_TYPE, new StringIdentifier(DataExpression.FORMAT_TYPE_VALUE_CSV, source.getFilename(), blp, bcp, elp, ecp));
-                 }
-                 else if( oinfo == OutputInfo.TextCellOutputInfo ) {
-                     read.addVarParam(DataExpression.FORMAT_TYPE, new StringIdentifier(DataExpression.FORMAT_TYPE_VALUE_TEXT, source.getFilename(), blp, bcp, elp, ecp));
-                 }
-                 else if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
-                     read.addVarParam(DataExpression.FORMAT_TYPE, new StringIdentifier(DataExpression.FORMAT_TYPE_VALUE_BINARY, source.getFilename(), blp, bcp, elp, ecp));
-                 }
-                 else {
-                     throw new LanguageException("Unsupported format through MLContext");
-                 }
-             }
-         } catch (DMLRuntimeException e) {
-             throw new LanguageException(e);
-         }
-     }
- }
-
- /**
- * Used internally
- * @param tmp list of instructions
- * @return list of instructions
- */
- ArrayList<Instruction> performCleanupAfterRecompilation(ArrayList<Instruction> tmp) {
- String [] outputs = (_outVarnames != null) ? _outVarnames.toArray(new String[0]) : new String[0];
- return JMLCUtils.cleanupRuntimeInstructions(tmp, outputs);
- }
-
- // -------------------------------- Utility methods ends ----------------------------------------------------------
-
- // -------------------------------- Private methods begins ----------------------------------------------------------
- private boolean isRegisteredAsInput(String varName) {
- if(_inVarnames != null) {
- for(String v : _inVarnames) {
- if(v.equals(varName)) {
- return true;
- }
- }
- }
- return false;
- }
-
- private MatrixObject getMatrixObject(String varName) throws DMLRuntimeException {
- if(_variables != null) {
- Data mo = _variables.get(varName);
- if(mo instanceof MatrixObject) {
- return (MatrixObject) mo;
- }
- else {
- throw new DMLRuntimeException("ERROR: Incorrect type");
- }
- }
- throw new DMLRuntimeException("ERROR: getMatrixObject not set for variable:" + varName);
- }
-
- private FrameObject getFrameObject(String varName) throws DMLRuntimeException {
- if(_variables != null) {
- Data mo = _variables.get(varName);
- if(mo instanceof FrameObject) {
- return (FrameObject) mo;
- }
- else {
- throw new DMLRuntimeException("ERROR: Incorrect type");
- }
- }
- throw new DMLRuntimeException("ERROR: getMatrixObject not set for variable:" + varName);
- }
-
- private int compareVersion(String versionStr1, String versionStr2) {
- Scanner s1 = null;
- Scanner s2 = null;
- try {
- s1 = new Scanner(versionStr1); s1.useDelimiter("\\.");
- s2 = new Scanner(versionStr2); s2.useDelimiter("\\.");
- while(s1.hasNextInt() && s2.hasNextInt()) {
- int version1 = s1.nextInt();
- int version2 = s2.nextInt();
- if(version1 < version2) {
- return -1;
- } else if(version1 > version2) {
- return 1;
- }
- }
-
- if(s1.hasNextInt()) return 1;
- }
- finally {
- IOUtilFunctions.closeSilently(s1);
- IOUtilFunctions.closeSilently(s2);
- }
-
- return 0;
- }
-
- private void initializeSpark(SparkContext sc, boolean monitorPerformance, boolean setForcedSparkExecType) throws DMLRuntimeException {
- MLContextProxy.setActive(true);
-
- this._sc = sc;
-
- if(compareVersion(sc.version(), "1.3.0") < 0 ) {
- throw new DMLRuntimeException("Expected spark version >= 1.3.0 for running SystemML");
- }
-
- if(setForcedSparkExecType)
- DMLScript.rtplatform = RUNTIME_PLATFORM.SPARK;
- else
- DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
- }
-
-
- /**
- * Execute a script stored in a string.
- *
- * @param dmlScript the script
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLOutput executeScript(String dmlScript)
- throws IOException, DMLException {
- return executeScript(dmlScript, false);
- }
-
- public MLOutput executeScript(String dmlScript, boolean isPyDML)
- throws IOException, DMLException {
- return executeScript(dmlScript, isPyDML, null);
- }
-
- public MLOutput executeScript(String dmlScript, String configFilePath)
- throws IOException, DMLException {
- return executeScript(dmlScript, false, configFilePath);
- }
-
- public MLOutput executeScript(String dmlScript, boolean isPyDML, String configFilePath)
- throws IOException, DMLException {
- return compileAndExecuteScript(dmlScript, null, false, false, isPyDML ? ScriptType.PYDML : ScriptType.DML, configFilePath);
- }
-
- /*
- @NOTE: from calling with the SparkR , somehow HashMap passing from R to java
- is not working and hence we pass in two arrays each representing keys
- and values
- */
- public MLOutput executeScript(String dmlScript, ArrayList<String> argsName,
- ArrayList<String> argsValues, String configFilePath)
- throws IOException, DMLException, ParseException {
- HashMap<String, String> newNamedArgs = new HashMap<String, String>();
- if (argsName.size() != argsValues.size()) {
- throw new DMLException("size of argsName " + argsName.size() +
- " is diff than " + " size of argsValues");
- }
- for (int i = 0; i < argsName.size(); i++) {
- String k = argsName.get(i);
- String v = argsValues.get(i);
- newNamedArgs.put(k, v);
- }
- return executeScript(dmlScript, newNamedArgs, configFilePath);
- }
-
- public MLOutput executeScript(String dmlScript, ArrayList<String> argsName,
- ArrayList<String> argsValues)
- throws IOException, DMLException, ParseException {
- return executeScript(dmlScript, argsName, argsValues, null);
- }
-
-
- public MLOutput executeScript(String dmlScript, scala.collection.immutable.Map<String, String> namedArgs)
- throws IOException, DMLException {
- return executeScript(dmlScript, new HashMap<String, String>(scala.collection.JavaConversions.mapAsJavaMap(namedArgs)), null);
- }
-
- public MLOutput executeScript(String dmlScript, scala.collection.immutable.Map<String, String> namedArgs, boolean isPyDML)
- throws IOException, DMLException {
- return executeScript(dmlScript, new HashMap<String, String>(scala.collection.JavaConversions.mapAsJavaMap(namedArgs)), isPyDML, null);
- }
-
- public MLOutput executeScript(String dmlScript, scala.collection.immutable.Map<String, String> namedArgs, String configFilePath)
- throws IOException, DMLException {
- return executeScript(dmlScript, new HashMap<String, String>(scala.collection.JavaConversions.mapAsJavaMap(namedArgs)), configFilePath);
- }
-
- public MLOutput executeScript(String dmlScript, scala.collection.immutable.Map<String, String> namedArgs, boolean isPyDML, String configFilePath)
- throws IOException, DMLException {
- return executeScript(dmlScript, new HashMap<String, String>(scala.collection.JavaConversions.mapAsJavaMap(namedArgs)), isPyDML, configFilePath);
- }
-
- public MLOutput executeScript(String dmlScript, Map<String, String> namedArgs)
- throws IOException, DMLException {
- return executeScript(dmlScript, namedArgs, null);
- }
-
- public MLOutput executeScript(String dmlScript, Map<String, String> namedArgs, boolean isPyDML)
- throws IOException, DMLException {
- return executeScript(dmlScript, namedArgs, isPyDML, null);
- }
-
- public MLOutput executeScript(String dmlScript, Map<String, String> namedArgs, String configFilePath)
- throws IOException, DMLException {
- return executeScript(dmlScript, namedArgs, false, configFilePath);
- }
-
- public MLOutput executeScript(String dmlScript, Map<String, String> namedArgs, boolean isPyDML, String configFilePath)
- throws IOException, DMLException {
- String [] args = new String[namedArgs.size()];
- int i = 0;
- for(Entry<String, String> entry : namedArgs.entrySet()) {
- if(entry.getValue().trim().isEmpty())
- args[i] = entry.getKey() + "=\"" + entry.getValue() + "\"";
- else
- args[i] = entry.getKey() + "=" + entry.getValue();
- i++;
- }
- return compileAndExecuteScript(dmlScript, args, false, true, isPyDML ? ScriptType.PYDML : ScriptType.DML, configFilePath);
- }
-
- private void checkIfRegisteringInputAllowed() throws DMLRuntimeException {
- if(!(DMLScript.rtplatform == RUNTIME_PLATFORM.SPARK || DMLScript.rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)) {
- throw new DMLRuntimeException("ERROR: registerInput is only allowed for spark execution mode");
- }
- }
-
- private MLOutput compileAndExecuteScript(String dmlScriptFilePath, String [] args, boolean isNamedArgument, ScriptType scriptType, String configFilePath) throws IOException, DMLException {
- return compileAndExecuteScript(dmlScriptFilePath, args, true, isNamedArgument, scriptType, configFilePath);
- }
-
- /**
- * All the execute() methods call this, which after setting appropriate input/output variables
- * calls _compileAndExecuteScript
- * We have explicitly synchronized this function because MLContext/SystemML does not yet support multi-threading.
- * @throws ParseException if ParseException occurs
- * @param dmlScriptFilePath script file path
- * @param args arguments
- * @param isFile whether the string is a path
- * @param isNamedArgument is named argument
- * @param scriptType type of script (DML or PyDML)
- * @param configFilePath path to config file
- * @return output as MLOutput
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- */
- private synchronized MLOutput compileAndExecuteScript(String dmlScriptFilePath, String [] args, boolean isFile, boolean isNamedArgument, ScriptType scriptType, String configFilePath) throws IOException, DMLException {
- try {
-
- DMLScript.SCRIPT_TYPE = scriptType;
-
- if(getActiveMLContext() != null) {
- throw new DMLRuntimeException("SystemML (and hence by definition MLContext) doesnot support parallel execute() calls from same or different MLContexts. "
- + "As a temporary fix, please do explicit synchronization, i.e. synchronized(MLContext.class) { ml.execute(...) } ");
- }
-
- // Set active MLContext.
- _activeMLContext = this;
-
- if( OptimizerUtils.isSparkExecutionMode() ) {
- // Depending on whether registerInput/registerOutput was called initialize the variables
- String[] inputs = (_inVarnames != null) ? _inVarnames.toArray(new String[0]) : new String[0];
- String[] outputs = (_outVarnames != null) ? _outVarnames.toArray(new String[0]) : new String[0];
- Map<String, JavaPairRDD<?,?>> retVal = (_outVarnames!=null && !_outVarnames.isEmpty()) ?
- retVal = new HashMap<String, JavaPairRDD<?,?>>() : null;
- Map<String, MatrixCharacteristics> outMetadata = new HashMap<String, MatrixCharacteristics>();
- Map<String, String> argVals = DMLScript.createArgumentsMap(isNamedArgument, args);
-
- // Run the DML script
- ExecutionContext ec = executeUsingSimplifiedCompilationChain(dmlScriptFilePath, isFile, argVals, scriptType, inputs, outputs, _variables, configFilePath);
- SparkExecutionContext sec = (SparkExecutionContext) ec;
-
- // Now collect the output
- if(_outVarnames != null) {
- if(_variables == null)
- throw new DMLRuntimeException("The symbol table returned after executing the script is empty");
-
- for( String ovar : _outVarnames ) {
- if( !_variables.keySet().contains(ovar) )
- throw new DMLException("Error: The variable " + ovar + " is not available as output after the execution of the DMLScript.");
-
- retVal.put(ovar, sec.getRDDHandleForVariable(ovar, InputInfo.BinaryBlockInputInfo));
- outMetadata.put(ovar, ec.getMatrixCharacteristics(ovar)); // For converting output to dataframe
- }
- }
-
- return new MLOutput(retVal, outMetadata);
- }
- else {
- throw new DMLRuntimeException("Unsupported runtime:" + DMLScript.rtplatform.name());
- }
- }
- finally {
- // Remove global dml config and all thread-local configs
- // TODO enable cleanup whenever invalid GNMF MLcontext is fixed
- // (the test is invalid because it assumes that status of previous execute is kept)
- //ConfigurationManager.setGlobalConfig(new DMLConfig());
- //ConfigurationManager.clearLocalConfigs();
-
- // Reset active MLContext.
- _activeMLContext = null;
- }
- }
-
-
- /**
- * This runs the DML script and returns the ExecutionContext for the caller to extract the output variables.
- * The caller (which is compileAndExecuteScript) is expected to set inputSymbolTable with appropriate matrix representation (RDD, MatrixObject).
- *
- * @param dmlScriptFilePath script file path
- * @param isFile true if file, false otherwise
- * @param argVals map of args
- * @param scriptType type of script (DML or PyDML)
- * @param inputs the inputs
- * @param outputs the outputs
- * @param inputSymbolTable the input symbol table
- * @param configFilePath path to config file
- * @return the execution context
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- private ExecutionContext executeUsingSimplifiedCompilationChain(String dmlScriptFilePath, boolean isFile, Map<String, String> argVals, ScriptType scriptType,
- String[] inputs, String[] outputs, LocalVariableMap inputSymbolTable, String configFilePath)
- throws IOException, DMLException
- {
- //construct dml configuration
- DMLConfig config = (configFilePath == null) ? new DMLConfig() : new DMLConfig(configFilePath);
- for(Entry<String, String> param : _additionalConfigs.entrySet()) {
- config.setTextValue(param.getKey(), param.getValue());
- }
-
- //set global dml and specialized compiler configurations
- ConfigurationManager.setGlobalConfig(config);
- CompilerConfig cconf = new CompilerConfig();
- cconf.set(ConfigType.IGNORE_UNSPECIFIED_ARGS, true);
- cconf.set(ConfigType.REJECT_READ_WRITE_UNKNOWNS, false);
- cconf.set(ConfigType.ALLOW_CSE_PERSISTENT_READS, false);
- ConfigurationManager.setGlobalConfig(cconf);
-
- //read dml script string
- String dmlScriptStr = DMLScript.readDMLScript( isFile, dmlScriptFilePath);
-
- //simplified compilation chain
- _rtprog = null;
-
- //parsing
- ParserWrapper parser = ParserFactory.createParser(scriptType);
- DMLProgram prog;
- if (isFile) {
- prog = parser.parse(dmlScriptFilePath, null, argVals);
- } else {
- prog = parser.parse(null, dmlScriptStr, argVals);
- }
-
- //language validate
- DMLTranslator dmlt = new DMLTranslator(prog);
- dmlt.liveVariableAnalysis(prog);
- dmlt.validateParseTree(prog);
-
- //hop construct/rewrite
- dmlt.constructHops(prog);
- dmlt.rewriteHopsDAG(prog);
-
- Explain.explain(prog);
-
- //rewrite persistent reads/writes
- if(inputSymbolTable != null) {
- RewriteRemovePersistentReadWrite rewrite = new RewriteRemovePersistentReadWrite(inputs, outputs, inputSymbolTable);
- ProgramRewriter rewriter2 = new ProgramRewriter(rewrite);
- rewriter2.rewriteProgramHopDAGs(prog);
- }
-
- //lop construct and runtime prog generation
- dmlt.constructLops(prog);
- _rtprog = prog.getRuntimeProgram(config);
-
- //optional global data flow optimization
- if(OptimizerUtils.isOptLevel(OptimizationLevel.O4_GLOBAL_TIME_MEMORY) ) {
- _rtprog = GlobalOptimizerWrapper.optimizeProgram(prog, _rtprog);
- }
-
- // launch SystemML appmaster not required as it is already launched
-
- //count number compiled MR jobs / SP instructions
- ExplainCounts counts = Explain.countDistributedOperations(_rtprog);
- Statistics.resetNoOfCompiledJobs( counts.numJobs );
-
- // Initialize caching and scratch space
- DMLScript.initHadoopExecution(config);
-
- //final cleanup runtime prog
- JMLCUtils.cleanupRuntimeProgram(_rtprog, outputs);
-
- //create and populate execution context
- ExecutionContext ec = ExecutionContextFactory.createContext(_rtprog);
- if(inputSymbolTable != null) {
- ec.setVariables(inputSymbolTable);
- }
-
- //core execute runtime program
- _rtprog.execute( ec );
-
- return ec;
- }
-
- // -------------------------------- Private methods ends ----------------------------------------------------------
-
- // TODO: Add additional create to provide sep, missing values, etc. for CSV
- /**
- * Experimental API: Might be discontinued in future release
- * @param sparkSession the Spark Session
- * @param filePath the file path
- * @param format the format
- * @return the MLMatrix
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLMatrix read(SparkSession sparkSession, String filePath, String format) throws IOException, DMLException, ParseException {
- this.reset();
- this.registerOutput("output");
- MLOutput out = this.executeScript("output = read(\"" + filePath + "\", format=\"" + format + "\"); " + MLMatrix.writeStmt);
- JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = out.getBinaryBlockedRDD("output");
- MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
- return MLMatrix.createMLMatrix(this, sparkSession, blocks, mcOut);
- }
-
- /**
- * Experimental API: Might be discontinued in future release
- * @param sqlContext the SQL Context
- * @param filePath the file path
- * @param format the format
- * @return the MLMatrix
- * @throws IOException if IOException occurs
- * @throws DMLException if DMLException occurs
- * @throws ParseException if ParseException occurs
- */
- public MLMatrix read(SQLContext sqlContext, String filePath, String format) throws IOException, DMLException, ParseException {
- SparkSession sparkSession = sqlContext.sparkSession();
- return read(sparkSession, filePath, format);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/api/MLContextProxy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContextProxy.java b/src/main/java/org/apache/sysml/api/MLContextProxy.java
index db87230..18b2eaa 100644
--- a/src/main/java/org/apache/sysml/api/MLContextProxy.java
+++ b/src/main/java/org/apache/sysml/api/MLContextProxy.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@ package org.apache.sysml.api;
import java.util.ArrayList;
+import org.apache.sysml.api.mlcontext.MLContext;
import org.apache.sysml.api.mlcontext.MLContextException;
import org.apache.sysml.parser.Expression;
import org.apache.sysml.parser.LanguageException;
@@ -31,59 +32,42 @@ import org.apache.sysml.runtime.instructions.Instruction;
* which would try to load spark libraries and hence fail if these are not available. This
* indirection is much more efficient than catching NoClassDefFoundErrors for every access
* to MLContext (e.g., on each recompile).
- *
+ *
*/
-public class MLContextProxy
+public class MLContextProxy
{
-
+
private static boolean _active = false;
-
+
public static void setActive(boolean flag) {
_active = flag;
}
-
+
public static boolean isActive() {
return _active;
}
- @SuppressWarnings("deprecation")
- public static ArrayList<Instruction> performCleanupAfterRecompilation(ArrayList<Instruction> tmp)
+ public static ArrayList<Instruction> performCleanupAfterRecompilation(ArrayList<Instruction> tmp)
{
- if(org.apache.sysml.api.MLContext.getActiveMLContext() != null) {
- return org.apache.sysml.api.MLContext.getActiveMLContext().performCleanupAfterRecompilation(tmp);
- } else if (org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) {
- return org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext().getInternalProxy().performCleanupAfterRecompilation(tmp);
- }
- return tmp;
+ return MLContext.getActiveMLContext().getInternalProxy().performCleanupAfterRecompilation(tmp);
}
- @SuppressWarnings("deprecation")
- public static void setAppropriateVarsForRead(Expression source, String targetname)
- throws LanguageException
+ public static void setAppropriateVarsForRead(Expression source, String targetname)
+ throws LanguageException
{
- if(org.apache.sysml.api.MLContext.getActiveMLContext() != null) {
- org.apache.sysml.api.MLContext.getActiveMLContext().setAppropriateVarsForRead(source, targetname);
- } else if (org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) {
- org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext().getInternalProxy().setAppropriateVarsForRead(source, targetname);
- }
+ MLContext.getActiveMLContext().getInternalProxy().setAppropriateVarsForRead(source, targetname);
}
- @SuppressWarnings("deprecation")
public static Object getActiveMLContext() {
- if (org.apache.sysml.api.MLContext.getActiveMLContext() != null) {
- return org.apache.sysml.api.MLContext.getActiveMLContext();
- } else if (org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) {
- return org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext();
- }
- return null;
+ return MLContext.getActiveMLContext();
}
public static Object getActiveMLContextForAPI() {
- if (org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext() != null) {
- return org.apache.sysml.api.mlcontext.MLContext.getActiveMLContext();
+ if (MLContext.getActiveMLContext() != null) {
+ return MLContext.getActiveMLContext();
}
throw new MLContextException("No MLContext object is currently active. Have you created one? "
+ "Hint: in Scala, 'val ml = new MLContext(sc)'", true);
}
-
+
}