You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:39 UTC
[08/78] [abbrv] [partial] incubator-systemml git commit: Move files
to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
deleted file mode 100644
index 839a830..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/MatrixObject.java
+++ /dev/null
@@ -1,1686 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.io.IOException;
-import java.lang.ref.SoftReference;
-
-import org.apache.commons.lang.mutable.MutableBoolean;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.DMLScript.RUNTIME_PLATFORM;
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.lops.Lop;
-import com.ibm.bi.dml.parser.DMLTranslator;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
-import com.ibm.bi.dml.runtime.instructions.spark.data.BroadcastObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.RDDObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.RDDProperties;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixDimensionsMetaData;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.MetaData;
-import com.ibm.bi.dml.runtime.matrix.data.FileFormatProperties;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.NumItemsByEachReducerMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-import com.ibm.bi.dml.runtime.util.IndexRange;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-
-
-/**
- * Represents a matrix in control program. This class contains method to read
- * matrices from HDFS and convert them to a specific format/representation. It
- * is also able to write several formats/representation of matrices to HDFS.
 * 
- * IMPORTANT: Preserve one-to-one correspondence between {@link MatrixObject}
- * and {@link MatrixBlock} objects, for cache purposes. Do not change a
- * {@link MatrixBlock} object without informing its {@link MatrixObject} object.
- *
- */
-public class MatrixObject extends CacheableData
-{
- private static final long serialVersionUID = 6374712373206495637L;
-
/**
 * Current size (in bytes) of pinned variables for the calling thread,
 * required for guarded collect. Thread-local because parfor workers
 * pin independently.
 */
private static ThreadLocal<Long> sizePinned = new ThreadLocal<Long>() {
    @Override protected Long initialValue() { return 0L; }
};

/**
 * Soft-reference cache for the actual data; reclaimed by the garbage
 * collector under memory pressure (eviction).
 */
private SoftReference<MatrixBlock> _cache = null;

/**
 * Container object that holds the actual data while pinned/cached in memory.
 */
private MatrixBlock _data = null;

/**
 * The name of HDFS file in which the data is backed up.
 */
private String _hdfsFileName = null; // file name and path

/**
 * Flag that indicates whether or not hdfs file exists.
 * It is used for improving the performance of "rmvar" instruction.
 * When it has value <code>false</code>, one can skip invocations to
 * utility functions such as MapReduceTool.deleteFileIfExistOnHDFS(),
 * which can be potentially expensive.
 */
private boolean _hdfsFileExists = false;

/**
 * <code>true</code> if the in-memory or evicted matrix may be different from
 * the matrix located at {@link #_hdfsFileName}; <code>false</code> if the two
 * matrices should be the same.
 */
private boolean _dirtyFlag = false;

/**
 * Object that holds the metadata associated with the matrix, which
 * includes: 1) Matrix dimensions, if available 2) Number of non-zeros, if
 * available 3) Block dimensions, if applicable 4) InputInfo -- subsequent
 * operations that use this Matrix expect it to be in this format.
 *
 * When the matrix is written to HDFS (local file system, as well?), one
 * must get the OutputInfo that matches with InputInfo stored inside _mtd.
 */
private MetaData _metaData = null;

//additional names and flags
private String _varName = ""; //plan variable name
private String _cacheFileName = null; //local eviction file name
private boolean _requiresLocalWrite = false; //flag if local write for read obj
private boolean _isAcquireFromEmpty = false; //flag if read from status empty
private boolean _cleanupFlag = true; //flag if obj unpinned (cleanup enabled)
private boolean _updateInPlaceFlag = false; //flag if in-place update

//spark-specific handles
//note: we use the abstraction of LineageObjects for two reasons: (1) to keep track of cleanup
//for lazily evaluated RDDs, and (2) as abstraction for environments that do not necessarily have spark libraries available
private RDDObject _rddHandle = null; //RDD handle
private BroadcastObject _bcHandle = null; //Broadcast handle
private RDDProperties _rddProperties = null;

/**
 * Information relevant to partitioned matrices (read-only row/column access).
 */
private boolean _partitioned = false; //indicates if obj partitioned
private PDataPartitionFormat _partitionFormat = null; //indicates how obj partitioned
private int _partitionSize = -1; //indicates n for BLOCKWISE_N
private String _partitionCacheName = null; //name of cache block
private MatrixBlock _partitionInMemory = null;

/**
 * Information relevant to specific external file formats (e.g., CSV properties).
 */
FileFormatProperties _formatProperties = null;
-
/** Returns the Spark RDD properties associated with this matrix (may be null). */
public RDDProperties getRddProperties() {
    return _rddProperties;
}
-
- public void setRddProperties(RDDProperties _rddProperties) {
- this._rddProperties = _rddProperties;
- }
-
/**
 * Constructor that takes only the HDFS filename (no metadata yet).
 *
 * @param vt value type of the matrix cells
 * @param file HDFS file path backing this matrix
 */
public MatrixObject (ValueType vt, String file)
{
    this (vt, file, null); //HDFS file path
}
-
/**
 * Constructor that takes both HDFS filename and associated metadata.
 *
 * @param vt value type of the matrix cells
 * @param file HDFS file path backing this matrix
 * @param mtd metadata (dimensions, nnz, formats); may be null
 */
public MatrixObject( ValueType vt, String file, MetaData mtd )
{
    super (DataType.MATRIX, vt);
    _metaData = mtd;
    _hdfsFileName = file;

    //start without in-memory data or soft-reference cache
    _cache = null;
    _data = null;
}
-
/**
 * Copy constructor that copies meta data but NO data.
 *
 * Note: the in-memory data, soft-reference cache, dirty flag, and
 * rdd/broadcast handles are deliberately NOT copied; the new object
 * starts empty and clean with a deep copy of the matrix characteristics.
 *
 * @param mo the matrix object whose meta data is copied
 */
public MatrixObject( MatrixObject mo )
{
    super(mo.getDataType(), mo.getValueType());

    _hdfsFileName = mo._hdfsFileName;
    _hdfsFileExists = mo._hdfsFileExists;

    //deep copy of matrix characteristics to decouple later updates
    MatrixFormatMetaData metaOld = (MatrixFormatMetaData)mo.getMetaData();
    _metaData = new MatrixFormatMetaData(new MatrixCharacteristics(metaOld.getMatrixCharacteristics()),
		                             metaOld.getOutputInfo(), metaOld.getInputInfo());

    _varName = mo._varName;
    _cleanupFlag = mo._cleanupFlag;
    _updateInPlaceFlag = mo._updateInPlaceFlag;
    _partitioned = mo._partitioned;
    _partitionFormat = mo._partitionFormat;
    _partitionSize = mo._partitionSize;
    _partitionCacheName = mo._partitionCacheName;
}
-
/** Sets the plan variable name this object is bound to. */
public void setVarName(String s)
{
    _varName = s;
}
-
/** Returns the plan variable name this object is bound to. */
public String getVarName()
{
    return _varName;
}
-
/** Replaces the metadata (dimensions, nnz, formats) of this matrix. */
@Override
public void setMetaData(MetaData md)
{
    _metaData = md;
}
-
/** Returns the metadata of this matrix (may be null if removed). */
@Override
public MetaData getMetaData()
{
    return _metaData;
}
-
/** Discards the metadata of this matrix. */
@Override
public void removeMetaData()
{
    _metaData = null;
}
-
/**
 * Replaces the matrix characteristics inside the existing metadata object.
 * NOTE(review): assumes _metaData is a MatrixDimensionsMetaData — confirm callers.
 */
@Override
public void updateMatrixCharacteristics (MatrixCharacteristics mc)
{
    ((MatrixDimensionsMetaData)_metaData).setMatrixCharacteristics( mc );
}
-
- /**
- * Make the matrix metadata consistent with the in-memory matrix data
- * @throws CacheException
- */
- public void refreshMetaData()
- throws CacheException
- {
- if ( _data == null || _metaData ==null ) //refresh only for existing data
- throw new CacheException("Cannot refresh meta data because there is no data or meta data. ");
- //we need to throw an exception, otherwise input/output format cannot be inferred
-
- MatrixCharacteristics mc = ((MatrixDimensionsMetaData) _metaData).getMatrixCharacteristics();
- mc.setDimension( _data.getNumRows(),
- _data.getNumColumns() );
- mc.setNonZeros( _data.getNonZeros() );
- }
-
/** Sets format-specific properties (e.g., CSV options) used on write. */
public void setFileFormatProperties(FileFormatProperties formatProperties) {
    _formatProperties = formatProperties;
}
-
/** Returns the format-specific properties, or null if none were set. */
public FileFormatProperties getFileFormatProperties() {
    return _formatProperties;
}
-
/** Returns true if the backing HDFS file is known to exist (see _hdfsFileExists). */
public boolean isFileExists()
{
    return _hdfsFileExists;
}
-
/** Records whether the backing HDFS file exists (used to skip expensive deletes). */
public void setFileExists( boolean flag )
{
    _hdfsFileExists = flag;
}
-
/** Returns the HDFS file name and path backing this matrix. */
public String getFileName()
{
    return _hdfsFileName;
}
-
- public synchronized void setFileName( String file )
- {
- if (!_hdfsFileName.equals (file))
- {
- _hdfsFileName = file;
- if( ! isEmpty(true) )
- _dirtyFlag = true;
- }
- }
-
- /**
- *
- * @return
- */
- public long getNumRows ()
- {
- MatrixDimensionsMetaData meta = (MatrixDimensionsMetaData) _metaData;
- MatrixCharacteristics mc = meta.getMatrixCharacteristics();
- return mc.getRows ();
- }
-
- /**
- *
- * @return
- */
- public long getNumColumns()
- {
- MatrixDimensionsMetaData meta = (MatrixDimensionsMetaData) _metaData;
- MatrixCharacteristics mc = meta.getMatrixCharacteristics();
- return mc.getCols ();
- }
-
- /**
- *
- * @return
- */
- public long getNumRowsPerBlock()
- {
- MatrixDimensionsMetaData meta = (MatrixDimensionsMetaData) _metaData;
- MatrixCharacteristics mc = meta.getMatrixCharacteristics();
- return mc.getRowsPerBlock();
- }
-
- /**
- *
- * @return
- */
- public long getNumColumnsPerBlock()
- {
- MatrixDimensionsMetaData meta = (MatrixDimensionsMetaData) _metaData;
- MatrixCharacteristics mc = meta.getMatrixCharacteristics();
- return mc.getColsPerBlock();
- }
-
- /**
- *
- * @return
- */
- public long getNnz()
- {
- MatrixDimensionsMetaData meta = (MatrixDimensionsMetaData) _metaData;
- MatrixCharacteristics mc = meta.getMatrixCharacteristics();
- return mc.getNonZeros();
- }
-
- /**
- *
- * @return
- */
- public double getSparsity()
- {
- MatrixDimensionsMetaData meta = (MatrixDimensionsMetaData) _metaData;
- MatrixCharacteristics mc = meta.getMatrixCharacteristics();
-
- return ((double)mc.getNonZeros())/mc.getRows()/mc.getCols();
- }
-
- /**
- *
- * @return
- */
- public MatrixCharacteristics getMatrixCharacteristics()
- {
- MatrixDimensionsMetaData meta = (MatrixDimensionsMetaData) _metaData;
- return meta.getMatrixCharacteristics();
- }
-
/**
 * <code>true</code> if the in-memory or evicted matrix may be different from
 * the matrix located at {@link #_hdfsFileName}; <code>false</code> if the two
 * matrices are supposed to be the same.
 */
public boolean isDirty ()
{
    return _dirtyFlag;
}
-
- public String toString()
- {
- StringBuilder str = new StringBuilder();
- str.append("Matrix: ");
- str.append(_hdfsFileName + ", ");
- //System.out.println(_hdfsFileName);
- if ( _metaData instanceof NumItemsByEachReducerMetaData ) {
- str.append("NumItemsByEachReducerMetaData");
- }
- else
- {
- try
- {
- MatrixFormatMetaData md = (MatrixFormatMetaData)_metaData;
- if ( md != null ) {
- MatrixCharacteristics mc = ((MatrixDimensionsMetaData)_metaData).getMatrixCharacteristics();
- str.append(mc.toString());
-
- InputInfo ii = md.getInputInfo();
- if ( ii == null )
- str.append("null");
- else {
- str.append(", ");
- str.append(InputInfo.inputInfoToString(ii));
- }
- }
- else {
- str.append("null, null");
- }
- }
- catch(Exception ex)
- {
- LOG.error(ex);
- }
- }
- str.append(", ");
- str.append(isDirty() ? "dirty" : "not-dirty");
-
- return str.toString();
- }
-
/** Returns the lineage handle of the backing RDD, or null if none. */
public RDDObject getRDDHandle()
{
    return _rddHandle;
}
-
- public void setRDDHandle( RDDObject rdd )
- {
- //cleanup potential old back reference
- if( _rddHandle != null )
- _rddHandle.setBackReference(null);
-
- //add new rdd handle
- _rddHandle = rdd;
- if( _rddHandle != null )
- rdd.setBackReference(this);
- }
-
/** Returns the lineage handle of the backing broadcast variable, or null if none. */
public BroadcastObject getBroadcastHandle()
{
    return _bcHandle;
}
-
- public void setBroadcastHandle( BroadcastObject bc )
- {
- //cleanup potential old back reference
- if( _bcHandle != null )
- _bcHandle.setBackReference(null);
-
- //add new broadcast handle
- _bcHandle = bc;
- if( _bcHandle != null )
- bc.setBackReference(this);
- }
-
-
- // *********************************************
- // *** ***
- // *** HIGH-LEVEL METHODS THAT SPECIFY ***
- // *** THE LOCKING AND CACHING INTERFACE ***
- // *** ***
- // *********************************************
-
-
/**
 * Acquires a shared "read-only" lock, produces the reference to the matrix data,
 * restores the matrix to main memory, reads from HDFS if needed.
 *
 * Synchronized because there might be parallel threads (parfor local) that
 * access the same MatrixObjectNew object (in case it was created before the loop).
 *
 * In-Status:  EMPTY, EVICTABLE, EVICTED, READ;
 * Out-Status: READ(+1).
 *
 * @return the matrix data reference
 * @throws CacheException if the object is not available to read or I/O fails
 */
public synchronized MatrixBlock acquireRead()
    throws CacheException
{
    if( LOG.isTraceEnabled() )
        LOG.trace("Acquire read "+_varName);
    long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;

    if ( !isAvailableToRead() )
        throw new CacheStatusException ("MatrixObject not available to read.");

    //get object from cache (restores soft-referenced block if still alive)
    if( _data == null )
        getCache();

    //read data from HDFS/RDD if required
    //(probe data for cache_nowrite / jvm_reuse)
    if( isEmpty(true) && _data==null )
    {
        try
        {
            if( DMLScript.STATISTICS )
                CacheStatistics.incrementHDFSHits();

            if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() )
            {
                //check filename
                if( _hdfsFileName == null )
                    throw new CacheException("Cannot read matrix for empty filename.");

                //read matrix from hdfs
                _data = readMatrixFromHDFS( _hdfsFileName );

                //mark for initial local write despite read operation
                _requiresLocalWrite = CACHING_WRITE_CACHE_ON_READ;
            }
            else
            {
                //read matrix from rdd (incl execute pending rdd operations)
                MutableBoolean writeStatus = new MutableBoolean();
                _data = readMatrixFromRDD( getRDDHandle(), writeStatus );

                //mark for initial local write (prevent repeated execution of rdd operations)
                if( writeStatus.booleanValue() )
                    _requiresLocalWrite = CACHING_WRITE_CACHE_ON_READ;
                else
                    _requiresLocalWrite = true;
            }

            //freshly read data matches its source by definition
            _dirtyFlag = false;
        }
        catch (IOException e)
        {
            throw new CacheIOException("Reading of " + _hdfsFileName + " ("+_varName+") failed.", e);
        }

        _isAcquireFromEmpty = true;
    }
    else if( DMLScript.STATISTICS )
    {
        if( _data!=null )
            CacheStatistics.incrementMemHits();
    }

    //cache status maintenance (shared lock; restore only if still absent)
    super.acquire( false, _data==null );
    updateStatusPinned(true);

    if( DMLScript.STATISTICS ){
        long t1 = System.nanoTime();
        CacheStatistics.incrementAcquireRTime(t1-t0);
    }

    return _data;
}
-
/**
 * Acquires the exclusive "write" lock for a thread that wants to change matrix
 * cell values. Produces the reference to the matrix data, restores the matrix
 * to main memory, reads from HDFS if needed.
 *
 * In-Status:  EMPTY, EVICTABLE, EVICTED;
 * Out-Status: MODIFY.
 *
 * @return the matrix data reference
 * @throws CacheException if the object is not available to modify or I/O fails
 */
