You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2017/05/26 06:52:13 UTC
[2/4] incubator-systemml git commit: [SYSTEMML-1303] Remove deprecated old MLContext API
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 1dd3600..06a2005 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -84,8 +84,8 @@ public class SparkExecutionContext extends ExecutionContext
{
private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName());
private static final boolean LDEBUG = false; //local debug flag
-
- //internal configurations
+
+ //internal configurations
private static boolean LAZY_SPARKCTX_CREATION = true;
private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
@@ -93,16 +93,16 @@ public class SparkExecutionContext extends ExecutionContext
//executor memory and relative fractions as obtained from the spark configuration
private static SparkClusterConfig _sconf = null;
-
- //singleton spark context (as there can be only one spark context per JVM)
- private static JavaSparkContext _spctx = null;
-
- //registry of parallelized RDDs to enforce that at any time, we spent at most
+
+ //singleton spark context (as there can be only one spark context per JVM)
+ private static JavaSparkContext _spctx = null;
+
+ //registry of parallelized RDDs to enforce that at any time, we spent at most
//10% of JVM max heap size for parallelized RDDs; if this is not sufficient,
//matrices or frames are exported to HDFS and the RDDs are created from files.
//TODO unify memory management for CP, par RDDs, and potentially broadcasts
private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1);
-
+
static {
// for internal debugging only
if( LDEBUG ) {
@@ -111,31 +111,31 @@ public class SparkExecutionContext extends ExecutionContext
}
}
- protected SparkExecutionContext(boolean allocateVars, Program prog)
+ protected SparkExecutionContext(boolean allocateVars, Program prog)
{
//protected constructor to force use of ExecutionContextFactory
super( allocateVars, prog );
-
+
//spark context creation via internal initializer
if( !(LAZY_SPARKCTX_CREATION && OptimizerUtils.isHybridExecutionMode()) ) {
initSparkContext();
}
}
-
+
/**
* Returns the used singleton spark context. In case of lazy spark context
* creation, this methods blocks until the spark context is created.
- *
+ *
* @return java spark context
*/
public JavaSparkContext getSparkContext()
{
- //lazy spark context creation on demand (lazy instead of asynchronous
+ //lazy spark context creation on demand (lazy instead of asynchronous
//to avoid wait for uninitialized spark context on close)
if( LAZY_SPARKCTX_CREATION ) {
initSparkContext();
}
-
+
//return the created spark context
return _spctx;
}
@@ -144,11 +144,11 @@ public class SparkExecutionContext extends ExecutionContext
initSparkContext();
return _spctx;
}
-
+
/**
* Indicates if the spark context has been created or has
* been passed in from outside.
- *
+ *
* @return true if spark context created
*/
public synchronized static boolean isSparkContextCreated() {
@@ -159,26 +159,25 @@ public class SparkExecutionContext extends ExecutionContext
_spctx = null;
}
- public void close()
+ public void close()
{
synchronized( SparkExecutionContext.class ) {
- if( _spctx != null )
+ if( _spctx != null )
{
//stop the spark context if existing
_spctx.stop();
-
+
//make sure stopped context is never used again
- _spctx = null;
+ _spctx = null;
}
-
+
}
}
-
+
public static boolean isLazySparkContextCreation(){
return LAZY_SPARKCTX_CREATION;
}
- @SuppressWarnings("deprecation")
private synchronized static void initSparkContext()
{
//check for redundant spark context init
@@ -186,24 +185,19 @@ public class SparkExecutionContext extends ExecutionContext
return;
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-
+
//create a default spark context (master, appname, etc refer to system properties
//as given in the spark configuration or during spark-submit)
-
+
Object mlCtxObj = MLContextProxy.getActiveMLContext();
- if(mlCtxObj != null)
+ if(mlCtxObj != null)
{
// This is when DML is called through spark shell
// Will clean the passing of static variables later as this involves minimal change to DMLScript
- if (mlCtxObj instanceof org.apache.sysml.api.MLContext) {
- org.apache.sysml.api.MLContext mlCtx = (org.apache.sysml.api.MLContext) mlCtxObj;
- _spctx = new JavaSparkContext(mlCtx.getSparkContext());
- } else if (mlCtxObj instanceof org.apache.sysml.api.mlcontext.MLContext) {
- org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
- _spctx = MLContextUtil.getJavaSparkContext(mlCtx);
- }
+ org.apache.sysml.api.mlcontext.MLContext mlCtx = (org.apache.sysml.api.mlcontext.MLContext) mlCtxObj;
+ _spctx = MLContextUtil.getJavaSparkContext(mlCtx);
}
- else
+ else
{
if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
// For now set 4 cores for integration testing :)
@@ -220,128 +214,128 @@ public class SparkExecutionContext extends ExecutionContext
SparkConf conf = createSystemMLSparkConf();
_spctx = new JavaSparkContext(conf);
}
-
+
_parRDDs.clear();
}
-
- // Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect
+
+ // Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect
String strDriverMaxResSize = _spctx.getConf().get("spark.driver.maxResultSize", "1g");
- long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize);
+ long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize);
if (driverMaxResSize != 0 && driverMaxResSize<OptimizerUtils.getLocalMemBudget() && !DMLScript.USE_LOCAL_SPARK_CONFIG)
LOG.warn("Configuration parameter spark.driver.maxResultSize set to " + UtilFunctions.formatMemorySize(driverMaxResSize) + "."
- + " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size "
+ + " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size "
+ UtilFunctions.formatMemorySize((long)OptimizerUtils.getLocalMemBudget()) + ".");
-
+
//globally add binaryblock serialization framework for all hdfs read/write operations
- //TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end
+ //TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
MRJobConfiguration.addBinaryBlockSerializationFramework( _spctx.hadoopConfiguration() );
-
+
//statistics maintenance
if( DMLScript.STATISTICS ){
Statistics.setSparkCtxCreateTime(System.nanoTime()-t0);
}
- }
-
+ }
+
/**
* Sets up a SystemML-preferred Spark configuration based on the implicit
* default configuration (as passed via configurations from outside).
- *
+ *
* @return spark configuration
*/
public static SparkConf createSystemMLSparkConf() {
SparkConf conf = new SparkConf();
-
+
//always set unlimited result size (required for cp collect)
conf.set("spark.driver.maxResultSize", "0");
-
+
//always use the fair scheduler (for single jobs, it's equivalent to fifo
//but for concurrent jobs in parfor it ensures better data locality because
//round robin assignment mitigates the problem of 'sticky slots')
if( FAIR_SCHEDULER_MODE ) {
conf.set("spark.scheduler.mode", "FAIR");
}
-
+
//increase scheduler delay (usually more robust due to better data locality)
if( !conf.contains("spark.locality.wait") ) { //default 3s
conf.set("spark.locality.wait", "5s");
}
-
+
return conf;
}
/**
* Spark instructions should call this for all matrix inputs except broadcast
* variables.
- *
+ *
* @param varname variable name
* @return JavaPairRDD of MatrixIndexes-MatrixBlocks
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
- public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname )
- throws DMLRuntimeException
+ public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname )
+ throws DMLRuntimeException
{
return (JavaPairRDD<MatrixIndexes,MatrixBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
}
-
+
/**
* Spark instructions should call this for all frame inputs except broadcast
* variables.
- *
+ *
* @param varname variable name
* @return JavaPairRDD of Longs-FrameBlocks
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
- public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname )
- throws DMLRuntimeException
+ public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname )
+ throws DMLRuntimeException
{
JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
return out;
}
- public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo )
+ public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo )
throws DMLRuntimeException
{
Data dat = getVariable(varname);
if( dat instanceof MatrixObject ) {
MatrixObject mo = getMatrixObject(varname);
- return getRDDHandleForMatrixObject(mo, inputInfo);
+ return getRDDHandleForMatrixObject(mo, inputInfo);
}
else if( dat instanceof FrameObject ) {
FrameObject fo = getFrameObject(varname);
- return getRDDHandleForFrameObject(fo, inputInfo);
+ return getRDDHandleForFrameObject(fo, inputInfo);
}
else {
throw new DMLRuntimeException("Failed to obtain RDD for data type other than matrix or frame.");
}
}
-
+
/**
- * This call returns an RDD handle for a given matrix object. This includes
- * the creation of RDDs for in-memory or binary-block HDFS data.
- *
+ * This call returns an RDD handle for a given matrix object. This includes
+ * the creation of RDDs for in-memory or binary-block HDFS data.
+ *
* @param mo matrix object
* @param inputInfo input info
* @return JavaPairRDD handle for a matrix object
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
- public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo )
+ public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo )
throws DMLRuntimeException
- {
+ {
//NOTE: MB this logic should be integrated into MatrixObject
- //However, for now we cannot assume that spark libraries are
- //always available and hence only store generic references in
+ //However, for now we cannot assume that spark libraries are
+ //always available and hence only store generic references in
//matrix object while all the logic is in the SparkExecContext
-
+
JavaSparkContext sc = getSparkContext();
JavaPairRDD<?,?> rdd = null;
//CASE 1: rdd already existing (reuse if checkpoint or trigger
- //pending rdd operations if not yet cached but prevent to re-evaluate
+ //pending rdd operations if not yet cached but prevent to re-evaluate
//rdd operations if already executed and cached
- if( mo.getRDDHandle()!=null
+ if( mo.getRDDHandle()!=null
&& (mo.getRDDHandle().isCheckpointRDD() || !mo.isCached(false)) )
{
//return existing rdd handling (w/o input format change)
@@ -359,7 +353,7 @@ public class SparkExecutionContext extends ExecutionContext
if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary
mo.exportData();
rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
- rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug
+ rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug
fromFile = true;
}
else { //default case
@@ -368,7 +362,7 @@ public class SparkExecutionContext extends ExecutionContext
mo.release(); //unpin matrix
_parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
}
-
+
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
rddhandle.setHDFSFile(fromFile);
@@ -396,43 +390,43 @@ public class SparkExecutionContext extends ExecutionContext
else {
throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
}
-
+
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
rddhandle.setHDFSFile(true);
mo.setRDDHandle(rddhandle);
}
-
+
return rdd;
}
-
+
/**
* FIXME: currently this implementation assumes matrix representations but frame signature
* in order to support the old transform implementation.
- *
+ *
* @param fo frame object
* @param inputInfo input info
* @return JavaPairRDD handle for a frame object
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
- public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo )
+ public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo )
throws DMLRuntimeException
- {
+ {
//NOTE: MB this logic should be integrated into FrameObject
- //However, for now we cannot assume that spark libraries are
- //always available and hence only store generic references in
+ //However, for now we cannot assume that spark libraries are
+ //always available and hence only store generic references in
//matrix object while all the logic is in the SparkExecContext
-
- InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ?
+
+ InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ?
InputInfo.BinaryBlockFrameInputInfo : inputInfo;
-
+
JavaSparkContext sc = getSparkContext();
JavaPairRDD<?,?> rdd = null;
//CASE 1: rdd already existing (reuse if checkpoint or trigger
- //pending rdd operations if not yet cached but prevent to re-evaluate
+ //pending rdd operations if not yet cached but prevent to re-evaluate
//rdd operations if already executed and cached
- if( fo.getRDDHandle()!=null
+ if( fo.getRDDHandle()!=null
&& (fo.getRDDHandle().isCheckpointRDD() || !fo.isCached(false)) )
{
//return existing rdd handling (w/o input format change)
@@ -451,7 +445,7 @@ public class SparkExecutionContext extends ExecutionContext
fo.exportData();
}
rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
- rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug
+ rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug
fromFile = true;
}
else { //default case
@@ -460,7 +454,7 @@ public class SparkExecutionContext extends ExecutionContext
fo.release(); //unpin frame
_parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
}
-
+
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd, fo.getVarName());
rddhandle.setHDFSFile(fromFile);
@@ -487,64 +481,64 @@ public class SparkExecutionContext extends ExecutionContext
else {
throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
}
-
+
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd, fo.getVarName());
rddhandle.setHDFSFile(true);
fo.setRDDHandle(rddhandle);
}
-
+
return rdd;
}
-
+
/**
* TODO So far we only create broadcast variables but never destroy
* them. This is a memory leak which might lead to executor out-of-memory.
- * However, in order to handle this, we need to keep track when broadcast
+ * However, in order to handle this, we need to keep track when broadcast
* variables are no longer required.
- *
+ *
* @param varname variable name
* @return wrapper for broadcast variables
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
- public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname )
+ public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname )
throws DMLRuntimeException
- {
+ {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
MatrixObject mo = getMatrixObject(varname);
-
+
PartitionedBroadcast<MatrixBlock> bret = null;
-
+
//reuse existing broadcast handle
- if( mo.getBroadcastHandle()!=null
- && mo.getBroadcastHandle().isValid() )
+ if( mo.getBroadcastHandle()!=null
+ && mo.getBroadcastHandle().isValid() )
{
bret = mo.getBroadcastHandle().getBroadcast();
}
-
+
//create new broadcast handle (never created, evicted)
- if( bret == null )
+ if( bret == null )
{
//account for overwritten invalid broadcast (e.g., evicted)
if( mo.getBroadcastHandle()!=null )
CacheableData.addBroadcastSize(-mo.getBroadcastHandle().getSize());
-
- //obtain meta data for matrix
+
+ //obtain meta data for matrix
int brlen = (int) mo.getNumRowsPerBlock();
int bclen = (int) mo.getNumColumnsPerBlock();
-
+
//create partitioned matrix block and release memory consumed by input
MatrixBlock mb = mo.acquireRead();
PartitionedBlock<MatrixBlock> pmb = new PartitionedBlock<MatrixBlock>(mb, brlen, bclen);
mo.release();
-
+
//determine coarse-grained partitioning
int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen);
- int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
+ int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts];
-
+
//create coarse-grained partitioned broadcasts
if( numParts > 1 ) {
for( int i=0; i<numParts; i++ ) {
@@ -557,60 +551,60 @@ public class SparkExecutionContext extends ExecutionContext
else { //single partition
ret[0] = getSparkContext().broadcast(pmb);
}
-
+
bret = new PartitionedBroadcast<MatrixBlock>(ret);
- BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname,
+ BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname,
OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics()));
mo.setBroadcastHandle(bchandle);
CacheableData.addBroadcastSize(bchandle.getSize());
}
-
+
if (DMLScript.STATISTICS) {
Statistics.accSparkBroadCastTime(System.nanoTime() - t0);
Statistics.incSparkBroadcastCount(1);
}
-
+
return bret;
}
-
+
@SuppressWarnings("unchecked")
- public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname)
+ public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname)
throws DMLRuntimeException
- {
+ {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
FrameObject fo = getFrameObject(varname);
-
+
PartitionedBroadcast<FrameBlock> bret = null;
-
+
//reuse existing broadcast handle
- if( fo.getBroadcastHandle()!=null
- && fo.getBroadcastHandle().isValid() )
+ if( fo.getBroadcastHandle()!=null
+ && fo.getBroadcastHandle().isValid() )
{
bret = fo.getBroadcastHandle().getBroadcast();
}
-
+
//create new broadcast handle (never created, evicted)
- if( bret == null )
+ if( bret == null )
{
//account for overwritten invalid broadcast (e.g., evicted)
if( fo.getBroadcastHandle()!=null )
CacheableData.addBroadcastSize(-fo.getBroadcastHandle().getSize());
-
- //obtain meta data for frame
+
+ //obtain meta data for frame
int bclen = (int) fo.getNumColumns();
int brlen = OptimizerUtils.getDefaultFrameSize();
-
+
//create partitioned frame block and release memory consumed by input
FrameBlock mb = fo.acquireRead();
PartitionedBlock<FrameBlock> pmb = new PartitionedBlock<FrameBlock>(mb, brlen, bclen);
fo.release();
-
+
//determine coarse-grained partitioning
int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), fo.getNumColumns(), brlen, bclen);
- int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
+ int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts];
-
+
//create coarse-grained partitioned broadcasts
if( numParts > 1 ) {
for( int i=0; i<numParts; i++ ) {
@@ -623,41 +617,41 @@ public class SparkExecutionContext extends ExecutionContext
else { //single partition
ret[0] = getSparkContext().broadcast(pmb);
}
-
+
bret = new PartitionedBroadcast<FrameBlock>(ret);
- BroadcastObject<FrameBlock> bchandle = new BroadcastObject<FrameBlock>(bret, varname,
+ BroadcastObject<FrameBlock> bchandle = new BroadcastObject<FrameBlock>(bret, varname,
OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics()));
fo.setBroadcastHandle(bchandle);
CacheableData.addBroadcastSize(bchandle.getSize());
}
-
+
if (DMLScript.STATISTICS) {
Statistics.accSparkBroadCastTime(System.nanoTime() - t0);
Statistics.incSparkBroadcastCount(1);
}
-
+
return bret;
}
/**
- * Keep the output rdd of spark rdd operations as meta data of matrix/frame
+ * Keep the output rdd of spark rdd operations as meta data of matrix/frame
* objects in the symbol table.
- *
+ *
* @param varname variable name
* @param rdd JavaPairRDD handle for variable
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd)
+ public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd)
throws DMLRuntimeException
{
CacheableData<?> obj = getCacheableData(varname);
RDDObject rddhandle = new RDDObject(rdd, varname);
obj.setRDDHandle( rddhandle );
}
-
+
/**
* Utility method for creating an RDD out of an in-memory matrix block.
- *
+ *
* @param sc java spark context
* @param src matrix block
* @param brlen block row length
@@ -665,13 +659,13 @@ public class SparkExecutionContext extends ExecutionContext
* @return JavaPairRDD handle to matrix block
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen)
+ public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen)
throws DMLRuntimeException
- {
+ {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>();
-
- if( src.getNumRows() <= brlen
+
+ if( src.getNumRows() <= brlen
&& src.getNumColumns() <= bclen )
{
list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(1,1), src));
@@ -679,44 +673,44 @@ public class SparkExecutionContext extends ExecutionContext
else
{
boolean sparse = src.isInSparseFormat();
-
+
//create and write subblocks of matrix
for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
{
int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
-
+
MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse);
-
+
int row_offset = blockRow*brlen;
int col_offset = blockCol*bclen;
-
+
//copy submatrix to block
- src.sliceOperations( row_offset, row_offset+maxRow-1,
- col_offset, col_offset+maxCol-1, block );
-
+ src.sliceOperations( row_offset, row_offset+maxRow-1,
+ col_offset, col_offset+maxCol-1, block );
+
//append block to sequence file
MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1);
list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(indexes, block));
}
}
-
+
JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list);
if (DMLScript.STATISTICS) {
Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
Statistics.incSparkParallelizeCount(1);
}
-
+
return result;
}
- public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src)
+ public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src)
throws DMLRuntimeException
- {
+ {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
LinkedList<Tuple2<Long,FrameBlock>> list = new LinkedList<Tuple2<Long,FrameBlock>>();
-
+
//create and write subblocks of matrix
int blksize = ConfigurationManager.getBlocksize();
for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)blksize); blockRow++)
@@ -725,28 +719,28 @@ public class SparkExecutionContext extends ExecutionContext
int roffset = blockRow*blksize;
FrameBlock block = new FrameBlock(src.getSchema());
-
+
//copy sub frame to block, incl meta data on first
- src.sliceOperations( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block );
+ src.sliceOperations( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block );
if( roffset == 0 )
block.setColumnMetadata(src.getColumnMetadata());
-
+
//append block to sequence file
list.addLast(new Tuple2<Long,FrameBlock>((long)roffset+1, block));
}
-
+
JavaPairRDD<Long,FrameBlock> result = sc.parallelizePairs(list);
if (DMLScript.STATISTICS) {
Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
Statistics.incSparkParallelizeCount(1);
}
-
+
return result;
}
-
+
/**
* This method is a generic abstraction for calls from the buffer pool.
- *
+ *
* @param rdd rdd object
* @param rlen number of rows
* @param clen number of columns
@@ -757,21 +751,21 @@ public class SparkExecutionContext extends ExecutionContext
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
- public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz)
+ public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz)
throws DMLRuntimeException
- {
+ {
return toMatrixBlock(
- (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(),
+ (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(),
rlen, clen, brlen, bclen, nnz);
}
-
+
/**
- * Utility method for creating a single matrix block out of a binary block RDD.
- * Note that this collect call might trigger execution of any pending transformations.
- *
+ * Utility method for creating a single matrix block out of a binary block RDD.
+ * Note that this collect call might trigger execution of any pending transformations.
+ *
* NOTE: This is an unguarded utility function, which requires memory for both the output matrix
* and its collected, blocked representation.
- *
+ *
* @param rdd JavaPairRDD for matrix block
* @param rlen number of rows
* @param clen number of columns
@@ -781,19 +775,19 @@ public class SparkExecutionContext extends ExecutionContext
* @return matrix block
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
+ public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
throws DMLRuntimeException
{
-
+
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
MatrixBlock out = null;
-
+
if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
{
//special case without copy and nnz maintenance
List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-
+
if( list.size()>1 )
throw new DMLRuntimeException("Expecting no more than one result block.");
else if( list.size()==1 )
@@ -806,70 +800,70 @@ public class SparkExecutionContext extends ExecutionContext
//determine target sparse/dense representation
long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-
+
//create output matrix block (w/ lazy allocation)
out = new MatrixBlock(rlen, clen, sparse, lnnz);
-
+
List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-
+
//copy blocks one-at-a-time into output matrix block
- long aNnz = 0;
+ long aNnz = 0;
for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
{
//unpack index-block pair
MatrixIndexes ix = keyval._1();
MatrixBlock block = keyval._2();
-
+
//compute row/column block offsets
int row_offset = (int)(ix.getRowIndex()-1)*brlen;
int col_offset = (int)(ix.getColumnIndex()-1)*bclen;
int rows = block.getNumRows();
int cols = block.getNumColumns();
-
+
//append block
if( sparse ) { //SPARSE OUTPUT
//append block to sparse target in order to avoid shifting, where
//we use a shallow row copy in case of MCSR and single column blocks
- //note: this append requires, for multiple column blocks, a final sort
+ //note: this append requires, for multiple column blocks, a final sort
out.appendToSparse(block, row_offset, col_offset, clen>bclen);
}
else { //DENSE OUTPUT
- out.copy( row_offset, row_offset+rows-1,
- col_offset, col_offset+cols-1, block, false );
+ out.copy( row_offset, row_offset+rows-1,
+ col_offset, col_offset+cols-1, block, false );
}
-
+
//incremental maintenance nnz
aNnz += block.getNonZeros();
}
-
+
//post-processing output matrix
if( sparse && clen>bclen )
out.sortSparseRows();
out.setNonZeros(aNnz);
out.examSparsity();
}
-
+
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
-
+
return out;
}
-
+
@SuppressWarnings("unchecked")
- public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz)
+ public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz)
throws DMLRuntimeException
- {
+ {
return toMatrixBlock(
- (JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(),
+ (JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(),
rlen, clen, nnz);
}
-
+
/**
- * Utility method for creating a single matrix block out of a binary cell RDD.
- * Note that this collect call might trigger execution of any pending transformations.
- *
+ * Utility method for creating a single matrix block out of a binary cell RDD.
+ * Note that this collect call might trigger execution of any pending transformations.
+ *
* @param rdd JavaPairRDD for matrix block
* @param rlen number of rows
* @param clen number of columns
@@ -877,57 +871,57 @@ public class SparkExecutionContext extends ExecutionContext
* @return matrix block
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz)
+ public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz)
throws DMLRuntimeException
- {
+ {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
MatrixBlock out = null;
-
+
//determine target sparse/dense representation
long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-
+
//create output matrix block (w/ lazy allocation)
out = new MatrixBlock(rlen, clen, sparse);
-
+
List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect();
-
+
//copy blocks one-at-a-time into output matrix block
for( Tuple2<MatrixIndexes,MatrixCell> keyval : list )
{
//unpack index-block pair
MatrixIndexes ix = keyval._1();
MatrixCell cell = keyval._2();
-
+
//append cell to dense/sparse target in order to avoid shifting for sparse
//note: this append requires a final sort of sparse rows
out.appendValue((int)ix.getRowIndex()-1, (int)ix.getColumnIndex()-1, cell.getValue());
}
-
+
//post-processing output matrix
if( sparse )
out.sortSparseRows();
out.recomputeNonZeros();
out.examSparsity();
-
+
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
-
+
return out;
}
- public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
+ public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
throws DMLRuntimeException
{
-
+
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen);
List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-
+
//copy blocks one-at-a-time into output matrix block
for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
{
@@ -936,24 +930,24 @@ public class SparkExecutionContext extends ExecutionContext
MatrixBlock block = keyval._2();
out.setBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block);
}
-
+
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
-
+
return out;
}
@SuppressWarnings("unchecked")
- public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen)
- throws DMLRuntimeException
+ public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen)
+ throws DMLRuntimeException
{
JavaPairRDD<Long,FrameBlock> lrdd = (JavaPairRDD<Long,FrameBlock>) rdd.getRDD();
return toFrameBlock(lrdd, schema, rlen, clen);
}
- public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen)
+ public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -964,16 +958,16 @@ public class SparkExecutionContext extends ExecutionContext
//create output frame block (w/ lazy allocation)
FrameBlock out = new FrameBlock(schema);
out.ensureAllocatedColumns(rlen);
-
+
List<Tuple2<Long,FrameBlock>> list = rdd.collect();
-
+
//copy blocks one-at-a-time into output matrix block
for( Tuple2<Long,FrameBlock> keyval : list )
{
//unpack index-block pair
int ix = (int)(keyval._1() - 1);
FrameBlock block = keyval._2();
-
+
//copy into output frame
out.copy( ix, ix+block.getNumRows()-1, 0, block.getNumColumns()-1, block );
if( ix == 0 ) {
@@ -981,12 +975,12 @@ public class SparkExecutionContext extends ExecutionContext
out.setColumnMetadata(block.getColumnMetadata());
}
}
-
+
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
-
+
return out;
}
@@ -994,17 +988,17 @@ public class SparkExecutionContext extends ExecutionContext
public static long writeRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
{
JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
-
+
//piggyback nnz maintenance on write
LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz");
lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
-
+
//save file is an action which also triggers nnz maintenance
- lrdd.saveAsHadoopFile(path,
- oinfo.outputKeyClass,
- oinfo.outputValueClass,
+ lrdd.saveAsHadoopFile(path,
+ oinfo.outputKeyClass,
+ oinfo.outputValueClass,
oinfo.outputFormatClass);
-
+
//return nnz aggregate of all blocks
return aNnz.value();
}
@@ -1013,58 +1007,58 @@ public class SparkExecutionContext extends ExecutionContext
public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
{
JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, FrameBlock>) rdd.getRDD();
-
+
//convert keys to writables if necessary
if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair(
new LongFrameToLongWritableFrameFunction());
oinfo = OutputInfo.BinaryBlockFrameOutputInfo;
}
-
+
//save file is an action which also triggers nnz maintenance
- lrdd.saveAsHadoopFile(path,
- oinfo.outputKeyClass,
- oinfo.outputValueClass,
+ lrdd.saveAsHadoopFile(path,
+ oinfo.outputKeyClass,
+ oinfo.outputValueClass,
oinfo.outputFormatClass);
}
-
+
///////////////////////////////////////////
// Cleanup of RDDs and Broadcast variables
///////
-
+
/**
* Adds a child rdd object to the lineage of a parent rdd.
- *
+ *
* @param varParent parent variable
* @param varChild child variable
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public void addLineageRDD(String varParent, String varChild)
- throws DMLRuntimeException
+ public void addLineageRDD(String varParent, String varChild)
+ throws DMLRuntimeException
{
RDDObject parent = getCacheableData(varParent).getRDDHandle();
RDDObject child = getCacheableData(varChild).getRDDHandle();
-
+
parent.addLineageChild( child );
}
-
+
/**
* Adds a child broadcast object to the lineage of a parent rdd.
- *
+ *
* @param varParent parent variable
* @param varChild child variable
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- public void addLineageBroadcast(String varParent, String varChild)
- throws DMLRuntimeException
+ public void addLineageBroadcast(String varParent, String varChild)
+ throws DMLRuntimeException
{
RDDObject parent = getCacheableData(varParent).getRDDHandle();
BroadcastObject<?> child = getCacheableData(varChild).getBroadcastHandle();
-
+
parent.addLineageChild( child );
}
- public void addLineage(String varParent, String varChild, boolean broadcast)
+ public void addLineage(String varParent, String varChild, boolean broadcast)
throws DMLRuntimeException
{
if( broadcast )
@@ -1072,25 +1066,25 @@ public class SparkExecutionContext extends ExecutionContext
else
addLineageRDD(varParent, varChild);
}
-
+
@Override
- public void cleanupMatrixObject( MatrixObject mo )
+ public void cleanupMatrixObject( MatrixObject mo )
throws DMLRuntimeException
{
//NOTE: this method overwrites the default behavior of cleanupMatrixObject
//and hence is transparently used by rmvar instructions and other users. The
//core difference is the lineage-based cleanup of RDD and broadcast variables.
-
+
try
{
- if ( mo.isCleanupEnabled() )
+ if ( mo.isCleanupEnabled() )
{
//compute ref count only if matrix cleanup actually necessary
- if ( !getVariables().hasReferences(mo) )
+ if ( !getVariables().hasReferences(mo) )
{
- //clean cached data
- mo.clearData();
-
+ //clean cached data
+ mo.clearData();
+
//clean hdfs data if no pending rdd operations on it
if( mo.isHDFSFileExists() && mo.getFileName()!=null ) {
if( mo.getRDDHandle()==null ) {
@@ -1101,12 +1095,12 @@ public class SparkExecutionContext extends ExecutionContext
rdd.setHDFSFilename(mo.getFileName());
}
}
-
+
//cleanup RDD and broadcast variables (recursive)
//note: requires that mo.clearData already removed back references
- if( mo.getRDDHandle()!=null ) {
+ if( mo.getRDDHandle()!=null ) {
rCleanupLineageObject(mo.getRDDHandle());
- }
+ }
if( mo.getBroadcastHandle()!=null ) {
rCleanupLineageObject(mo.getBroadcastHandle());
}
@@ -1120,18 +1114,18 @@ public class SparkExecutionContext extends ExecutionContext
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- private void rCleanupLineageObject(LineageObject lob)
+ private void rCleanupLineageObject(LineageObject lob)
throws IOException
- {
+ {
//abort recursive cleanup if still consumers
if( lob.getNumReferences() > 0 )
return;
-
- //abort if still reachable through matrix object (via back references for
+
+ //abort if still reachable through matrix object (via back references for
//robustness in function calls and to prevent repeated scans of the symbol table)
if( lob.hasBackReference() )
return;
-
+
//cleanup current lineage object (from driver/executors)
//incl deferred hdfs file removal (only if metadata set by cleanup call)
if( lob instanceof RDDObject ) {
@@ -1151,38 +1145,38 @@ public class SparkExecutionContext extends ExecutionContext
cleanupBroadcastVariable(bc);
CacheableData.addBroadcastSize(-((BroadcastObject)lob).getSize());
}
-
+
//recursively process lineage children
for( LineageObject c : lob.getLineageChilds() ){
c.decrementNumReferences();
rCleanupLineageObject(c);
}
}
-
+
/**
* This call destroys a broadcast variable at all executors and the driver.
* Hence, it is intended to be used on rmvar only. Depending on the
* ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
- *
+ *
* @param bvar broadcast variable
*/
- public static void cleanupBroadcastVariable(Broadcast<?> bvar)
+ public static void cleanupBroadcastVariable(Broadcast<?> bvar)
{
- //In comparison to 'unpersist' (which would only delete the broadcast
+ //In comparison to 'unpersist' (which would only delete the broadcast
//from the executors), this call also deletes related data from the driver.
if( bvar.isValid() ) {
bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY );
}
}
-
+
/**
* This call removes an rdd variable from executor memory and disk if required.
* Hence, it is intended to be used on rmvar only. Depending on the
* ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
- *
+ *
* @param rvar rdd variable to remove
*/
- public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar)
+ public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar)
{
if( rvar.getStorageLevel()!=StorageLevel.NONE() ) {
rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY );
@@ -1190,72 +1184,72 @@ public class SparkExecutionContext extends ExecutionContext
}
@SuppressWarnings("unchecked")
- public void repartitionAndCacheMatrixObject( String var )
+ public void repartitionAndCacheMatrixObject( String var )
throws DMLRuntimeException
{
MatrixObject mo = getMatrixObject(var);
MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
-
+
//double check size to avoid unnecessary spark context creation
if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double)
OptimizerUtils.estimateSizeExactSparsity(mcIn)) )
- return;
-
+ return;
+
//get input rdd and default storage level
- JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
+ JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
-
+
//avoid unnecessary caching of input in order to reduce memory pressure
if( mo.getRDDHandle().allowsShortCircuitRead()
&& isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
-
+
//investigate issue of unnecessarily large number of partitions
int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
if( numPartitions < in.getNumPartitions() )
in = in.coalesce( numPartitions );
}
-
- //repartition rdd (force creation of shuffled rdd via merge), note: without deep copy albeit
+
+ //repartition rdd (force creation of shuffled rdd via merge), note: without deep copy albeit
//executed on the original data, because there will be no merge, i.e., no key duplicates
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in, false);
-
+
//convert mcsr into memory-efficient csr if potentially sparse
- if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {
+ if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {
out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
}
-
- //persist rdd in default storage level
+
+ //persist rdd in default storage level
out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
.count(); //trigger caching to prevent contention
-
+
//create new rdd handle, in-place of current matrix object
RDDObject inro = mo.getRDDHandle(); //guaranteed to exist (see above)
RDDObject outro = new RDDObject(out, var); //create new rdd object
outro.setCheckpointRDD(true); //mark as checkpointed
outro.addLineageChild(inro); //keep lineage to prevent cycles on cleanup
- mo.setRDDHandle(outro);
+ mo.setRDDHandle(outro);
}
@SuppressWarnings("unchecked")
- public void cacheMatrixObject( String var )
+ public void cacheMatrixObject( String var )
throws DMLRuntimeException
{
//get input rdd and default storage level
MatrixObject mo = getMatrixObject(var);
-
+
//double check size to avoid unnecessary spark context creation
if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double)
OptimizerUtils.estimateSizeExactSparsity(mo.getMatrixCharacteristics())) )
- return;
-
- JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
+ return;
+
+ JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
-
+
//persist rdd (force rdd caching, if not already cached)
if( !isRDDCached(in.id()) )
- in.count(); //trigger caching to prevent contention
+ in.count(); //trigger caching to prevent contention
}
public void setThreadLocalSchedulerPool(String poolName) {
@@ -1283,7 +1277,7 @@ public class SparkExecutionContext extends ExecutionContext
if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
return false;
}
-
+
//check that rdd is actually already cached
for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) {
if( info.id() == rddID )
@@ -1293,35 +1287,35 @@ public class SparkExecutionContext extends ExecutionContext
}
///////////////////////////////////////////
- // Spark configuration handling
+ // Spark configuration handling
///////
/**
- * Obtains the lazily analyzed spark cluster configuration.
- *
+ * Obtains the lazily analyzed spark cluster configuration.
+ *
* @return spark cluster configuration
*/
public static SparkClusterConfig getSparkClusterConfig() {
- //lazy creation of spark cluster config
+ //lazy creation of spark cluster config
if( _sconf == null )
_sconf = new SparkClusterConfig();
return _sconf;
}
-
+
/**
* Obtains the available memory budget for broadcast variables in bytes.
- *
+ *
* @return broadcast memory budget
*/
public static double getBroadcastMemoryBudget() {
return getSparkClusterConfig()
.getBroadcastMemoryBudget();
}
-
+
/**
* Obtain the available memory budget for data storage in bytes.
- *
- * @param min flag for minimum data budget
+ *
+ * @param min flag for minimum data budget
* @param refresh flag for refresh with spark context
* @return data memory budget
*/
@@ -1329,21 +1323,21 @@ public class SparkExecutionContext extends ExecutionContext
return getSparkClusterConfig()
.getDataMemoryBudget(min, refresh);
}
-
+
/**
* Obtain the number of executors in the cluster (excluding the driver).
- *
+ *
* @return number of executors
*/
public static int getNumExecutors() {
return getSparkClusterConfig()
.getNumExecutors();
}
-
+
/**
- * Obtain the default degree of parallelism (cores in the cluster).
- *
- * @param refresh flag for refresh with spark context
+ * Obtain the default degree of parallelism (cores in the cluster).
+ *
+ * @param refresh flag for refresh with spark context
* @return default degree of parallelism
*/
public static int getDefaultParallelism(boolean refresh) {
@@ -1360,13 +1354,13 @@ public class SparkExecutionContext extends ExecutionContext
int numExecutors = getNumExecutors();
int numCores = getDefaultParallelism(false);
boolean multiThreaded = (numCores > numExecutors);
-
+
//check for jdk version less than 8 (and raise warning if multi-threaded)
- if( isLtJDK8 && multiThreaded)
+ if( isLtJDK8 && multiThreaded)
{
- //get the jre version
+ //get the jre version
String version = System.getProperty("java.version");
-
+
LOG.warn("########################################################################################");
LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####");
LOG.warn("### java.version = " + version);
@@ -1377,51 +1371,51 @@ public class SparkExecutionContext extends ExecutionContext
LOG.warn("########################################################################################");
}
}
-
+
/**
- * Captures relevant spark cluster configuration properties, e.g., memory budgets and
+ * Captures relevant spark cluster configuration properties, e.g., memory budgets and
* degree of parallelism. This configuration abstracts legacy (< Spark 1.6) and current
- * configurations and provides a unified view.
+ * configurations and provides a unified view.
*/
- private static class SparkClusterConfig
+ private static class SparkClusterConfig
{
//broadcasts are stored in mem-and-disk in data space, this config
//defines the fraction of data space to be used as broadcast budget
private static final double BROADCAST_DATA_FRACTION = 0.3;
-
+
//forward private config from Spark's UnifiedMemoryManager.scala (>1.6)
private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024;
-
+
//meta configurations
private boolean _legacyVersion = false; //spark version <1.6
private boolean _confOnly = false; //infrastructure info based on config
-
+
//memory management configurations
private long _memExecutor = -1; //mem per executor
private double _memDataMinFrac = -1; //minimum data fraction
private double _memDataMaxFrac = -1; //maximum data fraction
private double _memBroadcastFrac = -1; //broadcast fraction
-
+
//degree of parallelism configurations
private int _numExecutors = -1; //total executors
- private int _defaultPar = -1; //total vcores
-
- public SparkClusterConfig()
+ private int _defaultPar = -1; //total vcores
+
+ public SparkClusterConfig()
{
SparkConf sconf = createSystemMLSparkConf();
_confOnly = true;
-
+
//parse version and config
String sparkVersion = getSparkVersionString();
_legacyVersion = (UtilFunctions.compareVersion(sparkVersion, "1.6.0") < 0
|| sconf.getBoolean("spark.memory.useLegacyMode", false) );
-
+
//obtain basic spark configurations
if( _legacyVersion )
analyzeSparkConfiguationLegacy(sconf);
else
analyzeSparkConfiguation(sconf);
-
+
//log debug of created spark cluster config
if( LOG.isDebugEnabled() )
LOG.debug( this.toString() );
@@ -1432,30 +1426,30 @@ public class SparkExecutionContext extends ExecutionContext
}
public long getDataMemoryBudget(boolean min, boolean refresh) {
- //always get the current num executors on refresh because this might
+ //always get the current num executors on refresh because this might
//change if not all executors are initially allocated and it is plan-relevant
int numExec = _numExecutors;
if( refresh && !_confOnly ) {
JavaSparkContext jsc = getSparkContextStatic();
numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
}
-
+
//compute data memory budget
return (long) ( numExec * _memExecutor *
- (min ? _memDataMinFrac : _memDataMaxFrac) );
+ (min ? _memDataMinFrac : _memDataMaxFrac) );
}
public int getNumExecutors() {
if( _numExecutors < 0 )
- analyzeSparkParallelismConfiguation(null);
+ analyzeSparkParallelismConfiguation(null);
return _numExecutors;
}
public int getDefaultParallelism(boolean refresh) {
if( _defaultPar < 0 && !refresh )
analyzeSparkParallelismConfiguation(null);
-
- //always get the current default parallelism on refresh because this might
+
+ //always get the current default parallelism on refresh because this might
//change if not all executors are initially allocated and it is plan-relevant
return ( refresh && !_confOnly ) ?
getSparkContextStatic().defaultParallelism() : _defaultPar;
@@ -1464,36 +1458,36 @@ public class SparkExecutionContext extends ExecutionContext
public void analyzeSparkConfiguationLegacy(SparkConf conf) {
//ensure allocated spark conf
SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
-
+
//parse absolute executor memory
_memExecutor = UtilFunctions.parseMemorySize(
sconf.get("spark.executor.memory", "1g"));
-
+
//get data and shuffle memory ratios (defaults not specified in job conf)
double dataFrac = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
_memDataMinFrac = dataFrac;
_memDataMaxFrac = dataFrac;
_memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; //default 18%
-
- //analyze spark degree of parallelism
- analyzeSparkParallelismConfiguation(sconf);
+
+ //analyze spark degree of parallelism
+ analyzeSparkParallelismConfiguation(sconf);
}
public void analyzeSparkConfiguation(SparkConf conf) {
//ensure allocated spark conf
SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
-
+
//parse absolute executor memory, incl fixed cut off
_memExecutor = UtilFunctions.parseMemorySize(
- sconf.get("spark.executor.memory", "1g"))
+ sconf.get("spark.executor.memory", "1g"))
- RESERVED_SYSTEM_MEMORY_BYTES;
-
+
//get data and shuffle memory ratios (defaults not specified in job conf)
_memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
_memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.75); //default 75%
_memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 22.5%
-
- //analyze spark degree of parallelism
+
+ //analyze spark degree of parallelism
analyzeSparkParallelismConfiguation(sconf);
}
@@ -1501,7 +1495,7 @@ public class SparkExecutionContext extends ExecutionContext
int numExecutors = sconf.getInt("spark.executor.instances", -1);
int numCoresPerExec = sconf.getInt("spark.executor.cores", -1);
int defaultPar = sconf.getInt("spark.default.parallelism", -1);
-
+
if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) {
_numExecutors = numExecutors;
_defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec;
@@ -1512,28 +1506,28 @@ public class SparkExecutionContext extends ExecutionContext
//note: spark context provides this information while conf does not
//(for num executors we need to correct for driver and local mode)
JavaSparkContext jsc = getSparkContextStatic();
- _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
+ _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
_defaultPar = jsc.defaultParallelism();
- _confOnly &= false; //implies env info refresh w/ spark context
+ _confOnly &= false; //implies env info refresh w/ spark context
}
}
-
+
/**
* Obtains the spark version string. If the spark context has been created,
- * we simply get it from the context; otherwise, we use Spark internal
- * constants to avoid creating the spark context just for the version.
- *
+ * we simply get it from the context; otherwise, we use Spark internal
+ * constants to avoid creating the spark context just for the version.
+ *
* @return spark version string
*/
private String getSparkVersionString() {
//check for existing spark context
- if( isSparkContextCreated() )
+ if( isSparkContextCreated() )
return getSparkContextStatic().version();
-
+
//use spark internal constant to avoid context creation
return org.apache.spark.package$.MODULE$.SPARK_VERSION();
}
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder("SparkClusterConfig: \n");
@@ -1544,17 +1538,17 @@ public class SparkExecutionContext extends ExecutionContext
sb.append("-- memDataMaxFrac = " + _memDataMaxFrac + "\n");
sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n");
sb.append("-- numExecutors = " + _numExecutors + "\n");
- sb.append("-- defaultPar = " + _defaultPar + "\n");
+ sb.append("-- defaultPar = " + _defaultPar + "\n");
return sb.toString();
}
}
-
- private static class MemoryManagerParRDDs
+
+ private static class MemoryManagerParRDDs
{
private final long _limit;
private long _size;
private HashMap<Integer, Long> _rdds;
-
+
public MemoryManagerParRDDs(double fractionMem) {
_limit = (long)(fractionMem * InfrastructureAnalyzer.getLocalMaxMemory());
_size = 0;
@@ -1566,7 +1560,7 @@ public class SparkExecutionContext extends ExecutionContext
_size += ret ? rddSize : 0;
return ret;
}
-
+
public synchronized void registerRDD(int rddID, long rddSize, boolean reserved) {
if( !reserved ) {
throw new RuntimeException("Unsupported rdd registration "
@@ -1574,7 +1568,7 @@ public class SparkExecutionContext extends ExecutionContext
}
_rdds.put(rddID, rddSize);
}
-
+
public synchronized void deregisterRDD(int rddID) {
long rddSize = _rdds.remove(rddID);
_size -= rddSize;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
deleted file mode 100644
index a1173bd..0000000
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/GetMLBlock.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.functions;
-
-import java.io.Serializable;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-
-import scala.Tuple2;
-
-import org.apache.sysml.api.MLBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-
-@SuppressWarnings("deprecation")
-public class GetMLBlock implements Function<Tuple2<MatrixIndexes,MatrixBlock>, Row>, Serializable {
-
- private static final long serialVersionUID = 8829736765002126985L;
-
- @Override
- public Row call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
- return new MLBlock(kv._1, kv._2);
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7ba17c7f/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index f206fbd..377ca2e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,14 +63,14 @@ import scala.Tuple2;
* can be moved to RDDConverterUtils.
*/
@SuppressWarnings("unused")
-public class RDDConverterUtilsExt
+public class RDDConverterUtilsExt
{
public enum RDDConverterTypes {
TEXT_TO_MATRIX_CELL,
MATRIXENTRY_TO_MATRIXCELL
}
-
-
+
+
/**
* Example usage:
* <pre><code>
@@ -88,7 +88,7 @@ public class RDDConverterUtilsExt
* val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000, nnz)
* val binBlocks = RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), coordinateMatrix, mc, true)
* </code></pre>
- *
+ *
* @param sc java spark context
* @param input coordinate matrix
* @param mcIn matrix characteristics
@@ -97,26 +97,26 @@ public class RDDConverterUtilsExt
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(JavaSparkContext sc,
- CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException
+ CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException
{
//convert matrix entry rdd to binary block rdd (w/ partial blocks)
JavaPairRDD<MatrixIndexes, MatrixBlock> out = input.entries().toJavaRDD()
.mapPartitionsToPair(new MatrixEntryToBinaryBlockFunction(mcIn));
-
- //inject empty blocks (if necessary)
+
+ //inject empty blocks (if necessary)
if( outputEmptyBlocks && mcIn.mightHaveEmptyBlocks() ) {
- out = out.union(
+ out = out.union(
SparkUtils.getEmptyBlockRDD(sc, mcIn) );
}
-
+
//aggregate partial matrix blocks
- out = RDDAggregateUtils.mergeByKey(out, false);
-
+ out = RDDAggregateUtils.mergeByKey(out, false);
+
return out;
}
-
+
public static JavaPairRDD<MatrixIndexes, MatrixBlock> coordinateMatrixToBinaryBlock(SparkContext sc,
- CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException
+ CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) throws DMLRuntimeException
{
return coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), input, mcIn, true);
}
@@ -128,19 +128,19 @@ public class RDDConverterUtilsExt
}
return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
}
-
+
public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, long clen) throws DMLRuntimeException {
return convertPy4JArrayToMB(data, (int)rlen, (int)clen, false);
}
-
+
public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen) throws DMLRuntimeException {
return convertPy4JArrayToMB(data, rlen, clen, false);
}
-
+
public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] row, byte [] col, long rlen, long clen, long nnz) throws DMLRuntimeException {
return convertSciPyCOOToMB(data, row, col, (int)rlen, (int)clen, (int)nnz);
}
-
+
public static MatrixBlock convertSciPyCOOToMB(byte [] data, byte [] row, byte [] col, int rlen, int clen, int nnz) throws DMLRuntimeException {
MatrixBlock mb = new MatrixBlock(rlen, clen, true);
mb.allocateSparseRowsBlock(false);
@@ -154,17 +154,17 @@ public class RDDConverterUtilsExt
double val = buf1.getDouble();
int rowIndex = buf2.getInt();
int colIndex = buf3.getInt();
- mb.setValue(rowIndex, colIndex, val);
+ mb.setValue(rowIndex, colIndex, val);
}
mb.recomputeNonZeros();
mb.examSparsity();
return mb;
}
-
+
public static MatrixBlock convertPy4JArrayToMB(byte [] data, long rlen, long clen, boolean isSparse) throws DMLRuntimeException {
return convertPy4JArrayToMB(data, (int) rlen, (int) clen, isSparse);
}
-
+
public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) {
MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
ret.allocateDenseOrSparseBlock();
@@ -176,7 +176,7 @@ public class RDDConverterUtilsExt
}
return allocateDenseOrSparse(rlen, clen, isSparse);
}
-
+
public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws DMLRuntimeException {
copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, (long)rlen, (long)clen);
}
@@ -192,12 +192,12 @@ public class RDDConverterUtilsExt
ret.copy((int)(rowIndex*numRowsPerBlock), (int)Math.min((rowIndex+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb, false);
// }
}
-
+
public static void postProcessAfterCopying(MatrixBlock ret) throws DMLRuntimeException {
ret.recomputeNonZeros();
ret.examSparsity();
}
-
+
public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen, boolean isSparse) throws DMLRuntimeException {
MatrixBlock mb = new MatrixBlock(rlen, clen, isSparse, -1);
if(isSparse) {
@@ -219,19 +219,19 @@ public class RDDConverterUtilsExt
mb.examSparsity();
return mb;
}
-
+
public static byte [] convertMBtoPy4JDenseArr(MatrixBlock mb) throws DMLRuntimeException {
byte [] ret = null;
if(mb.isInSparseFormat()) {
mb.sparseToDense();
}
-
+
long limit = mb.getNumRows()*mb.getNumColumns();
int times = Double.SIZE / Byte.SIZE;
if( limit > Integer.MAX_VALUE / times )
throw new DMLRuntimeException("MatrixBlock of size " + limit + " cannot be converted to dense numpy array");
ret = new byte[(int) (limit * times)];
-
+
double [] denseBlock = mb.getDenseBlock();
if(mb.isEmptyBlock()) {
for(int i=0;i < limit;i++){
@@ -246,10 +246,10 @@ public class RDDConverterUtilsExt
ByteBuffer.wrap(ret, i*times, times).order(ByteOrder.nativeOrder()).putDouble(denseBlock[i]);
}
}
-
+
return ret;
}
-
+
public static class AddRowID implements Function<Tuple2<Row,Long>, Row> {
private static final long serialVersionUID = -3733816995375745659L;
@@ -263,12 +263,12 @@ public class RDDConverterUtilsExt
fields[oldNumCols] = new Double(arg0._2 + 1);
return RowFactory.create(fields);
}
-
+
}
/**
* Add element indices as new column to DataFrame
- *
+ *
* @param df input data frame
* @param sparkSession the Spark Session
* @param nameOfCol name of index column
@@ -286,27 +286,10 @@ public class RDDConverterUtilsExt
return sparkSession.createDataFrame(newRows, new StructType(newSchema));
}
- /**
- * Add element indices as new column to DataFrame
- *
- * @param df input data frame
- * @param sqlContext the SQL Context
- * @param nameOfCol name of index column
- * @return new data frame
- *
- * @deprecated This will be removed in SystemML 1.0.
- */
- @Deprecated
- public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) {
- SparkSession sparkSession = sqlContext.sparkSession();
- return addIDToDataFrame(df, sparkSession, nameOfCol);
- }
-
-
- private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>
+ private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>
{
private static final long serialVersionUID = 4907483236186747224L;
-
+
private IJVToBinaryBlockFunctionHelper helper = null;
public MatrixEntryToBinaryBlockFunction(MatrixCharacteristics mc) throws DMLRuntimeException {
helper = new IJVToBinaryBlockFunctionHelper(mc);
@@ -318,18 +301,18 @@ public class RDDConverterUtilsExt
}
}
-
+
private static class IJVToBinaryBlockFunctionHelper implements Serializable {
private static final long serialVersionUID = -7952801318564745821L;
//internal buffer size (aligned w/ default matrix block size)
private static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M elements (32MB)
private int _bufflen = -1;
-
+
private long _rlen = -1;
private long _clen = -1;
private int _brlen = -1;
private int _bclen = -1;
-
+
public IJVToBinaryBlockFunctionHelper(MatrixCharacteristics mc) throws DMLRuntimeException
{
if(!mc.dimsKnown()) {
@@ -339,21 +322,21 @@ public class RDDConverterUtilsExt
_clen = mc.getCols();
_brlen = mc.getRowsPerBlock();
_bclen = mc.getColsPerBlock();
-
+
//determine upper bounded buffer len
_bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE);
-
+
}
-
+
// ----------------------------------------------------
// Can extend this by having type hierarchy
public Tuple2<MatrixIndexes, MatrixCell> textToMatrixCell(Text txt) {
FastStringTokenizer st = new FastStringTokenizer(' ');
//get input string (ignore matrix market comments)
String strVal = txt.toString();
- if( strVal.startsWith("%") )
+ if( strVal.startsWith("%") )
return null;
-
+
//parse input ijv triple
st.reset( strVal );
long row = st.nextLong();
@@ -363,19 +346,19 @@ public class RDDConverterUtilsExt
MatrixCell cell = new MatrixCell(val);
return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell);
}
-
+
public Tuple2<MatrixIndexes, MatrixCell> matrixEntryToMatrixCell(MatrixEntry entry) {
MatrixIndexes indx = new MatrixIndexes(entry.i(), entry.j());
MatrixCell cell = new MatrixCell(entry.value());
return new Tuple2<MatrixIndexes, MatrixCell>(indx, cell);
}
-
+
// ----------------------------------------------------
-
+
Iterable<Tuple2<MatrixIndexes, MatrixBlock>> convertToBinaryBlock(Object arg0, RDDConverterTypes converter) throws Exception {
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
ReblockBuffer rbuff = new ReblockBuffer(_bufflen, _rlen, _clen, _brlen, _bclen);
-
+
Iterator<?> iter = (Iterator<?>) arg0;
while( iter.hasNext() ) {
Tuple2<MatrixIndexes, MatrixCell> cell = null;
@@ -383,38 +366,38 @@ public class RDDConverterUtilsExt
case MATRIXENTRY_TO_MATRIXCELL:
cell = matrixEntryToMatrixCell((MatrixEntry) iter.next());
break;
-
+
case TEXT_TO_MATRIX_CELL:
cell = textToMatrixCell((Text) iter.next());
break;
-
+
default:
throw new Exception("Invalid converter for IJV data:" + converter.toString());
}
-
+
if(cell == null) {
continue;
}
-
+
//flush buffer if necessary
if( rbuff.getSize() >= rbuff.getCapacity() )
flushBufferToList(rbuff, ret);
-
+
//add value to reblock buffer
rbuff.appendCell(cell._1.getRowIndex(), cell._1.getColumnIndex(), cell._2.getValue());
}
-
+
//final flush buffer
flushBufferToList(rbuff, ret);
-
+
return ret;
}
- private void flushBufferToList( ReblockBuffer rbuff, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
+ private void flushBufferToList( ReblockBuffer rbuff, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
throws IOException, DMLRuntimeException
{
//temporary list of indexed matrix values to prevent library dependencies
- ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>();
+ ArrayList<IndexedMatrixValue> rettmp = new ArrayList<IndexedMatrixValue>();
rbuff.flushBufferToBinaryBlocks(rettmp);
ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
}
@@ -423,50 +406,17 @@ public class RDDConverterUtilsExt
/**
* Convert a dataframe of comma-separated string rows to a dataframe of
* ml.linalg.Vector rows.
- *
- * <p>
- * Example input rows:<br>
- *
- * <code>
- * ((1.2, 4.3, 3.4))<br>
- * (1.2, 3.4, 2.2)<br>
- * [[1.2, 34.3, 1.2, 1.25]]<br>
- * [1.2, 3.4]<br>
- * </code>
- *
- * @param sqlContext
- * Spark SQL Context
- * @param inputDF
- * dataframe of comma-separated row strings to convert to
- * dataframe of ml.linalg.Vector rows
- * @return dataframe of ml.linalg.Vector rows
- * @throws DMLRuntimeException
- * if DMLRuntimeException occurs
- *
- * @deprecated This will be removed in SystemML 1.0. Please migrate to {@code
- * RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(SparkSession, Dataset<Row>) }
- */
- @Deprecated
- public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext sqlContext, Dataset<Row> inputDF)
- throws DMLRuntimeException {
- SparkSession sparkSession = sqlContext.sparkSession();
- return stringDataFrameToVectorDataFrame(sparkSession, inputDF);
- }
-
- /**
- * Convert a dataframe of comma-separated string rows to a dataframe of
- * ml.linalg.Vector rows.
- *
+ *
* <p>
* Example input rows:<br>
- *
+ *
* <code>
* ((1.2, 4.3, 3.4))<br>
* (1.2, 3.4, 2.2)<br>
* [[1.2, 34.3, 1.2, 1.25]]<br>
* [1.2, 3.4]<br>
* </code>
- *
+ *
* @param sparkSession
* Spark Session
* @param inputDF