public synchronized MatrixBlock acquireModify()
    throws CacheException
{
    if( LOG.isTraceEnabled() )
        LOG.trace("Acquire modify "+_varName);
    long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;

    if ( !isAvailableToModify() )
        throw new CacheStatusException("MatrixObject not available to modify.");

    //get object from cache (restores soft-referenced block if still alive)
    if( _data == null )
        getCache();

    //read data from HDFS if required
    if( isEmpty(true) && _data == null )
    {
        //check filename
        if( _hdfsFileName == null )
            throw new CacheException("Cannot read matrix for empty filename.");

        //load data
        try
        {
            _data = readMatrixFromHDFS( _hdfsFileName );
        }
        catch (IOException e)
        {
            throw new CacheIOException("Reading of " + _hdfsFileName + " ("+_varName+") failed.", e);
        }
    }

    //cache status maintenance (exclusive lock); data will be modified,
    //hence mark dirty so a later release/export writes it out
    super.acquire( true, _data==null );
    updateStatusPinned(true);
    _dirtyFlag = true;
    _isAcquireFromEmpty = false;

    if( DMLScript.STATISTICS ){
        long t1 = System.nanoTime();
        CacheStatistics.incrementAcquireMTime(t1-t0);
    }

    return _data;
}
-
- /**
- * Acquires the exclusive "write" lock for a thread that wants to throw away the
- * old matrix data and link up with new matrix data. Abandons the old matrix data
- * without reading it. Sets the new matrix data reference.
-
- * In-Status: EMPTY, EVICTABLE, EVICTED;
- * Out-Status: MODIFY.
- *
- * @param newData : the new matrix data reference
- * @return the matrix data reference, which is the same as the argument
- * @throws CacheException
- */
- public synchronized MatrixBlock acquireModify(MatrixBlock newData)
- throws CacheException
- {
- if( LOG.isTraceEnabled() )
- LOG.trace("Acquire modify newdata "+_varName);
- long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-
- if (! isAvailableToModify ())
- throw new CacheStatusException ("MatrixObject not available to modify.");
-
- //clear old data
- clearData();
-
- //cache status maintenance
- super.acquire (true, false); //no need to load evicted matrix
- _dirtyFlag = true;
- _isAcquireFromEmpty = false;
-
- //set references to new data
- if (newData == null)
- throw new CacheException("acquireModify with empty matrix block.");
- _data = newData;
- updateStatusPinned(true);
-
- if( DMLScript.STATISTICS ){
- long t1 = System.nanoTime();
- CacheStatistics.incrementAcquireMTime(t1-t0);
- }
-
- return _data;
- }
-
/**
 * Releases the shared ("read-only") or exclusive ("write") lock. Updates
 * the matrix size, last-access time, metadata, etc.
 *
 * Synchronized because there might be parallel threads (parfor local) that
 * access the same MatrixObjectNew object (in case it was created before the loop).
 *
 * In-Status:  READ, MODIFY;
 * Out-Status: READ(-1), EVICTABLE, EMPTY.
 *
 * @throws CacheException if metadata refresh or local eviction write fails
 */
public synchronized void release()
    throws CacheException
{
    if( LOG.isTraceEnabled() )
        LOG.trace("Release "+_varName);
    long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;

    boolean write = false;
    if ( isModify() )
    {
        //set flags for write
        write = true;
        _dirtyFlag = true;

        //update meta data (dimensions/nnz from the modified block)
        refreshMetaData();
    }

    //compact empty in-memory block (free allocated arrays)
    if( _data.isEmptyBlock(false) && _data.isAllocated() )
        _data.cleanupBlock(true, true);

    //cache status maintenance (pass cacheNoWrite flag)
    super.release(_isAcquireFromEmpty && !_requiresLocalWrite);
    updateStatusPinned(false);

    if( isCachingActive() //only if caching is enabled (otherwise keep everything in mem)
        && isCached(true) //not empty and not read/modify
        && !isUpdateInPlace() //pinned result variable
        && !isBelowCachingThreshold() ) //min size for caching
    {
        if( write || _requiresLocalWrite )
        {
            //evict blob to local file system so the soft reference can be dropped safely
            String filePath = getCacheFilePathAndName();
            try {
                writeMatrix (filePath);
            }
            catch (Exception e)
            {
                throw new CacheException("Eviction to local path " + filePath + " ("+_varName+") failed.", e);
            }
            _requiresLocalWrite = false;
        }

        //create cache (soft reference) and drop the strong reference
        createCache();
        _data = null;
    }
    else if( LOG.isTraceEnabled() ){
        LOG.trace("Var "+_varName+" not subject to caching: rows="+_data.getNumRows()+", cols="+_data.getNumColumns()+", state="+getStatusAsString());
    }

    if( DMLScript.STATISTICS ){
        long t1 = System.nanoTime();
        CacheStatistics.incrementReleaseTime(t1-t0);
    }
}
-
/**
 * Sets the matrix data reference to <code>null</code>, abandons the old matrix.
 * Makes the "envelope" empty. Run it to finalize the matrix (otherwise the
 * evicted matrix file may remain undeleted).
 *
 * In-Status:  EMPTY, EVICTABLE, EVICTED;
 * Out-Status: EMPTY.
 *
 * @throws CacheException if the object is currently not available to modify
 */
public synchronized void clearData()
    throws CacheException
{
    if( LOG.isTraceEnabled() )
        LOG.trace("Clear data "+_varName);

    // check if cleanup enabled and possible
    if( !_cleanupFlag )
        return; // do nothing
    if( !isAvailableToModify() )
        throw new CacheStatusException ("MatrixObject (" + this.getDebugName() + ") not available to modify. Status = " + this.getStatusAsString() + ".");

    // clear existing WB / FS representation (but prevent unnecessary probes)
    // skip if: empty, below caching threshold, or caching inactive (JMLC)
    if( !(isEmpty(true)||(_data!=null && isBelowCachingThreshold())
          ||(_data!=null && !isCachingActive()) )) //additional condition for JMLC
        freeEvictedBlob();

    // clear the in-memory data
    _data = null;
    clearCache();

    // clear rdd/broadcast back refs (enables lineage cleanup of handles)
    if( _rddHandle != null )
        _rddHandle.setBackReference(null);
    if( _bcHandle != null )
        _bcHandle.setBackReference(null);

    // change object state EMPTY
    _dirtyFlag = false;
    setEmpty();
}
-
/** Exports the matrix to its own HDFS file with default replication. */
public synchronized void exportData()
    throws CacheException
{
    exportData( -1 ); //-1: use default replication factor
}
-
/**
 * Writes, or flushes, the matrix data to HDFS.
 *
 * In-Status:  EMPTY, EVICTABLE, EVICTED, READ;
 * Out-Status: EMPTY, EVICTABLE, EVICTED, READ.
 *
 * @param replication HDFS replication factor (-1 for default)
 * @throws CacheException if the export fails
 */
public synchronized void exportData( int replication )
    throws CacheException
{
    exportData(_hdfsFileName, null, replication, null);
    _hdfsFileExists = true;
}
-
/**
 * Exports to the given file/format with default replication.
 *
 * @param fName target HDFS file name
 * @param outputFormat target output format (null for current format)
 * @param formatProperties format-specific properties (e.g., CSV options)
 * @throws CacheException if the export fails
 */
public synchronized void exportData (String fName, String outputFormat, FileFormatProperties formatProperties)
    throws CacheException
{
    exportData(fName, outputFormat, -1, formatProperties);
}
-
/**
 * Exports to the given file/format with default replication and no
 * format-specific properties.
 *
 * @param fName target HDFS file name
 * @param outputFormat target output format (null for current format)
 * @throws CacheException if the export fails
 */
public synchronized void exportData (String fName, String outputFormat)
    throws CacheException
{
    exportData(fName, outputFormat, -1, null);
}
-
/**
 * Synchronized because there might be parallel threads (parfor local) that
 * access the same MatrixObjectNew object (in case it was created before the loop).
 * If all threads export the same data object concurrently it results in errors
 * because they all write to the same file. Efficiency for loops and parallel threads
 * is achieved by checking if the in-memory matrix block is dirty.
 *
 * NOTE: MB: we do not use dfs copy from local (evicted) to HDFS because this would ignore
 * the output format and most importantly would bypass reblocking during write (which affects the
 * potential degree of parallelism). However, we copy files on HDFS if certain criteria are given.
 *
 * @param fName target HDFS file name
 * @param outputFormat target output format (null for current format)
 * @param replication HDFS replication factor (-1 for default)
 * @param formatProperties format-specific properties (may be null)
 * @throws CacheException if reading, writing, or copying fails
 */
public synchronized void exportData (String fName, String outputFormat, int replication, FileFormatProperties formatProperties)
    throws CacheException
{
    if( LOG.isTraceEnabled() )
        LOG.trace("Export data "+_varName+" "+fName);
    long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;

    //prevent concurrent modifications
    if ( !isAvailableToRead() )
        throw new CacheStatusException ("MatrixObject not available to read.");

    LOG.trace("Exporting " + this.getDebugName() + " to " + fName + " in format " + outputFormat);

    boolean pWrite = false; // !fName.equals(_hdfsFileName); //persistent write flag
    if ( fName.equals(_hdfsFileName) ) {
        _hdfsFileExists = true;
        pWrite = false;
    }
    else {
        pWrite = true;  // i.e., export is called from "write" instruction
    }

    //actual export (note: no direct transfer of local copy in order to ensure blocking (and hence, parallelism))
    if(  isDirty()  ||      //use dirty for skipping parallel exports
        (pWrite && !isEqualOutputFormat(outputFormat)) )
    {
        // CASE 1: dirty in-mem matrix or pWrite w/ different format (write matrix to fname; load into memory if evicted)
        // a) get the matrix
        if( isEmpty(true) )
        {
            //read data from HDFS if required (never read before), this applies only to pWrite w/ different output formats
            //note: for large rdd outputs, we compile dedicated write instructions (no need to handle this here)
            try
            {
                if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() )
                    _data = readMatrixFromHDFS( _hdfsFileName );
                else
                    _data = readMatrixFromRDD( getRDDHandle(), new MutableBoolean() );
                _dirtyFlag = false;
            }
            catch (IOException e)
            {
                throw new CacheIOException("Reading of " + _hdfsFileName + " ("+_varName+") failed.", e);
            }
        }
        //get object from cache
        if( _data == null )
            getCache();
        super.acquire( false, _data==null ); //incl. read matrix if evicted

        // b) write the matrix
        try
        {
            writeMetaData( fName, outputFormat, formatProperties );
            writeMatrixToHDFS( fName, outputFormat, replication, formatProperties );
            if ( !pWrite )
                _dirtyFlag = false;
        }
        catch (Exception e)
        {
            throw new CacheIOException ("Export to " + fName + " failed.", e);
        }
        finally
        {
            release();
        }
    }
    else if( pWrite ) // pwrite with same output format
    {
        //CASE 2: matrix already in same format but different file on hdfs (copy matrix to fname)
        try
        {
            MapReduceTool.deleteFileIfExistOnHDFS(fName);
            MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd");
            if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() )
                MapReduceTool.copyFileOnHDFS( _hdfsFileName, fName );
            else //write might trigger rdd operations and nnz maintenance
                writeMatrixFromRDDtoHDFS(getRDDHandle(), fName, outputFormat);
            writeMetaData( fName, outputFormat, formatProperties );
        }
        catch (Exception e) {
            throw new CacheIOException ("Export to " + fName + " failed.", e);
        }
    }
    else if( getRDDHandle()!=null && //pending rdd operation
            !getRDDHandle().allowsShortCircuitRead() )
    {
        //CASE 3: pending rdd operation (other than checkpoints)
        try
        {
            writeMatrixFromRDDtoHDFS(getRDDHandle(), fName, outputFormat);
            writeMetaData( fName, outputFormat, formatProperties );
        }
        catch (Exception e) {
            throw new CacheIOException ("Export to " + fName + " failed.", e);
        }
    }
    else
    {
        //CASE 4: data already in hdfs (do nothing, no need for export)
        LOG.trace(this.getDebugName() + ": Skip export to hdfs since data already exists.");
    }

    if( DMLScript.STATISTICS ){
        long t1 = System.nanoTime();
        CacheStatistics.incrementExportTime(t1-t0);
    }
}
-
/**
 * Moves this matrix to the given target file on HDFS: exports if dirty or
 * a format change is required, otherwise renames the existing HDFS file.
 *
 * @param fName target HDFS file name
 * @param outputFormat target output format
 * @return true if data was physically exported or renamed, false otherwise
 * @throws CacheIOException if any HDFS operation fails
 */
public synchronized boolean moveData(String fName, String outputFormat)
    throws CacheIOException
{
    boolean ret = false;

    try
    {
        //ensure input file is persistent on hdfs (pending RDD operations),
        //file might have been written during export or collect via write/read
        if( getRDDHandle() != null && !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) {
            writeMatrixFromRDDtoHDFS(getRDDHandle(), _hdfsFileName, outputFormat);
        }

        //export or rename to target file on hdfs
        if( isDirty() || (!isEqualOutputFormat(outputFormat) && isEmpty(true)))
        {
            exportData(fName, outputFormat);
            ret = true;
        }
        else if( isEqualOutputFormat(outputFormat) )
        {
            MapReduceTool.deleteFileIfExistOnHDFS(fName);
            MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd");
            MapReduceTool.renameFileOnHDFS( _hdfsFileName, fName );
            writeMetaData( fName, outputFormat, null );
            ret = true;
        }
    }
    catch (Exception e)
    {
        throw new CacheIOException ("Move to " + fName + " failed.", e);
    }

    return ret;
}
-
-
- // *********************************************
- // *** ***
- // *** HIGH-LEVEL PUBLIC METHODS ***
- // *** FOR PARTITIONED MATRIX ACCESS ***
- // *** (all other methods still usable) ***
- // *** ***
- // *********************************************
-
/**
 * Marks this matrix as partitioned with the given format.
 *
 * @param format partition format (row/column [block] wise)
 * @param n block size for BLOCKWISE_N formats (ignored otherwise)
 */
public void setPartitioned( PDataPartitionFormat format, int n )
{
    _partitioned = true;
    _partitionFormat = format;
    _partitionSize = n;
}
-
-
/** Clears the partitioning state of this matrix. */
public void unsetPartitioned()
{
    _partitioned = false;
    _partitionFormat = null;
    _partitionSize = -1;
}
-
/**
 * @return true if this matrix is partitioned (see {@link #setPartitioned})
 */
public boolean isPartitioned()
{
    return _partitioned;
}
-
/** Returns the partition format, or null if not partitioned. */
public PDataPartitionFormat getPartitionFormat()
{
    return _partitionFormat;
}
-
/** Returns the partition block size n for BLOCKWISE_N formats (-1 if unset). */
public int getPartitionSize()
{
    return _partitionSize;
}
-
/** Sets a static in-memory partition that readMatrixPartition returns directly. */
public synchronized void setInMemoryPartition(MatrixBlock block)
{
    _partitionInMemory = block;
}
-
/**
 * NOTE: for reading matrix partitions, we could cache (in its real sense) the read block
 * with soft references (no need for eviction, as partitioning only applied for read-only matrices).
 * However, since we currently only support row- and column-wise partitioning caching is not applied yet.
 * This could be changed once we also support column-block-wise and row-block-wise. Furthermore,
 * as we reject to partition vectors and support only full row or column indexing, no metadata (apart from
 * the partition flag) is required.
 *
 * @param pred index range selecting the row/column (or block thereof) to read
 * @return the requested partition as a matrix block (empty block if the
 *         partition file does not exist)
 * @throws CacheException if the matrix is not partitioned or reading fails
 */
public synchronized MatrixBlock readMatrixPartition( IndexRange pred )
    throws CacheException
{
    if( LOG.isTraceEnabled() )
        LOG.trace("Acquire partition "+_varName+" "+pred);
    long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;

    if ( !_partitioned )
        throw new CacheStatusException ("MatrixObject not available to indexed read.");

    //return static partition of set from outside of the program
    if( _partitionInMemory != null )
        return _partitionInMemory;

    MatrixBlock mb = null;

    try
    {
        boolean blockwise = (_partitionFormat==PDataPartitionFormat.ROW_BLOCK_WISE || _partitionFormat==PDataPartitionFormat.COLUMN_BLOCK_WISE);

        //preparations for block wise access
        MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
        MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
        int brlen = mc.getRowsPerBlock();
        int bclen = mc.getColsPerBlock();

        //get filename depending on format
        String fname = getPartitionFileName( pred, brlen, bclen );

        //probe cache
        //NOTE(review): assumes _cache is non-null whenever _partitionCacheName
        //matches fname — confirm _cache cannot be cleared independently
        if( blockwise && _partitionCacheName != null && _partitionCacheName.equals(fname) )
        {
            mb = _cache.get(); //try getting block from cache
        }

        if( mb == null ) //block not in cache
        {
            //get rows and cols of the partition according to the format
            long rows = -1;
            long cols = -1;
            switch( _partitionFormat )
            {
                case ROW_WISE:
                    rows = 1;
                    cols = mc.getCols();
                    break;
                case ROW_BLOCK_WISE:
                    rows = brlen;
                    cols = mc.getCols();
                    break;
                case COLUMN_WISE:
                    rows = mc.getRows();
                    cols = 1;
                    break;
                case COLUMN_BLOCK_WISE:
                    rows = mc.getRows();
                    cols = bclen;
                    break;
                default:
                    throw new CacheException("Unsupported partition format: "+_partitionFormat);
            }


            //read the partition from HDFS (empty block if file missing)
            if( MapReduceTool.existsFileOnHDFS(fname) )
                mb = readMatrixFromHDFS( fname, rows, cols );
            else
            {
                mb = new MatrixBlock((int)rows, (int)cols, true);
                LOG.warn("Reading empty matrix partition "+fname);
            }
        }

        //post processing
        if( blockwise )
        {
            //put block into cache
            _partitionCacheName = fname;
            _cache = new SoftReference<MatrixBlock>(mb);

            //slice the requested row/column out of the cached block
            if( _partitionFormat == PDataPartitionFormat.ROW_BLOCK_WISE )
            {
                int rix = (int)((pred.rowStart-1)%brlen);
                mb = mb.sliceOperations(rix, rix, (int)(pred.colStart-1), (int)(pred.colEnd-1), new MatrixBlock());
            }
            if( _partitionFormat == PDataPartitionFormat.COLUMN_BLOCK_WISE )
            {
                int cix = (int)((pred.colStart-1)%bclen);
                mb = mb.sliceOperations((int)(pred.rowStart-1), (int)(pred.rowEnd-1), cix, cix, new MatrixBlock());
            }
        }

        //NOTE: currently no special treatment of non-existing partitions necessary
        //      because empty blocks are written anyway
    }
    catch(Exception ex)
    {
        throw new CacheException(ex);
    }

    if( DMLScript.STATISTICS ){
        long t1 = System.nanoTime();
        CacheStatistics.incrementAcquireRTime(t1-t0);
    }

    return mb;
}
-
-
-	/**
-	 * Builds the HDFS file name of the data partition covering the given
-	 * index range, by appending a 1-based partition index to the base file name.
-	 * 
-	 * @param pred  index range identifying the requested partition (1-based)
-	 * @param brlen rows per block (used for ROW_BLOCK_WISE partitioning)
-	 * @param bclen columns per block (used for COLUMN_BLOCK_WISE partitioning)
-	 * @return file name of the partition on HDFS
-	 * @throws CacheStatusException if this matrix is not partitioned or the
-	 *         partition format is unsupported
-	 */
-	public String getPartitionFileName( IndexRange pred, int brlen, int bclen ) 
-		throws CacheStatusException
-	{
-		if ( !_partitioned )
-			throw new CacheStatusException ("MatrixObject not available to indexed read.");
-		
-		StringBuilder sb = new StringBuilder();
-		sb.append(_hdfsFileName);
-	
-		switch( _partitionFormat )
-		{
-			case ROW_WISE:
-				sb.append(Lop.FILE_SEPARATOR);
-				sb.append(pred.rowStart); 
-				break;
-			case ROW_BLOCK_WISE:
-				sb.append(Lop.FILE_SEPARATOR);
-				sb.append((pred.rowStart-1)/brlen+1); //1-based block index
-				break;
-			case COLUMN_WISE:
-				sb.append(Lop.FILE_SEPARATOR);
-				sb.append(pred.colStart);
-				break;
-			case COLUMN_BLOCK_WISE:
-				sb.append(Lop.FILE_SEPARATOR);
-				sb.append((pred.colStart-1)/bclen+1); //1-based block index
-				break;
-			default:
-				throw new CacheStatusException ("MatrixObject not available to indexed read.");
-		}
-
-		return sb.toString();
-	}	
-
-
-
- // *********************************************
- // *** ***
- // *** LOW-LEVEL PROTECTED METHODS ***
- // *** EXTEND CACHEABLE DATA ***
- // *** ONLY CALLED BY THE SUPERCLASS ***
- // *** ***
- // *********************************************
-
-
-	/** Indicates whether the in-memory matrix block is currently present. */
-	@Override
-	protected boolean isBlobPresent()
-	{
-		return (_data != null);
-	}
-
-	/**
-	 * Not supported: eviction is handled elsewhere (via the lazy write buffer),
-	 * so an explicit eviction call is considered redundant and always fails.
-	 */
-	@Override
-	protected void evictBlobFromMemory ( MatrixBlock mb ) 
-		throws CacheIOException
-	{
-		throw new CacheIOException("Redundant explicit eviction.");
-	}
-
-	/**
-	 * Restores the in-memory matrix block from the local eviction file
-	 * (via the lazy write buffer). Fails if in-memory data already exists
-	 * or if the read yields no data.
-	 */
-	@Override
-	protected void restoreBlobIntoMemory () 
-		throws CacheIOException
-	{
-		long begin = 0;
-		
-		if( LOG.isTraceEnabled() ) {
-			LOG.trace("RESTORE of Matrix "+_varName+", "+_hdfsFileName);
-			begin = System.currentTimeMillis();
-		}
-		
-		String filePath = getCacheFilePathAndName();
-		
-		if( LOG.isTraceEnabled() )
-			LOG.trace ("CACHE: Restoring matrix... " + _varName + " HDFS path: " + 
-						(_hdfsFileName == null ? "null" : _hdfsFileName) + ", Restore from path: " + filePath);
-				
-		//guard against overwriting live in-memory data
-		if (_data != null)
-			throw new CacheIOException (filePath + " : Cannot restore on top of existing in-memory data.");
-
-		try
-		{
-			_data = readMatrix(filePath);
-		}
-		catch (IOException e)
-		{
-			throw new CacheIOException (filePath + " : Restore failed.", e);	
-		}
-		
-		//check for success
-		if (_data == null)
-			throw new CacheIOException (filePath + " : Restore failed.");
-		
-		if( LOG.isTraceEnabled() )
-			LOG.trace("Restoring matrix - COMPLETED ... " + (System.currentTimeMillis()-begin) + " msec.");
-	}		
-
-	/**
-	 * Deletes the local eviction file of this matrix (via the lazy write
-	 * buffer), freeing the evicted on-disk representation.
-	 */
-	@Override
-	protected void freeEvictedBlob()
-	{
-		String cacheFilePathAndName = getCacheFilePathAndName();
-		long begin = 0;
-		if( LOG.isTraceEnabled() ){
-			LOG.trace("CACHE: Freeing evicted matrix...  " + _varName + "  HDFS path: " + 
-					  (_hdfsFileName == null ? "null" : _hdfsFileName) + " Eviction path: " + cacheFilePathAndName);
-			begin = System.currentTimeMillis();
-		}
-		
-		LazyWriteBuffer.deleteMatrix(cacheFilePathAndName);
-		
-		if( LOG.isTraceEnabled() )
-			LOG.trace("Freeing evicted matrix - COMPLETED ... " + (System.currentTimeMillis()-begin) + " msec.");		
-	}
-
-	/**
-	 * Indicates whether this matrix is too small to be worth caching,
-	 * based on an estimated in-memory size (dense assumed if nnz unknown).
-	 * Unallocated blocks are always considered below the threshold.
-	 */
-	@Override
-	protected boolean isBelowCachingThreshold()
-	{
-		long rlen = _data.getNumRows();
-		long clen = _data.getNumColumns();
-		long nnz = _data.getNonZeros();
-		
-		//get in-memory size (assume dense, if nnz unknown)
-		double sparsity = OptimizerUtils.getSparsity( rlen, clen, nnz );
-		double size = MatrixBlock.estimateSizeInMemory( rlen, clen, sparsity ); 
-		
-		return ( !_data.isAllocated() || size <= CACHING_THRESHOLD );
-	}
-
- // *******************************************
- // *** ***
- // *** LOW-LEVEL PRIVATE METHODS ***
- // *** FOR MATRIX I/O ***
- // *** ***
- // *******************************************
-
-	/**
-	 * Returns the update-in-place flag.
-	 * NOTE(review): private duplicate of the public isUpdateInPlaceEnabled();
-	 * appears unused within this section - confirm before removing.
-	 */
-	private boolean isUpdateInPlace()
-	{
-		return _updateInPlaceFlag;
-	}
-
-	/**
-	 * Returns (and lazily constructs) the local eviction file path of this
-	 * object: eviction dir + prefix + zero-padded unique cache id + extension.
-	 * The name is computed once and memoized in _cacheFileName.
-	 */
-	private String getCacheFilePathAndName ()
-	{
-		if( _cacheFileName==null )
-		{
-			StringBuilder sb = new StringBuilder();
-			sb.append(CacheableData.cacheEvictionLocalFilePath); 
-			sb.append(CacheableData.cacheEvictionLocalFilePrefix);
-			sb.append(String.format ("%09d", getUniqueCacheID()));
-			sb.append(CacheableData.cacheEvictionLocalFileExtension);
-			
-			_cacheFileName = sb.toString();
-		}
-		
-		return _cacheFileName;
-	}
-
-	/**
-	 * Reads an evicted matrix block from the local file system
-	 * via the lazy write buffer.
-	 * 
-	 * @param filePathAndName local eviction file path
-	 * @return the restored matrix block
-	 * @throws IOException if the read fails
-	 */
-	private MatrixBlock readMatrix (String filePathAndName)
-		throws IOException
-	{
-		return LazyWriteBuffer.readMatrix(filePathAndName);
-	}
-
-	/**
-	 * Reads this matrix from HDFS using the full dimensions recorded in the
-	 * matrix characteristics of the attached metadata.
-	 * 
-	 * @param filePathAndName HDFS file path
-	 * @return the matrix block read from HDFS
-	 * @throws IOException if the read fails
-	 */
-	private MatrixBlock readMatrixFromHDFS(String filePathAndName)
-		throws IOException
-	{
-		MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-		MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
-		return readMatrixFromHDFS( filePathAndName, mc.getRows(), mc.getCols() );
-	}
-
-	/**
-	 * Materializes a matrix block from an RDD handle, either by direct collect
-	 * or - if the collect would exceed the guarded memory budget - by writing
-	 * the RDD to HDFS and reading it back.
-	 * 
-	 * @param rdd the RDD lineage handle to read from
-	 * @param writeStatus out-parameter; set to true iff the RDD was written to
-	 *        HDFS as part of this read (i.e., no cache-write needed afterwards)
-	 * @return the collected matrix block (never null)
-	 * @throws IOException if collect/read fails or yields no data
-	 */
-	private MatrixBlock readMatrixFromRDD(RDDObject rdd, MutableBoolean writeStatus) 
-		throws IOException
-	{
-		//note: the read of a matrix block from an RDD might trigger
-		//lazy evaluation of pending transformations.
-		RDDObject lrdd = rdd;
-
-		//prepare return status (by default only collect)
-		writeStatus.setValue(false);
-		
-		MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-		MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
-		MatrixBlock mb = null;
-		try 
-		{
-			//prevent unnecessary collect through rdd checkpoint
-			if( rdd.allowsShortCircuitCollect() ) {
-				lrdd = (RDDObject)rdd.getLineageChilds().get(0);
-			}
-			
-			//obtain matrix block from RDD
-			int rlen = (int)mc.getRows();
-			int clen = (int)mc.getCols();
-			int brlen = (int)mc.getRowsPerBlock();
-			int bclen = (int)mc.getColsPerBlock();
-			long nnz = mc.getNonZeros();
-			
-			//guarded rdd collect 
-			if( !OptimizerUtils.checkSparkCollectMemoryBudget(rlen, clen, brlen, bclen, nnz, sizePinned.get()) ) {
-				//write RDD to hdfs and read to prevent invalid collect mem consumption 
-				//note: lazy, partition-at-a-time collect (toLocalIterator) was significantly slower
-				if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
-					long newnnz = SparkExecutionContext.writeRDDtoHDFS(lrdd, _hdfsFileName, iimd.getOutputInfo());
-					((MatrixDimensionsMetaData) _metaData).getMatrixCharacteristics().setNonZeros(newnnz);
-					((RDDObject)rdd).setHDFSFile(true); //mark rdd as hdfs file (for restore)
-					writeStatus.setValue(true);         //mark for no cache-write on read
-				}
-				mb = readMatrixFromHDFS(_hdfsFileName);
-			}
-			else {
-				//collect matrix block from RDD
-				mb = SparkExecutionContext.toMatrixBlock(lrdd, rlen, clen, brlen, bclen, nnz);	
-			}
-		}
-		catch(DMLRuntimeException ex) {
-			throw new IOException(ex);
-		}
-				
-		//sanity check correct output
-		if( mb == null ) {
-			throw new IOException("Unable to load matrix from rdd: "+lrdd.getVarName());
-		}
-		
-		return mb;
-	}
-
-	/**
-	 * Writes the matrix backed by the given RDD to HDFS and updates the
-	 * non-zero count in this object's metadata with the result.
-	 * 
-	 * @param rdd the RDD lineage handle to write
-	 * @param fname target HDFS file name
-	 * @param outputFormat output format name; if null, the format is inferred
-	 *        from the input metadata
-	 * @throws DMLRuntimeException if the write fails
-	 */
-	private void writeMatrixFromRDDtoHDFS(RDDObject rdd, String fname, String outputFormat)
-		throws DMLRuntimeException
-	{
-		//prepare output info
-		MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-		OutputInfo oinfo = (outputFormat != null ? OutputInfo.stringToOutputInfo (outputFormat) 
-				: InputInfo.getMatchingOutputInfo (iimd.getInputInfo ()));
-		
-		//note: the write of an RDD to HDFS might trigger
-		//lazy evaluation of pending transformations.
-		long newnnz = SparkExecutionContext.writeRDDtoHDFS(rdd, fname, oinfo);	
-		((MatrixDimensionsMetaData) _metaData).getMatrixCharacteristics().setNonZeros(newnnz);
-	}
-
-	/**
-	 * Reads a matrix of the given dimensions from HDFS, using input format,
-	 * blocking, and non-zero information from the attached metadata. The
-	 * expected sparsity is derived from the metadata nnz (dense if unknown).
-	 * 
-	 * @param filePathAndName HDFS file path
-	 * @param rlen number of rows to read
-	 * @param clen number of columns to read
-	 * @return the matrix block read from HDFS (never null)
-	 * @throws IOException if the read fails or yields no data
-	 */
-	private MatrixBlock readMatrixFromHDFS(String filePathAndName, long rlen, long clen)
-		throws IOException
-	{
-		long begin = 0;
-		
-		MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-		MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
-		
-		if( LOG.isTraceEnabled() ){
-			LOG.trace("Reading matrix from HDFS...  " + _varName + "  Path: " + filePathAndName 
-					+ ", dimensions: [" + mc.getRows() + ", " + mc.getCols() + ", " + mc.getNonZeros() + "]");
-			begin = System.currentTimeMillis();
-		}
-			
-		double sparsity = ( mc.getNonZeros() >= 0 ? ((double)mc.getNonZeros())/(mc.getRows()*mc.getCols()) : 1.0d) ; //expected sparsity
-		MatrixBlock newData = DataConverter.readMatrixFromHDFS(filePathAndName, iimd.getInputInfo(),
-				rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), sparsity, _formatProperties);
-		
-		//sanity check correct output
-		if( newData == null ) {
-			throw new IOException("Unable to load matrix from file: "+filePathAndName);
-		}
-		
-		if( LOG.isTraceEnabled() )
-			LOG.trace("Reading Completed: " + (System.currentTimeMillis()-begin) + " msec.");
-		
-		return newData;
-	}
-
-	/**
-	 * Writes the in-memory matrix block to the given local eviction file
-	 * via the lazy write buffer.
-	 * 
-	 * @param filePathAndName local eviction file path
-	 * @throws DMLRuntimeException if the write fails
-	 * @throws IOException if the write fails
-	 */
-	private void writeMatrix (String filePathAndName)
-		throws DMLRuntimeException, IOException
-	{
-		LazyWriteBuffer.writeMatrix(filePathAndName, _data);
-	}
-
-	/**
-	 * Writes in-memory matrix to HDFS in a specified format. Does nothing
-	 * (besides a trace message) if no in-memory data exists.
-	 * 
-	 * @param filePathAndName target HDFS file path
-	 * @param outputFormat output format name; if null, inferred from metadata
-	 * @param replication HDFS replication factor
-	 * @param formatProperties format-specific properties (e.g., csv options)
-	 * @throws DMLRuntimeException if the write fails
-	 * @throws IOException if the write fails
-	 */
-	private void writeMatrixToHDFS (String filePathAndName, String outputFormat, int replication, FileFormatProperties formatProperties)
-		throws DMLRuntimeException, IOException
-	{
-		long begin = 0;
-		if( LOG.isTraceEnabled() ){
-			LOG.trace (" Writing matrix to HDFS...  " + _varName + "  Path: " + filePathAndName + ", Format: " +
-						(outputFormat != null ? outputFormat : "inferred from metadata"));
-			begin = System.currentTimeMillis();
-		}
-		
-		MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-
-		if (_data != null)
-		{
-			// Get the dimension information from the metadata stored within MatrixObject
-			MatrixCharacteristics mc = iimd.getMatrixCharacteristics ();
-			// Write the matrix to HDFS in requested format			
-			OutputInfo oinfo = (outputFormat != null ? OutputInfo.stringToOutputInfo (outputFormat) 
-					: InputInfo.getMatchingOutputInfo (iimd.getInputInfo ()));
-			
-			// when outputFormat is binaryblock, make sure that matrixCharacteristics has correct blocking dimensions
-			// note: this is only required if singlenode (due to binarycell default) 
-			if ( oinfo == OutputInfo.BinaryBlockOutputInfo && DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE &&
-				(mc.getRowsPerBlock() != DMLTranslator.DMLBlockSize || mc.getColsPerBlock() != DMLTranslator.DMLBlockSize) ) 
-			{
-				DataConverter.writeMatrixToHDFS(_data, filePathAndName, oinfo, new MatrixCharacteristics(mc.getRows(), mc.getCols(), DMLTranslator.DMLBlockSize, DMLTranslator.DMLBlockSize, mc.getNonZeros()), replication, formatProperties);
-			}
-			else {
-				DataConverter.writeMatrixToHDFS(_data, filePathAndName, oinfo, mc, replication, formatProperties);
-			}
-
-			if( LOG.isTraceEnabled() )
-				LOG.trace("Writing matrix to HDFS ("+filePathAndName+") - COMPLETED... " + (System.currentTimeMillis()-begin) + " msec.");
-		}
-		else if( LOG.isTraceEnabled() ) 
-		{
-			LOG.trace ("Writing matrix to HDFS ("+filePathAndName+") - NOTHING TO WRITE (_data == null).");
-		}
-		
-		if( DMLScript.STATISTICS )
-			CacheStatistics.incrementHDFSWrites();
-	}
-
-	/**
-	 * Writes the .mtd metadata file for this matrix, unless the target format
-	 * is MatrixMarket (which carries its metadata inline).
-	 * 
-	 * @param filePathAndName HDFS file path of the matrix (".mtd" is appended)
-	 * @param outputFormat output format name; if null, inferred from metadata
-	 * @param formatProperties format-specific properties
-	 * @throws DMLRuntimeException if no metadata is attached to this object
-	 * @throws IOException if writing the metadata file fails
-	 */
-	private void writeMetaData (String filePathAndName, String outputFormat, FileFormatProperties formatProperties)
-		throws DMLRuntimeException, IOException
-	{		
-		MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-	
-		if (iimd != null)
-		{
-			// Write the matrix to HDFS in requested format			
-			OutputInfo oinfo = (outputFormat != null ? OutputInfo.stringToOutputInfo (outputFormat) 
-					: InputInfo.getMatchingOutputInfo (iimd.getInputInfo ()));
-			
-			if ( oinfo != OutputInfo.MatrixMarketOutputInfo ) {
-				// Get the dimension information from the metadata stored within MatrixObject
-				MatrixCharacteristics mc = iimd.getMatrixCharacteristics ();
-				
-				// when outputFormat is binaryblock, make sure that matrixCharacteristics has correct blocking dimensions
-				// note: this is only required if singlenode (due to binarycell default) 
-				if ( oinfo == OutputInfo.BinaryBlockOutputInfo && DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE &&
-					(mc.getRowsPerBlock() != DMLTranslator.DMLBlockSize || mc.getColsPerBlock() != DMLTranslator.DMLBlockSize) ) 
-				{
-					mc = new MatrixCharacteristics(mc.getRows(), mc.getCols(), DMLTranslator.DMLBlockSize, DMLTranslator.DMLBlockSize, mc.getNonZeros());
-				}
-				MapReduceTool.writeMetaDataFile (filePathAndName + ".mtd", valueType, mc, oinfo, formatProperties);
-			}
-		}
-		else {
-			throw new DMLRuntimeException("Unexpected error while writing mtd file (" + filePathAndName + ") -- metadata is null.");
-		}
-	}
-
-	/**
-	 * Checks whether the given output format matches the format implied by
-	 * this object's input metadata. A null format, or any failure during
-	 * comparison, is treated conservatively (null -> equal, error -> not equal).
-	 * 
-	 * @param outputFormat output format name, may be null
-	 * @return true if formats match (or outputFormat is null)
-	 */
-	private boolean isEqualOutputFormat( String outputFormat )
-	{
-		boolean ret = true;
-		
-		if( outputFormat != null )
-		{
-			try
-			{
-				MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
-				OutputInfo oi1 = InputInfo.getMatchingOutputInfo( iimd.getInputInfo() );
-				OutputInfo oi2 = OutputInfo.stringToOutputInfo( outputFormat );
-				if( oi1 != oi2 )
-				{
-					ret = false;
-				}
-			}
-			catch(Exception ex)
-			{
-				ret = false;
-			}
-		}
-		
-		return ret;
-	}
-
-	/**
-	 * Returns a short debug name: the variable name plus the HDFS file name,
-	 * truncated from the left to at most 23 characters (prefixed with "...").
-	 */
-	@Override
-	public synchronized String getDebugName()
-	{
-		int maxLength = 23;
-		String debugNameEnding = (_hdfsFileName == null ? "null" : 
-				   (_hdfsFileName.length() < maxLength ? _hdfsFileName : "..." + 
-					   _hdfsFileName.substring (_hdfsFileName.length() - maxLength + 3)));
-		return _varName + " " + debugNameEnding;
-	}
-
-
- // *******************************************
- // *** ***
- // *** LOW-LEVEL PRIVATE METHODS ***
- // *** FOR SOFTREFERENCE CACHE ***
- // *** ***
- // *******************************************
-
-	/**
-	 * Creates a soft reference to the current in-memory data, so the block
-	 * can be recovered later if the garbage collector has not reclaimed it.
-	 */
-	private void createCache( ) 
-	{
-		_cache = new SoftReference<MatrixBlock>( _data );	
-	}
-
-	/**
-	 * Attempts to restore _data from the soft-reference cache (may yield null
-	 * if the referent was already collected), then clears the cache entry.
-	 */
-	private void getCache()
-	{
-		if( _cache !=null )
-		{
-			_data = _cache.get();
-			clearCache();
-		}
-	}
-
-	/**
-	 * Clears and discards the soft-reference cache entry, if present.
-	 */
-	private void clearCache()
-	{
-		if( _cache != null )
-		{
-			_cache.clear();
-			_cache = null;
-		}
-	}
-
-	/**
-	 * Adds or subtracts this block's in-memory size from the shared pinned-size
-	 * counter (consulted by the guarded RDD collect memory-budget check);
-	 * the counter is clamped at zero.
-	 * 
-	 * @param add true to add the size, false to subtract it
-	 */
-	private void updateStatusPinned(boolean add) {
-		if( _data != null ) { //data should never be null
-			long size = sizePinned.get();
-			size += (add ? 1 : -1) * _data.getSizeInMemory();
-			sizePinned.set( Math.max(size,0) );
-		}
-	}
-
-	/**
-	 * Enables or disables cleanup of cached data and HDFS files for this
-	 * object (see clear data / rmvar handling).
-	 * 
-	 * @param flag true to allow cleanup, false to pin the object
-	 */
-	public void enableCleanup(boolean flag) 
-	{
-		_cleanupFlag = flag;
-	}
-
-	/**
-	 * Indicates whether cleanup is currently allowed for this object
-	 * (see clear data / rmvar handling).
-	 * 
-	 * @return the current cleanup flag
-	 */
-	public boolean isCleanupEnabled() 
-	{
-		return _cleanupFlag;
-	}
-
-	/**
-	 * Enables or disables in-place updates for this matrix object.
-	 * 
-	 * @param flag true to allow in-place updates
-	 */
-	public void enableUpdateInPlace(boolean flag)
-	{
-		_updateInPlaceFlag = flag;
-	}
-
-	/**
-	 * Indicates whether in-place updates are enabled for this matrix object.
-	 * 
-	 * @return the current update-in-place flag
-	 */
-	public boolean isUpdateInPlaceEnabled()
-	{
-		return _updateInPlaceFlag;
-	}
-
-
-
-	/**
-	 * Marks this cacheable data object as empty (delegates to setEmpty()).
-	 */
-	public void setEmptyStatus()
-	{
-		setEmpty();
-	}
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/PageCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/PageCache.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/PageCache.java
deleted file mode 100644
index 2219f58..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/caching/PageCache.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.caching;
-
-import java.lang.ref.SoftReference;
-import java.util.HashMap;
-import java.util.LinkedList;
-
-/**
- * Simple static pool of recycled byte[] pages, keyed by exact page size and
- * held via soft references so pages can be reclaimed under memory pressure.
- * NOTE(review): access to the shared pool is not synchronized here - assumes
- * single-threaded use or external synchronization; confirm with callers.
- */
-public class PageCache 
-{
-	
-	//max number of distinct page-size lists before the pool is reset
-	private static final int CLEANUP_THRESHOLD = 128;
-	//page size -> list of soft references to recycled pages
-	private static HashMap<Integer, LinkedList<SoftReference<byte[]>>> _pool;
-	
-	/**
-	 * Initializes (or resets) the page pool to an empty state.
-	 */
-	public static void init()
-	{
-		_pool = new HashMap<Integer, LinkedList<SoftReference<byte[]>>>();
-	}
-	
-	/**
-	 * Discards the page pool entirely.
-	 */
-	public static void clear()
-	{
-		_pool = null;
-	}
-	
-	/**
-	 * Returns a page to the pool for later reuse, keyed by its exact length.
-	 * 
-	 * @param data page to recycle
-	 */
-	public static void putPage( byte[] data )
-	{
-		//cleanup if too many different size lists
-		if( _pool.size()>CLEANUP_THRESHOLD )
-			_pool.clear();
-		
-		LinkedList<SoftReference<byte[]>> list = _pool.get( data.length );
-		if( list==null ){
-			list = new LinkedList<SoftReference<byte[]>>();
-			_pool.put(data.length, list);
-		}
-		list.addLast(new SoftReference<byte[]>(data));	
-	}
-	
-	/**
-	 * Retrieves a recycled page of exactly the requested size, if available,
-	 * skipping entries whose referents were already garbage-collected.
-	 * 
-	 * @param size requested page length in bytes
-	 * @return a pooled page of the given size, or null if none available
-	 */
-	public static byte[] getPage( int size )
-	{
-		LinkedList<SoftReference<byte[]>> list = _pool.get( size );
-		if( list!=null ) {
-			while( !list.isEmpty() ){
-				SoftReference<byte[]> ref = list.removeFirst();
-				byte[] tmp = ref.get();
-				if( tmp!=null )
-					return tmp;
-			}
-		}
-		return null;	
-	}
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContext.java
deleted file mode 100644
index 8bdfed8..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContext.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.context;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.debug.DMLFrame;
-import com.ibm.bi.dml.debug.DMLProgramCounter;
-import com.ibm.bi.dml.debug.DebugState;
-import com.ibm.bi.dml.parser.DMLProgram;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.LocalVariableMap;
-import com.ibm.bi.dml.runtime.controlprogram.Program;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-import com.ibm.bi.dml.runtime.instructions.cp.BooleanObject;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.instructions.cp.DoubleObject;
-import com.ibm.bi.dml.runtime.instructions.cp.FunctionCallCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-import com.ibm.bi.dml.runtime.instructions.cp.ScalarObject;
-import com.ibm.bi.dml.runtime.instructions.cp.StringObject;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixDimensionsMetaData;
-import com.ibm.bi.dml.runtime.matrix.MetaData;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-import com.ibm.bi.dml.runtime.util.UtilFunctions;
-
-
-/**
- * Execution context: holds the program reference (e.g., function repository),
- * the symbol table of live variables, and optional debug state; provides
- * typed accessors for matrix/scalar inputs and outputs as well as pin/unpin
- * and cleanup utilities for matrix objects.
- */
-public class ExecutionContext 
-{
-	
-	//program reference (e.g., function repository)
-	protected Program _prog = null;
-	
-	//symbol table
-	protected LocalVariableMap _variables;
-	
-	//debugging (optional)
-	protected DebugState _dbState = null;
-
-	protected ExecutionContext()
-	{
-		//protected constructor to force use of ExecutionContextFactory
-		this( true, null );
-	}
-
-	protected ExecutionContext(Program prog)
-	{
-		//protected constructor to force use of ExecutionContextFactory
-		this( true, prog );
-		
-	}
-	
-	protected ExecutionContext(LocalVariableMap vars)
-	{	
-		//protected constructor to force use of ExecutionContextFactory
-		this( false, null);
-		_variables = vars;
-	}
-
-	protected ExecutionContext( boolean allocateVariableMap, Program prog )
-	{
-		//protected constructor to force use of ExecutionContextFactory
-		if( allocateVariableMap )
-			_variables = new LocalVariableMap();
-		else
-			_variables = null;
-		_prog = prog;
-		if (DMLScript.ENABLE_DEBUG_MODE){
-			_dbState = DebugState.getInstance();
-		}
-	}
-	
-	public Program getProgram(){
-		return _prog;
-	}
-
-	public LocalVariableMap getVariables() {
-		return _variables;
-	}
-	
-	public void setVariables(LocalVariableMap vars) {
-		_variables = vars;
-	}
-	
-	
-	/* -------------------------------------------------------
-	 * Methods to handle variables and associated data objects
-	 * -------------------------------------------------------
-	 */
-	
-	public Data getVariable(String name) 
-	{
-		return _variables.get(name);
-	}
-	
-	public void setVariable(String name, Data val) 
-		throws DMLRuntimeException
-	{
-		_variables.put(name, val);
-	}
-
-	public Data removeVariable(String name) 
-	{
-		return _variables.remove(name);
-	}
-
-	public void setMetaData(String fname, MetaData md) 
-		throws DMLRuntimeException 
-	{
-		_variables.get(fname).setMetaData(md);
-	}
-	
-	public MetaData getMetaData(String varname) 
-		throws DMLRuntimeException 
-	{
-		return _variables.get(varname).getMetaData();
-	}
-	
-	public void removeMetaData(String varname) 
-		throws DMLRuntimeException 
-	{
-		 _variables.get(varname).removeMetaData();
-	}
-	
-	/**
-	 * Returns the matrix object bound to the given variable name, with
-	 * explicit error handling for missing or non-matrix bindings.
-	 */
-	public MatrixObject getMatrixObject(String varname) 
-		throws DMLRuntimeException
-	{
-		Data dat = getVariable(varname);
-		
-		//error handling if non existing or no matrix
-		if( dat == null )
-			throw new DMLRuntimeException("Variable '"+varname+"' does not exist in the symbol table.");
-		if( !(dat instanceof MatrixObject) )
-			throw new DMLRuntimeException("Variable '"+varname+"' is not a matrix.");
-		
-		return (MatrixObject) dat;
-	}
-	
-	public MatrixCharacteristics getMatrixCharacteristics( String varname ) 
-		throws DMLRuntimeException
-	{
-		MatrixDimensionsMetaData dims = (MatrixDimensionsMetaData) getMetaData(varname);
-		return dims.getMatrixCharacteristics();
-	}
-	
-	/**
-	 * Acquires a read lock on the matrix bound to varName and returns its
-	 * in-memory block; must be paired with releaseMatrixInput().
-	 */
-	public MatrixBlock getMatrixInput(String varName) 
-		throws DMLRuntimeException 
-	{	
-		try {
-			MatrixObject mobj = (MatrixObject) getVariable(varName);
-			return mobj.acquireRead();
-		} catch (CacheException e) {
-			throw new DMLRuntimeException( e );
-		}
-	}
-	
-	public void releaseMatrixInput(String varName) 
-		throws DMLRuntimeException 
-	{
-		try {
-			((MatrixObject)getVariable(varName)).release();
-		} catch (CacheException e) {
-			throw new DMLRuntimeException(e);
-		}
-	}
-	
-	/**
-	 * Resolves a scalar input: parses the name itself as a literal of the
-	 * given value type, or looks it up in the symbol table otherwise.
-	 */
-	public ScalarObject getScalarInput(String name, ValueType vt, boolean isLiteral) 
-		throws DMLRuntimeException 
-	{
-		if ( isLiteral ) {
-			switch (vt) {
-			case INT:
-				long intVal = UtilFunctions.parseToLong(name);				
-				IntObject intObj = new IntObject(intVal);
-				return intObj;
-			case DOUBLE:
-				double doubleVal = Double.parseDouble(name);
-				DoubleObject doubleObj = new DoubleObject(doubleVal);
-				return doubleObj;
-			case BOOLEAN:
-				Boolean boolVal = Boolean.parseBoolean(name);
-				BooleanObject boolObj = new BooleanObject(boolVal);
-				return boolObj;
-			case STRING:
-				StringObject stringObj = new StringObject(name);
-				return stringObj;
-			default:
-				throw new DMLRuntimeException("Unknown value type: " + vt + " for variable: " + name);
-			}
-		}
-		else {
-			Data obj = getVariable(name);
-			if (obj == null) {
-				throw new DMLRuntimeException("Unknown variable: " + name);
-			}
-			return (ScalarObject) obj;
-		}
-	}
-
-	
-	public void setScalarOutput(String varName, ScalarObject so) 
-		throws DMLRuntimeException 
-	{
-		setVariable(varName, so);
-	}
-	
-	/**
-	 * Binds the given matrix block as the new content of the matrix object
-	 * registered under varName (acquireModify/release cycle).
-	 */
-	public void setMatrixOutput(String varName, MatrixBlock outputData) 
-		throws DMLRuntimeException 
-	{
-		MatrixObject sores = (MatrixObject) this.getVariable (varName);
-        
-		try
-		{
-			sores.acquireModify (outputData);
-	        sores.release();
-	        
-			setVariable (varName, sores);
-		} 
-		catch ( CacheException e )
-		{
-			throw new DMLRuntimeException( e );
-		}
-	}
-	
-	/**
-	 * Same as setMatrixOutput, but optionally marks the target matrix object
-	 * for in-place updates (which prevents output serialization).
-	 * 
-	 * @param varName target variable name
-	 * @param outputData matrix block to bind
-	 * @param inplace true to enable update-in-place on the target object
-	 * @throws DMLRuntimeException if the update fails
-	 */
-	public void setMatrixOutput(String varName, MatrixBlock outputData, boolean inplace) 
-		throws DMLRuntimeException 
-	{
-		if( inplace ) //modify metadata to prevent output serialization
-		{
-			MatrixObject sores = (MatrixObject) this.getVariable (varName);
-	        sores.enableUpdateInPlace( true );
-		}
-		
-		//default case
-		setMatrixOutput(varName, outputData);
-	}
-
-	/**
-	 * Pin a given list of variables i.e., set the "clean up" state in 
-	 * corresponding matrix objects, so that the cached data inside these
-	 * objects is not cleared and the corresponding HDFS files are not 
-	 * deleted (through rmvar instructions). 
-	 * 
-	 * This is necessary for: function input variables, parfor result variables, 
-	 * parfor shared inputs that are passed to functions.
-	 * 
-	 * The function returns the OLD "clean up" state of matrix objects.
-	 */
-	public HashMap<String,Boolean> pinVariables(ArrayList<String> varList) 
-	{
-		//2-pass approach since multiple vars might refer to same matrix object
-		HashMap<String, Boolean> varsState = new HashMap<String,Boolean>();
-		
-		//step 1) get current information
-		for( String var : varList )
-		{
-			Data dat = _variables.get(var);
-			if( dat instanceof MatrixObject )
-			{
-				MatrixObject mo = (MatrixObject)dat;
-				varsState.put( var, mo.isCleanupEnabled() );
-				//System.out.println("pre-pin "+var+" ("+mo.isCleanupEnabled()+")");
-			}
-		}
-		
-		//step 2) pin variables
-		for( String var : varList )
-		{
-			Data dat = _variables.get(var);
-			if( dat instanceof MatrixObject )
-			{
-				MatrixObject mo = (MatrixObject)dat;
-				mo.enableCleanup(false); 
-				//System.out.println("pin "+var);
-			}
-		}
-		
-		return varsState;
-	}
-	
-	/**
-	 * Unpin a given list of variables by setting their "cleanup" status
-	 * to the values specified by <code>varsStats</code>.
-	 * 
-	 * Typical usage:
-	 * <code> 
-	 * oldStatus = pinVariables(varList);
-	 * ...
-	 * unpinVariables(varList, oldStatus);
-	 * </code>
-	 * 
-	 * i.e., a call to unpinVariables() is preceded by pinVariables(). 
-	 */
-	public void unpinVariables(ArrayList<String> varList, HashMap<String,Boolean> varsState)
-	{
-		for( String var : varList)
-		{
-			//System.out.println("unpin "+var+" ("+varsState.get(var)+")");
-			Data dat = _variables.get(var);
-			if( dat instanceof MatrixObject )
-				((MatrixObject)dat).enableCleanup(varsState.get(var));
-		}
-	}
-	
-	/**
-	 * NOTE: No order guaranteed, so keep same list for pin and unpin. 
-	 * 
-	 * @return snapshot of all variable names in the symbol table
-	 */
-	public ArrayList<String> getVarList()
-	{
-		ArrayList<String> varlist = new ArrayList<String>();	
-		varlist.addAll(_variables.keySet());	
-		return varlist;
-	}
-
-	
-	/**
-	 * Cleans up a matrix object if cleanup is enabled and no other variable
-	 * references it: clears cached data and deletes its HDFS file and .mtd.
-	 * 
-	 * @param mo matrix object to clean up
-	 * @throws DMLRuntimeException if cleanup fails
-	 */
-	public void cleanupMatrixObject(MatrixObject mo) 
-		throws DMLRuntimeException
-	{
-		try
-		{
-	        if ( mo.isCleanupEnabled() ) 
-	        {
-	        	//compute ref count only if matrix cleanup actually necessary
-	        	if ( !getVariables().hasReferences(mo) ) {
-	        		//clean cached data	
-	        		mo.clearData(); 
-	        		if( mo.isFileExists() )
-	        		{
-	        			//clean hdfs data
-	        			String fpath = mo.getFileName();
-	        			if (fpath != null) {
-	        				MapReduceTool.deleteFileIfExistOnHDFS(fpath);
-	        				MapReduceTool.deleteFileIfExistOnHDFS(fpath + ".mtd");
-	        			}
-	        		}
-	        	}
-	        }
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-	}
-	
-	
-	///////////////////////////////
-	// Debug State Functionality
-	///////////////////////////////
-	
-	public void initDebugProgramCounters() 
-	{
-		if (DMLScript.ENABLE_DEBUG_MODE){
-			_dbState.pc = new DMLProgramCounter(DMLProgram.DEFAULT_NAMESPACE, "main", 0, 0); //initialize program counter (pc)
-			_dbState.prevPC = new DMLProgramCounter(DMLProgram.DEFAULT_NAMESPACE, "main", 0, 0); //initialize previous pc
-		}
-	}
-	
-	/**
-	 * Updates the debug program counter with the current program block index.
-	 * 
-	 * @param index current program block number
-	 * @throws DMLRuntimeException if the update fails
-	 */
-	public void updateDebugState( int index ) throws DMLRuntimeException
-	{
-		if(DMLScript.ENABLE_DEBUG_MODE) {
-			_dbState.getPC().setProgramBlockNumber(index);
-		}
-	}
-	
-	/**
-	 * Updates the debug program counter for the given instruction and
-	 * suspends execution if a breakpoint/step condition applies.
-	 * 
-	 * @param currInst instruction about to be executed
-	 * @throws DMLRuntimeException if the update fails
-	 */
-	public void updateDebugState( Instruction currInst ) throws DMLRuntimeException 
-	{
-		if (DMLScript.ENABLE_DEBUG_MODE) {
-			// New change so that shell does not seem like it is hanging while running MR job
-			// Since UI cannot accept instructions when user is submitting the program
-			_dbState.nextCommand = false;
-			// Change to stop before first instruction of a given line
-			//update current instruction ID and line number 
-			_dbState.getPC().setInstID(currInst.getInstID());
-			_dbState.getPC().setLineNumber(currInst.getLineNum());
-			// Change to stop before first instruction of a given line
-			suspendIfAskedInDebugMode(currInst);
-		}
-	}
-	
-	/**
-	 * Resets the debug program counter (debug mode only).
-	 */
-	public void clearDebugProgramCounters()
-	{
-		if(DMLScript.ENABLE_DEBUG_MODE) {
-			_dbState.pc = null;
-		}
-	}
-	
-	public void handleDebugException( Exception ex )
-	{
-		_dbState.getDMLStackTrace(ex);
-		_dbState.suspend = true;
-	}
-
-	public void handleDebugFunctionEntry( FunctionCallCPInstruction funCallInst ) throws DMLRuntimeException
-	{
-		//push caller frame into call stack
-		_dbState.pushFrame(getVariables(), _dbState.getPC());
-		//initialize pc for callee's frame
-		_dbState.pc = new DMLProgramCounter(funCallInst.getNamespace(), funCallInst.getFunctionName(), 0, 0);
-	}
-
-	public void handleDebugFunctionExit( FunctionCallCPInstruction funCallInst )
-	{
-		//pop caller frame from call stack
-		DMLFrame fr = _dbState.popFrame();
-		//update pc to caller's frame
-		_dbState.pc = fr.getPC();	
-	}
-
-	public DebugState getDebugState()
-	{
-		return _dbState;
-	}
-	
-	/**
-	 * This function should be called only if user has specified -debug option.
-	 * In this function, if the user has issued one of the step instructions or
-	 * has enabled suspend flag in previous instruction (through breakpoint),
-	 * then it will wait until user issues a new debugger command.
-	 * @param currInst
-	 * @param ec
-	 * @throws DMLRuntimeException 
-	 */
-	@SuppressWarnings("deprecation")
-	private void suspendIfAskedInDebugMode(Instruction currInst ) throws DMLRuntimeException {
-		if (!DMLScript.ENABLE_DEBUG_MODE) {
-			System.err.println("ERROR: The function suspendIfAskedInDebugMode should not be called in non-debug mode.");
-		}
-		//check for stepping options
-		if (!_dbState.suspend && _dbState.dbCommand != null) {
-			if (_dbState.dbCommand.equalsIgnoreCase("step_instruction")) {
-				System.out.format("Step instruction reached at %s.\n", _dbState.getPC().toString());
-				_dbState.suspend = true;
-			}
-			else if (_dbState.dbCommand.equalsIgnoreCase("step_line") && _dbState.prevPC.getLineNumber() != currInst.getLineNum() 
-					&& _dbState.prevPC.getLineNumber() != 0) {
-				// Don't step into first instruction of first line
-				// System.out.format("Step reached at %s.\n", this._prog.getPC().toString());
-				System.out.format("Step reached at %s.\n", _dbState.getPC().toStringWithoutInstructionID());
-				_dbState.suspend = true;
-			}
-			else if (_dbState.dbCommand.equalsIgnoreCase("step return") && currInst instanceof FunctionCallCPInstruction) {
-				FunctionCallCPInstruction funCallInst = (FunctionCallCPInstruction) currInst;
-				if (_dbState.dbCommandArg == null || funCallInst.getFunctionName().equalsIgnoreCase(_dbState.dbCommandArg)) {
-					System.out.format("Step return reached at %s.\n", _dbState.getPC().toStringWithoutInstructionID());
-					_dbState.suspend = true;
-				}
-			}
-		}
-		//check if runtime suspend signal is set
-		if (_dbState.suspend) {
-			//flush old commands and arguments
-			_dbState.dbCommand = null;
-			_dbState.dbCommandArg = null;
-			//print current DML script source line
-			if (currInst.getLineNum() != 0)
-				_dbState.printDMLSourceLine(currInst.getLineNum());
-			//save current symbol table
-			_dbState.setVariables(this.getVariables());
-			//send next command signal to debugger control module
-			_dbState.nextCommand = true;
-			//suspend runtime execution thread
-			Thread.currentThread().suspend();
-			//reset next command signal
-			_dbState.nextCommand = false;
-		}
-		//reset runtime suspend signal
-		_dbState.suspend = false;
-		//update previous pc
-		_dbState.prevPC.setFunctionName(_dbState.getPC().getFunctionName());
-		_dbState.prevPC.setProgramBlockNumber(_dbState.getPC().getProgramBlockNumber());
-		_dbState.prevPC.setLineNumber(currInst.getLineNum());
-	}
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContextFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContextFactory.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContextFactory.java
deleted file mode 100644
index 62b517c..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/ExecutionContextFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.context;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.runtime.controlprogram.Program;
-
-/**
- * Factory for execution contexts; selects the concrete context
- * implementation based on the configured runtime platform
- * (DMLScript.rtplatform).
- */
-public class ExecutionContextFactory 
-{
-	
-	/**
-	 * Creates an execution context without a program reference.
-	 * 
-	 * @return new execution context
-	 */
-	public static ExecutionContext createContext()
-	{
-		return createContext( null );
-	}
-	
-	/**
-	 * Creates an execution context for the given program.
-	 * 
-	 * @param prog program (e.g., function repository), may be null
-	 * @return new execution context
-	 */
-	public static ExecutionContext createContext( Program prog )
-	{
-		return createContext(true, prog);
-	}
-	
-	/**
-	 * Creates an execution context matching the current runtime platform.
-	 * 
-	 * @param allocateVars whether to allocate a fresh symbol table
-	 * @param prog program (e.g., function repository), may be null
-	 * @return new execution context (SparkExecutionContext for spark platforms)
-	 */
-	public static ExecutionContext createContext( boolean allocateVars, Program prog )
-	{
-		ExecutionContext ec = null;
-		
-		switch( DMLScript.rtplatform )
-		{
-			case SINGLE_NODE:
-			case HADOOP:
-			case HYBRID:
-				ec = new ExecutionContext(allocateVars, prog);
-				break;
-				
-			case SPARK:
-			case HYBRID_SPARK:
-				ec = new SparkExecutionContext(allocateVars, prog);
-				break;
-		}
-		
-		return ec;
-	}
-}