You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:38 UTC
[07/78] [abbrv] [partial] incubator-systemml git commit: Move files
to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
deleted file mode 100644
index 48b4699..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
+++ /dev/null
@@ -1,1155 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.context;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.storage.RDDInfo;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.MLContext;
-import com.ibm.bi.dml.api.MLContextProxy;
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.lops.Checkpoint;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.Program;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.instructions.spark.CheckpointSPInstruction;
-import com.ibm.bi.dml.runtime.instructions.spark.SPInstruction;
-import com.ibm.bi.dml.runtime.instructions.spark.data.BlockPartitioner;
-import com.ibm.bi.dml.runtime.instructions.spark.data.BroadcastObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.LineageObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
-import com.ibm.bi.dml.runtime.instructions.spark.data.RDDObject;
-import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
-import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyBlockPairFunction;
-import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyTextInputFunction;
-import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
-import com.ibm.bi.dml.runtime.instructions.spark.utils.SparkUtils;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-import com.ibm.bi.dml.utils.Statistics;
-
-
-public class SparkExecutionContext extends ExecutionContext
-{
-
- private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName());
-
- //internal configurations
- private static boolean LAZY_SPARKCTX_CREATION = true; //create the spark context on first use instead of in the constructor (hybrid mode)
- private static boolean ASYNCHRONOUS_VAR_DESTROY = true; //flag handed to broadcast destroy / rdd unpersist during cleanup
- private static boolean FAIR_SCHEDULER_MODE = true; //set spark.scheduler.mode=FAIR in the default cluster setup
-
- //executor memory and relative fractions as obtained from the spark configuration (negative = not analyzed yet)
- private static long _memExecutors = -1; //mem per executors
- private static double _memRatioData = -1; //read from spark.storage.memoryFraction
- private static double _memRatioShuffle = -1; //read from spark.shuffle.memoryFraction
- private static int _numExecutors = -1; //total executors
- private static int _defaultPar = -1; //total vcores
- private static boolean _confOnly = false; //infrastructure info based on config
-
- // Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.
- // This limitation may eventually be removed; see SPARK-2243 for more details.
- private static JavaSparkContext _spctx = null;
-
- protected SparkExecutionContext(Program prog)
- {
- //protected constructor to force use of ExecutionContextFactory; delegates with allocateVars=true
- this( true, prog );
- }
-
- protected SparkExecutionContext(boolean allocateVars, Program prog)
- {
-     //construction is restricted (protected) to force use of ExecutionContextFactory
-     super( allocateVars, prog );
-
-     //eager spark context creation unless lazy creation is enabled and
-     //we run in hybrid execution mode (then the context is created on demand)
-     boolean eagerInit = !LAZY_SPARKCTX_CREATION || !OptimizerUtils.isHybridExecutionMode();
-     if( eagerInit )
-         initSparkContext();
- }
-
- /**
-  * Provides the singleton spark context shared by all execution contexts.
-  * If lazy spark context creation is enabled, the context is initialized
-  * by this call when it does not exist yet, i.e., the call blocks until
-  * the context has been created.
-  *
-  * @return the current value of the static spark context reference
-  */
- public JavaSparkContext getSparkContext()
- {
-     //on-demand creation (lazy rather than asynchronous, which avoids
-     //waiting for a not yet initialized spark context on close)
-     if( LAZY_SPARKCTX_CREATION )
-         initSparkContext();
-
-     return _spctx;
- }
-
- /**
-  * Static access to the singleton spark context. The context is created
-  * first if it does not exist yet.
-  *
-  * @return the singleton spark context
-  */
- public static JavaSparkContext getSparkContextStatic()
- {
-     //returns immediately if a context already exists
-     initSparkContext();
-
-     return _spctx;
- }
-
- /**
-  * Stops the singleton spark context (if one exists) and clears the static
-  * reference to it. The class monitor is held while doing so, which is the
-  * same monitor used by the static synchronized context initialization.
-  */
- public void close()
- {
-     synchronized( SparkExecutionContext.class ) {
-         //nothing to stop
-         if( _spctx == null )
-             return;
-
-         _spctx.stop();
-
-         //drop the reference so that the stopped context is never used again
-         _spctx = null;
-     }
- }
-
- public static boolean isLazySparkContextCreation(){
- return LAZY_SPARKCTX_CREATION;
- }
-
- /**
- * Creates the singleton spark context if it does not exist yet; otherwise
- * returns immediately. The context either wraps the spark context of an
- * active MLContext, or is newly created with a local test configuration
- * (DMLScript.USE_LOCAL_SPARK_CONFIG) or with the default cluster configuration.
- * Afterwards, the binary block serialization framework is registered on the
- * hadoop configuration (if enabled) and the creation time is recorded in
- * the statistics (if enabled).
- */
- private synchronized static void initSparkContext()
- {
- //check for redundant spark context init
- if( _spctx != null )
- return;
-
- long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-
- //create a default spark context (master, appname, etc refer to system properties
- //as given in the spark configuration or during spark-submit)
-
- MLContext mlCtx = MLContextProxy.getActiveMLContext();
- if(mlCtx != null)
- {
- // This is when DML is called through spark shell
- // Will clean the passing of static variables later as this involves minimal change to DMLScript
- _spctx = new JavaSparkContext(mlCtx.getSparkContext());
- }
- else
- {
- if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
- // Local master for integration testing; note that local[*] uses all available cores
- SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("My local integration test app");
- // This is discouraged in spark but have added only for those testcase that cannot stop the context properly
- // conf.set("spark.driver.allowMultipleContexts", "true");
- conf.set("spark.ui.enabled", "false");
- _spctx = new JavaSparkContext(conf);
- }
- else //default cluster setup
- {
- //setup systemml-preferred spark configuration (w/o user choice)
- SparkConf conf = new SparkConf();
-
- //always set unlimited result size (required for cp collect)
- conf.set("spark.driver.maxResultSize", "0");
-
- //always use the fair scheduler (for single jobs, it's equivalent to fifo
- //but for concurrent jobs in parfor it ensures better data locality because
- //round robin assignment mitigates the problem of 'sticky slots')
- if( FAIR_SCHEDULER_MODE ) {
- conf.set("spark.scheduler.mode", "FAIR");
- }
-
- _spctx = new JavaSparkContext(conf);
- }
- }
-
- //globally add binaryblock serialization framework for all hdfs read/write operations
- //TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end
- if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
- MRJobConfiguration.addBinaryBlockSerializationFramework( _spctx.hadoopConfiguration() );
-
- //statistics maintenance (elapsed context creation time in nanoseconds)
- if( DMLScript.STATISTICS ){
- Statistics.setSparkCtxCreateTime(System.nanoTime()-t0);
- }
- }
-
- /**
-  * Returns the binary-block RDD handle of a matrix variable. Spark
-  * instructions should use this call for all matrix inputs except
-  * broadcast variables.
-  *
-  * @param varname name of the matrix variable
-  * @return rdd of matrix indexes and matrix blocks
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- @SuppressWarnings("unchecked")
- public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname )
-     throws DMLRuntimeException, DMLUnsupportedOperationException
- {
-     //generic handle lookup with binary block input info
-     JavaPairRDD<?,?> handle = getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
-
-     return (JavaPairRDD<MatrixIndexes,MatrixBlock>) handle;
- }
-
- /**
-  * Returns the RDD handle of a matrix variable by looking up its matrix
-  * object and delegating to getRDDHandleForMatrixObject.
-  *
-  * @param varname name of the matrix variable
-  * @param inputInfo input format used if the rdd needs to be created
-  * @return rdd handle of the variable
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo )
-     throws DMLRuntimeException, DMLUnsupportedOperationException
- {
-     return getRDDHandleForMatrixObject(getMatrixObject(varname), inputInfo);
- }
-
- /**
- * This call returns an RDD handle for a given matrix object. This includes
- * the creation of RDDs for in-memory or binary-block HDFS data. Whenever a
- * new RDD is created, it is also attached to the matrix object as its RDD
- * handle for future operations.
- *
- * @param mo matrix object for which the rdd is requested
- * @param inputInfo input format (format, key, and value classes) used when reading from file
- * @return existing or newly created rdd of the matrix object
- * @throws DMLRuntimeException if the input format is not supported for reading an hdfs-resident file
- * @throws DMLUnsupportedOperationException
- */
- @SuppressWarnings("unchecked")
- public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- //NOTE: MB this logic should be integrated into MatrixObject
- //However, for now we cannot assume that spark libraries are
- //always available and hence only store generic references in
- //matrix object while all the logic is in the SparkExecContext
-
- JavaPairRDD<?,?> rdd = null;
- //CASE 1: rdd already existing (reuse if checkpoint or trigger
- //pending rdd operations if not yet cached but prevent to re-evaluate
- //rdd operations if already executed and cached
- if( mo.getRDDHandle()!=null
- && (mo.getRDDHandle().isCheckpointRDD() || !mo.isCached(false)) )
- {
- //return existing rdd handling (w/o input format change)
- rdd = mo.getRDDHandle().getRDD();
- }
- //CASE 2: dirty in memory data or cached result of rdd operations
- else if( mo.isDirty() || mo.isCached(false) )
- {
- //get in-memory matrix block and parallelize it
- //w/ guarded parallelize (fallback to export, rdd from file if too large)
- boolean fromFile = false;
- if( !OptimizerUtils.checkSparkCollectMemoryBudget(mo.getMatrixCharacteristics(), 0) ) {
- if( mo.isDirty() ) { //write only if necessary
- mo.exportData();
- }
- //NOTE(review): the cast below assumes binary block key/value classes for inputInfo - confirm against callers
- rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
- rdd = ((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd).mapToPair( new CopyBlockPairFunction() ); //cp is workaround for read bug
- fromFile = true;
- }
- else { //default case
- MatrixBlock mb = mo.acquireRead(); //pin matrix in memory
- rdd = toJavaPairRDD(getSparkContext(), mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock());
- mo.release(); //unpin matrix
- }
-
- //keep rdd handle for future operations on it (flagged as hdfs file only if read from file)
- RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
- rddhandle.setHDFSFile(fromFile);
- mo.setRDDHandle(rddhandle);
- }
- //CASE 3: non-dirty (file exists on HDFS)
- else
- {
- // parallelize hdfs-resident file
- // For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class
- if(inputInfo == InputInfo.BinaryBlockInputInfo) {
- rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
- //note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
- //recordreader returns; the javadoc explicitly recommend to copy all key/value pairs
- rdd = ((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd).mapToPair( new CopyBlockPairFunction() ); //cp is workaround for read bug
- }
- else if(inputInfo == InputInfo.TextCellInputInfo || inputInfo == InputInfo.CSVInputInfo || inputInfo == InputInfo.MatrixMarketInputInfo) {
- rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
- rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug
- }
- else if(inputInfo == InputInfo.BinaryCellInputInfo) {
- rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
- rdd = ((JavaPairRDD<MatrixIndexes, MatrixCell>)rdd).mapToPair( new CopyBinaryCellFunction() ); //cp is workaround for read bug
- }
- else {
- throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
- }
-
- //keep rdd handle for future operations on it (always flagged as hdfs file in this case)
- RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
- rddhandle.setHDFSFile(true);
- mo.setRDDHandle(rddhandle);
- }
-
- return rdd;
- }
-
- /**
-  * Returns the partitioned broadcast of a matrix variable. A valid broadcast
-  * handle already attached to the matrix object is reused. Otherwise, the
-  * matrix is read into memory, converted into a partitioned matrix block,
-  * broadcast as one or multiple coarse-grained partitions, and the new
-  * broadcast handle is attached to the matrix object.
-  *
-  * TODO So far we only create broadcast variables but never destroy
-  * them. This is a memory leak which might lead to executor out-of-memory.
-  * However, in order to handle this, we need to keep track when broadcast
-  * variables are no longer required.
-  *
-  * @param varname name of the matrix variable
-  * @return partitioned broadcast of the matrix
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- @SuppressWarnings("unchecked")
- public PartitionedBroadcastMatrix getBroadcastForVariable( String varname )
-     throws DMLRuntimeException, DMLUnsupportedOperationException
- {
-     MatrixObject mo = getMatrixObject(varname);
-
-     //fast path: reuse existing broadcast handle if still valid
-     if( mo.getBroadcastHandle()!=null && mo.getBroadcastHandle().isValid() )
-         return mo.getBroadcastHandle().getBroadcast();
-
-     //block sizes of the matrix
-     int brlen = (int) mo.getNumRowsPerBlock();
-     int bclen = (int) mo.getNumColumnsPerBlock();
-
-     //pin the matrix, create the partitioned matrix block, and unpin again
-     MatrixBlock pinned = mo.acquireRead();
-     PartitionedMatrixBlock pmb = new PartitionedMatrixBlock(pinned, brlen, bclen);
-     mo.release();
-
-     //coarse-grained partitioning: blocks per partition and number of partitions
-     int blksPerPart = PartitionedBroadcastMatrix.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen);
-     int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / blksPerPart);
-     Broadcast<PartitionedMatrixBlock>[] parts = new Broadcast[numParts];
-
-     if( numParts <= 1 ) {
-         //single partition: broadcast the partitioned matrix block as is
-         parts[0] = getSparkContext().broadcast( pmb);
-     }
-     else {
-         //multiple partitions: broadcast each range of blocks separately
-         for( int i=0; i<numParts; i++ ) {
-             int offset = i * blksPerPart;
-             int numBlks = Math.min(blksPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
-             parts[i] = getSparkContext().broadcast(pmb.createPartition(offset, numBlks));
-         }
-     }
-
-     //wrap partitions and keep the handle at the matrix object for reuse
-     PartitionedBroadcastMatrix result = new PartitionedBroadcastMatrix(parts);
-     mo.setBroadcastHandle(new BroadcastObject(result, varname));
-
-     return result;
- }
-
- /**
- *
- * @param varname
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- public BlockPartitioner getPartitionerForRDDVariable(String varname)
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- //get input rdd and matrix characteristics
- JavaPairRDD<MatrixIndexes,MatrixBlock> in = getBinaryBlockRDDHandleForVariable(varname);
- MatrixCharacteristics mc = getMatrixCharacteristics(varname);
-
- //create tile-based matrix partitioner
- return new BlockPartitioner(mc, in.partitions().size());
- }
-
- /**
-  * Attaches the output rdd of spark rdd operations as meta data to the
-  * matrix object of the given variable in the symbol table.
-  *
-  * Spark instructions should call this for all matrix outputs.
-  *
-  * @param varname name of the matrix variable
-  * @param rdd output rdd to attach
-  * @throws DMLRuntimeException
-  */
- public void setRDDHandleForVariable(String varname, JavaPairRDD<MatrixIndexes,?> rdd)
-     throws DMLRuntimeException
- {
-     //wrap the rdd into a new handle and set it on the matrix object
-     getMatrixObject(varname).setRDDHandle( new RDDObject(rdd, varname) );
- }
-
- /**
- * Utility method for creating an RDD out of an in-memory matrix block.
- *
- * @param sc
- * @param block
- * @return
- * @throws DMLUnsupportedOperationException
- * @throws DMLRuntimeException
- */
- public static JavaPairRDD<MatrixIndexes,MatrixBlock> toJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen)
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>();
-
- if( src.getNumRows() <= brlen
- && src.getNumColumns() <= bclen )
- {
- list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(1,1), src));
- }
- else
- {
- boolean sparse = src.isInSparseFormat();
-
- //create and write subblocks of matrix
- for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
- for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
- {
- int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
- int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
-
- MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse);
-
- int row_offset = blockRow*brlen;
- int col_offset = blockCol*bclen;
-
- //copy submatrix to block
- src.sliceOperations( row_offset, row_offset+maxRow-1,
- col_offset, col_offset+maxCol-1, block );
-
- //append block to sequence file
- MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1);
- list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(indexes, block));
- }
- }
-
- return sc.parallelizePairs(list);
- }
-
- /**
-  * Generic abstraction for calls from the buffer pool: unwraps the rdd of
-  * the given rdd object and delegates to
-  * toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz).
-  *
-  * @param rdd rdd object wrapping a binary-block rdd
-  * @param rlen number of rows
-  * @param clen number of columns
-  * @param brlen number of rows per block
-  * @param bclen number of columns per block
-  * @param nnz number of non-zeros (negative if unknown)
-  * @return collected matrix block
-  * @throws DMLRuntimeException
-  */
- @SuppressWarnings("unchecked")
- public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz)
-     throws DMLRuntimeException
- {
-     JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
-
-     return toMatrixBlock(blocks, rlen, clen, brlen, bclen, nnz);
- }
-
- /**
- * Utility method for creating a single matrix block out of an RDD. Note that this collect call
- * might trigger execution of any pending transformations.
- *
- * NOTE: This is an unguarded utility function, which requires memory for both the output matrix
- * and its collected, blocked representation.
- *
- * @param rdd binary-block rdd of matrix indexes and matrix blocks
- * @param rlen number of rows of the output matrix
- * @param clen number of columns of the output matrix
- * @param brlen number of rows per block
- * @param bclen number of columns per block
- * @param nnz number of non-zeros; if negative, a fully dense matrix (rlen*clen) is assumed for the format decision
- * @return collected matrix block
- * @throws DMLRuntimeException if more than one block is collected although a single block is expected
- */
- public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
- throws DMLRuntimeException
- {
- MatrixBlock out = null;
-
- if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
- {
- //special case without copy and nnz maintenance
- List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
- if( list.size()>1 )
- throw new DMLRuntimeException("Expecting no more than one result block.");
- else if( list.size()==1 )
- out = list.get(0)._2();
- else //empty (e.g., after ops w/ outputEmpty=false)
- out = new MatrixBlock(rlen, clen, true);
- }
- else //MULTIPLE BLOCKS
- {
- //determine target sparse/dense representation (unknown nnz is treated as fully dense)
- long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
- boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-
- //create output matrix block (w/ lazy allocation)
- out = new MatrixBlock(rlen, clen, sparse);
- List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-
- //copy blocks one-at-a-time into output matrix block
- for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
- {
- //unpack index-block pair
- MatrixIndexes ix = keyval._1();
- MatrixBlock block = keyval._2();
-
- //compute row/column block offsets (0-based cell offsets from 1-based block indexes)
- int row_offset = (int)(ix.getRowIndex()-1)*brlen;
- int col_offset = (int)(ix.getColumnIndex()-1)*bclen;
- int rows = block.getNumRows();
- int cols = block.getNumColumns();
-
- if( sparse ) { //SPARSE OUTPUT
- //append block to sparse target in order to avoid shifting
- //note: this append requires a final sort of sparse rows
- out.appendToSparse(block, row_offset, col_offset);
- }
- else { //DENSE OUTPUT
- out.copy( row_offset, row_offset+rows-1,
- col_offset, col_offset+cols-1, block, false );
- }
- }
-
- //post-processing output matrix (sort appended sparse rows, then refresh nnz and representation)
- if( sparse )
- out.sortSparseRows();
- out.recomputeNonZeros();
- out.examSparsity();
- }
-
- return out;
- }
-
- /**
- *
- * @param rdd
- * @param rlen
- * @param clen
- * @param brlen
- * @param bclen
- * @param nnz
- * @return
- * @throws DMLRuntimeException
- */
- public static PartitionedMatrixBlock toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
- throws DMLRuntimeException
- {
- PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, clen, brlen, bclen);
-
- List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-
- //copy blocks one-at-a-time into output matrix block
- for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
- {
- //unpack index-block pair
- MatrixIndexes ix = keyval._1();
- MatrixBlock block = keyval._2();
- out.setMatrixBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block);
- }
-
- return out;
- }
-
- /**
-  * Writes the binary-block rdd of the given rdd object to the given path
-  * and returns the number of non-zeros computed over all blocks.
-  *
-  * @param rdd rdd object wrapping a binary-block rdd
-  * @param path output file name
-  * @param oinfo output format with key, value, and format classes
-  * @return number of non-zeros aggregated over all blocks
-  */
- @SuppressWarnings("unchecked")
- public static long writeRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
- {
-     JavaPairRDD<MatrixIndexes,MatrixBlock> blocks = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
-
-     //recompute the number of non-zeros from the blocks
-     long nnzTotal = SparkUtils.computeNNZFromBlocks(blocks);
-
-     //save file is an action which also triggers nnz maintenance
-     blocks.saveAsHadoopFile(path, oinfo.outputKeyClass, oinfo.outputValueClass, oinfo.outputFormatClass);
-
-     return nnzTotal;
- }
-
-
- /**
-  * Returns the available memory budget for broadcast variables in bytes.
-  * The budget is the executor memory that remains after subtracting the
-  * data and shuffle fractions, scaled by OptimizerUtils.MEM_UTIL_FACTOR.
-  * This is a conservative estimate since both data memory and shuffle
-  * memory might not be fully utilized.
-  *
-  * @return broadcast memory budget in bytes
-  */
- public static double getBroadcastMemoryBudget()
- {
-     //analyze the spark configuration on first use
-     if( _memExecutors < 0 || _memRatioData < 0 || _memRatioShuffle < 0 )
-         analyzeSparkConfiguation();
-
-     //memory reserved for data and shuffle, and the remaining free memory
-     double reserved = _memExecutors*(_memRatioData+_memRatioShuffle);
-     double remaining = _memExecutors - reserved;
-
-     //utilization factor applied to the remaining free memory
-     return OptimizerUtils.MEM_UTIL_FACTOR * remaining;
- }
-
- /**
- *
- * @return
- */
- public static double getConfiguredTotalDataMemory() {
- return getConfiguredTotalDataMemory(false);
- }
-
- /**
- *
- * @param refresh
- * @return
- */
- public static double getConfiguredTotalDataMemory(boolean refresh)
- {
- if( _memExecutors < 0 || _memRatioData < 0 )
- analyzeSparkConfiguation();
-
- //always get the current num executors on refresh because this might
- //change if not all executors are initially allocated and it is plan-relevant
- if( refresh && !_confOnly ) {
- JavaSparkContext jsc = getSparkContextStatic();
- int numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
- return _memExecutors * _memRatioData * numExec;
- }
- else
- return ( _memExecutors * _memRatioData * _numExecutors );
- }
-
- public static int getNumExecutors()
- {
- if( _numExecutors < 0 )
- analyzeSparkConfiguation();
-
- return _numExecutors;
- }
-
- public static int getDefaultParallelism() {
- return getDefaultParallelism(false);
- }
-
- /**
- *
- * @return
- */
- public static int getDefaultParallelism(boolean refresh)
- {
- if( _defaultPar < 0 && !refresh )
- analyzeSparkConfiguation();
-
- //always get the current default parallelism on refresh because this might
- //change if not all executors are initially allocated and it is plan-relevant
- if( refresh && !_confOnly )
- return getSparkContextStatic().defaultParallelism();
- else
- return _defaultPar;
- }
-
- /**
-  * Analyzes the spark configuration and caches the infrastructure info in
-  * static fields: executor memory in bytes, data and shuffle memory ratios,
-  * the total number of executors, and the default parallelism. If the number
-  * of executors and the parallelism (or cores per executor) are given in the
-  * configuration, only the configuration is used; otherwise, the spark
-  * context is obtained (and created if necessary) to query this information.
-  *
-  * @throws NumberFormatException if spark.executor.memory is not a valid memory string
-  */
- public static void analyzeSparkConfiguation()
- {
-     SparkConf sconf = new SparkConf();
-
-     //parse absolute executor memory (in bytes)
-     _memExecutors = parseExecutorMemory( sconf.get("spark.executor.memory", "512m") );
-
-     //get data and shuffle memory ratios (defaults not specified in job conf)
-     _memRatioData = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
-     _memRatioShuffle = sconf.getDouble("spark.shuffle.memoryFraction", 0.2); //default 20%
-
-     int numExecutors = sconf.getInt("spark.executor.instances", -1);
-     int numCoresPerExec = sconf.getInt("spark.executor.cores", -1);
-     int defaultPar = sconf.getInt("spark.default.parallelism", -1);
-
-     if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) {
-         _numExecutors = numExecutors;
-         _defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec;
-         _confOnly = true;
-     }
-     else {
-         //get default parallelism (total number of executors and cores)
-         //note: spark context provides this information while conf does not
-         //(for num executors we need to correct for driver and local mode)
-         JavaSparkContext jsc = getSparkContextStatic();
-         _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
-         _defaultPar = jsc.defaultParallelism();
-         _confOnly = false; //implies env info refresh w/ spark context
-     }
-
-     //note: required time for infrastructure analysis on 5 node cluster: ~5-20ms.
- }
-
- /**
-  * Parses a memory string such as "512m", "2g", "100k", "1t", "512mb", or a
-  * plain number of bytes into the number of bytes. Suffixes are matched
-  * case-insensitively and a trailing 'b' is optional.
-  *
-  * @param arg memory string
-  * @return memory size in bytes
-  * @throws NumberFormatException if the numeric part cannot be parsed
-  */
- private static long parseExecutorMemory(String arg)
- {
-     String str = arg.trim();
-     int end = str.length();
-
-     //strip optional trailing 'b' (e.g., "512mb" or "1024b")
-     if( end > 0 && Character.toLowerCase(str.charAt(end-1)) == 'b' )
-         end--;
-
-     //determine unit factor; w/o unit suffix the value is given in bytes
-     long factor = 1;
-     if( end > 0 ) {
-         switch( Character.toLowerCase(str.charAt(end-1)) ) {
-             case 'k': factor = 1024L; end--; break;
-             case 'm': factor = 1024L * 1024; end--; break;
-             case 'g': factor = 1024L * 1024 * 1024; end--; break;
-             case 't': factor = 1024L * 1024 * 1024 * 1024; end--; break;
-             default: break; //plain number of bytes
-         }
-     }
-
-     return Long.parseLong(str.substring(0, end).trim()) * factor;
- }
-
- /**
-  * Logs a validation warning if the JDK version is less than JDK8 and the
-  * executors are multi-threaded, i.e., the default parallelism exceeds the
-  * number of executors.
-  */
- public void checkAndRaiseValidationWarningJDKVersion()
- {
-     //jdk version check (less than jdk8)
-     boolean oldJdk = InfrastructureAnalyzer.isJavaVersionLessThanJDK8();
-
-     //multi-threaded executors if more cores than executors
-     int numExecutors = getNumExecutors();
-     int numCores = getDefaultParallelism();
-
-     //nothing to report for jdk8+ or single-threaded executors
-     if( !oldJdk || numCores <= numExecutors )
-         return;
-
-     //raise warning including the jre version
-     String version = System.getProperty("java.version");
-     LOG.warn("########################################################################################");
-     LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####");
-     LOG.warn("### java.version = " + version);
-     LOG.warn("### total number of executors = " + numExecutors);
-     LOG.warn("### total number of cores = " + numCores);
-     LOG.warn("### JDK-7032154: Performance tuning of sun.misc.FloatingDecimal/FormattedFloatingDecimal");
-     LOG.warn("### Workaround: Convert text to binary w/ changed configuration of one executor per core");
-     LOG.warn("########################################################################################");
- }
-
- ///////////////////////////////////////////
- // Cleanup of RDDs and Broadcast variables
- ///////
-
- /**
-  * Registers the rdd object of the child variable as lineage child of the
-  * rdd object of the parent variable.
-  *
-  * @param varParent name of the parent variable (with rdd handle)
-  * @param varChild name of the child variable (with rdd handle)
-  * @throws DMLRuntimeException
-  */
- public void addLineageRDD(String varParent, String varChild)
-     throws DMLRuntimeException
- {
-     RDDObject parentRDD = getMatrixObject(varParent).getRDDHandle();
-     RDDObject childRDD = getMatrixObject(varChild).getRDDHandle();
-
-     parentRDD.addLineageChild( childRDD );
- }
-
- /**
-  * Registers the broadcast object of the child variable as lineage child of
-  * the rdd object of the parent variable.
-  *
-  * @param varParent name of the parent variable (with rdd handle)
-  * @param varChild name of the child variable (with broadcast handle)
-  * @throws DMLRuntimeException
-  */
- public void addLineageBroadcast(String varParent, String varChild)
-     throws DMLRuntimeException
- {
-     RDDObject parentRDD = getMatrixObject(varParent).getRDDHandle();
-     BroadcastObject childBroadcast = getMatrixObject(varChild).getBroadcastHandle();
-
-     parentRDD.addLineageChild( childBroadcast );
- }
-
- /**
-  * Cleans up a matrix object that is no longer referenced, including
-  * lineage-based cleanup of its rdd and broadcast variables. Any exception
-  * during cleanup is wrapped into a DMLRuntimeException.
-  *
-  * @param mo matrix object to clean up
-  * @throws DMLRuntimeException if the cleanup fails
-  */
- @Override
- public void cleanupMatrixObject( MatrixObject mo )
-     throws DMLRuntimeException
- {
-     //NOTE: this method overwrites the default behavior of cleanupMatrixObject
-     //and hence is transparently used by rmvar instructions and other users. The
-     //core difference is the lineage-based cleanup of RDD and broadcast variables.
-
-     try
-     {
-         //nothing to do if cleanup is disabled for this matrix object
-         if( !mo.isCleanupEnabled() )
-             return;
-
-         //compute ref count only if matrix cleanup actually necessary
-         if( getVariables().hasReferences(mo) )
-             return;
-
-         //clean cached data
-         mo.clearData();
-
-         //clean hdfs data if no pending rdd operations on it,
-         //otherwise remember the file name for deferred file removal
-         if( mo.isFileExists() && mo.getFileName()!=null ) {
-             RDDObject pending = mo.getRDDHandle();
-             if( pending == null )
-                 MapReduceTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName());
-             else
-                 pending.setHDFSFilename(mo.getFileName());
-         }
-
-         //cleanup RDD and broadcast variables (recursive)
-         //note: requires that mo.clearData already removed back references
-         if( mo.getRDDHandle()!=null )
-             rCleanupLineageObject(mo.getRDDHandle());
-         if( mo.getBroadcastHandle()!=null )
-             rCleanupLineageObject(mo.getBroadcastHandle());
-     }
-     catch(Exception ex)
-     {
-         throw new DMLRuntimeException(ex);
-     }
- }
-
- /**
-  * Recursively cleans up a lineage object (rdd or broadcast) and its lineage
-  * children. The cleanup of an object is skipped while it still has
-  * consumers or is still reachable through a matrix object.
-  *
-  * @param lob lineage object to clean up
-  * @throws IOException if the deferred hdfs file removal fails
-  */
- private void rCleanupLineageObject(LineageObject lob)
-     throws IOException
- {
-     //abort recursive cleanup if still consumers, or if still reachable
-     //through matrix object (via back references for robustness in function
-     //calls and to prevent repeated scans of the symbol table)
-     if( lob.getNumReferences() > 0 || lob.hasBackReference() )
-         return;
-
-     //cleanup current lineage object (from driver/executors)
-     if( lob instanceof RDDObject ) {
-         RDDObject rddObj = (RDDObject)lob;
-         cleanupRDDVariable(rddObj.getRDD());
-
-         //deferred hdfs file removal (only if metadata set by cleanup call)
-         if( rddObj.getHDFSFilename()!=null )
-             MapReduceTool.deleteFileWithMTDIfExistOnHDFS(rddObj.getHDFSFilename());
-     }
-     else if( lob instanceof BroadcastObject ) {
-         BroadcastObject bcObj = (BroadcastObject)lob;
-         for( Broadcast<PartitionedMatrixBlock> part : bcObj.getBroadcast().getBroadcasts() )
-             cleanupBroadcastVariable(part);
-     }
-
-     //release and recursively process lineage children
-     for( LineageObject child : lob.getLineageChilds() ) {
-         child.decrementNumReferences();
-         rCleanupLineageObject(child);
-     }
- }
-
- /**
-  * This call destroys a broadcast variable at all executors and the driver.
-  * Hence, it is intended to be used on rmvar only. Depending on the
-  * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-  *
-  * @param bvar broadcast variable to destroy; ignored if no longer valid
-  */
- public void cleanupBroadcastVariable(Broadcast<?> bvar)
- {
-     //in comparison to 'unpersist' (which would only delete the broadcast from the executors),
-     //this call also deletes related data from the driver.
-     if( bvar.isValid() ) {
-         //note: the argument of destroy is spark's 'blocking' flag, i.e.,
-         //asynchronous destroy requires the negated configuration
-         bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY );
-     }
- }
-
- /**
-  * Removes an rdd variable from executor memory and disk if required.
-  * Hence, it is intended to be used on rmvar only. Depending on the
-  * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-  *
-  * @param rvar rdd to unpersist; left untouched if its storage level is NONE
-  */
- public void cleanupRDDVariable(JavaPairRDD<?,?> rvar)
- {
-     //compare storage levels by value: an identity comparison only works as
-     //long as both sides happen to be the same cached StorageLevel instance
-     if( !StorageLevel.NONE().equals(rvar.getStorageLevel()) ) {
-         rvar.unpersist( ASYNCHRONOUS_VAR_DESTROY );
-     }
- }
-
- /**
-  * Repartitions the binary-block rdd of the given matrix variable via a
-  * merge-by-key, persists the result with the default checkpoint storage
-  * level, triggers its computation, and installs it as the new rdd handle
-  * of the matrix object (keeping the previous handle as lineage child).
-  *
-  * @param var name of the matrix variable to repartition and cache
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- @SuppressWarnings("unchecked")
- public void repartitionAndCacheMatrixObject( String var )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- //get matrix object, its characteristics, and the input rdd in binary block format
- MatrixObject mo = getMatrixObject(var);
- MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
- JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
- getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
-
- //avoid unnecessary caching of input in order to reduce memory pressure:
- //if the input is marked for caching but not yet cached, read from its
- //first lineage child instead
- if( mo.getRDDHandle().allowsShortCircuitRead()
- && isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
- in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
- ((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
-
- //investigate issue of unnecessarily large number of partitions
- //(coalesce only if this reduces the number of partitions)
- int numPartitions = CheckpointSPInstruction.getNumCoalescePartitions(mcIn, in);
- if( numPartitions < in.partitions().size() )
- in = in.coalesce( numPartitions );
- }
-
- //repartition and persist rdd (force creation of shuffled rdd via merge)
- JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in);
- out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
- .count(); //trigger caching to prevent contention
-
- //create new rdd handle, in-place of current matrix object
- RDDObject inro = mo.getRDDHandle(); //guaranteed to exist (see above)
- RDDObject outro = new RDDObject(out, var); //create new rdd object
- outro.setCheckpointRDD(true); //mark as checkpointed
- outro.addLineageChild(inro); //keep lineage to prevent cycles on cleanup
- mo.setRDDHandle(outro);
- }
-
- /**
-  * Obtains the binary-block rdd of the given matrix variable and runs a
-  * count over it, which forces the rdd to be computed.
-  *
-  * @param var name of the matrix variable
-  * @throws DMLRuntimeException
-  * @throws DMLUnsupportedOperationException
-  */
- @SuppressWarnings("unchecked")
- public void cacheMatrixObject( String var )
-     throws DMLRuntimeException, DMLUnsupportedOperationException
- {
-     //obtain the binary block rdd behind the given variable
-     JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
-         getRDDHandleForMatrixObject(getMatrixObject(var), InputInfo.BinaryBlockInputInfo);
-
-     //run an action to trigger caching upfront and prevent contention
-     rdd.count();
- }
-
- /**
-  * Sets the local property "spark.scheduler.pool" of the spark context to
-  * the given pool name. This is a no-op unless FAIR_SCHEDULER_MODE is enabled.
-  *
-  * @param poolName name of the scheduler pool
-  */
- public void setThreadLocalSchedulerPool(String poolName) {
-     if( !FAIR_SCHEDULER_MODE )
-         return;
-
-     getSparkContext().sc()
-         .setLocalProperty("spark.scheduler.pool", poolName);
- }
-
- /**
-  * Resets the local property "spark.scheduler.pool" of the spark context
-  * to null. This is a no-op unless FAIR_SCHEDULER_MODE is enabled.
-  */
- public void cleanupThreadLocalSchedulerPool() {
-     if( !FAIR_SCHEDULER_MODE )
-         return;
-
-     getSparkContext().sc()
-         .setLocalProperty("spark.scheduler.pool", null);
- }
-
- /**
-  * Indicates if the rdd with the given id is registered among the
-  * persistent rdds of the spark context.
-  *
-  * @param rddID id of the rdd
-  * @return true if the spark context lists the rdd as persistent
-  */
- private boolean isRDDMarkedForCaching( int rddID ) {
-     return getSparkContext().sc()
-         .getPersistentRDDs().contains(rddID);
- }
-
- /**
-  * Indicates if the rdd with the given id is marked for caching and is
-  * reported as cached by the rdd storage info of the spark context.
-  *
-  * @param rddID id of the rdd
-  * @return true if the rdd is persistent and its storage info says it is cached
-  */
- private boolean isRDDCached( int rddID ) {
-     JavaSparkContext ctx = getSparkContext();
-     boolean cached = false;
-
-     //only rdds that are marked for caching can be cached at all
-     if( ctx.sc().getPersistentRDDs().contains(rddID) ) {
-         //look up the storage info of this rdd; a missing entry means not cached
-         for( RDDInfo rddInfo : ctx.sc().getRDDStorageInfo() ) {
-             if( rddInfo.id() == rddID ) {
-                 cached = rddInfo.isCached();
-                 break;
-             }
-         }
-     }
-
-     return cached;
- }
-
- ///////////////////////////////////////////
- // Debug String Handling (see explain); TODO to be removed
- ///////
-
- /**
-  * Registers the output rdd of the given instruction for monitoring and
-  * records its lineage debug string, with the sub-lineages of up to two
-  * input rdds replaced by their variable names.
-  *
-  * Does nothing if the output variable has no rdd handle or no rdd.
-  *
-  * @param inst spark instruction that produced the output
-  * @param outputVarName name of the output matrix variable
-  * @throws DMLRuntimeException if the output rdd has more than two rdd
-  *         lineage children, or if no MLContext is active
-  */
- public void setDebugString(SPInstruction inst, String outputVarName)
-     throws DMLRuntimeException
- {
-     RDDObject parentLineage = getMatrixObject(outputVarName).getRDDHandle();
-     if( parentLineage == null || parentLineage.getRDD() == null )
-         return;
-
-     JavaPairRDD<?, ?> out = parentLineage.getRDD();
-     MLContextProxy.addRDDForInstructionForMonitoring(inst, out.id());
-
-     //collect the (at most two) input rdds among the lineage children
-     JavaPairRDD<?, ?> in1 = null;
-     JavaPairRDD<?, ?> in2 = null;
-     String input1VarName = null;
-     String input2VarName = null;
-     if( parentLineage.getLineageChilds() != null ) {
-         for( LineageObject child : parentLineage.getLineageChilds() ) {
-             if( !(child instanceof RDDObject) )
-                 continue; //broadcasts are not part of the debug string
-
-             if( in1 == null ) {
-                 in1 = ((RDDObject) child).getRDD();
-                 input1VarName = child.getVarName();
-             }
-             else if( in2 == null ) {
-                 in2 = ((RDDObject) child).getRDD();
-                 input2VarName = child.getVarName();
-             }
-             else {
-                 //the children counted here are inputs of the instruction
-                 throw new DMLRuntimeException("PRINT_EXPLAIN_WITH_LINEAGE not yet supported for more than two inputs");
-             }
-         }
-     }
-
-     setLineageInfoForExplain(inst, out, in1, input1VarName, in2, input2VarName);
- }
-
- /**
-  * Builds the lineage debug string of the output rdd, replacing the
-  * sub-lineage of each given input rdd by "[[varname]]", and hands it to
-  * the monitoring utility of the active MLContext.
-  *
-  * The most expensive operation here is rdd.toDebugString(), which can be a
-  * major hit because of unrolling lazy evaluation of Spark. Hence, callers
-  * guard this method with the flag 'PRINT_EXPLAIN_WITH_LINEAGE', which is
-  * enabled only through MLContext. This way, it does not affect performance
-  * evaluation through the non-MLContext path.
-  *
-  * @param inst spark instruction that produced the output
-  * @param out output rdd
-  * @param in1 first input rdd, or null
-  * @param in1Name variable name of the first input
-  * @param in2 second input rdd, or null
-  * @param in2Name variable name of the second input
-  * @throws DMLRuntimeException if there is no active MLContext with a monitoring utility
-  */
- private void setLineageInfoForExplain(SPInstruction inst,
-     JavaPairRDD<?, ?> out,
-     JavaPairRDD<?, ?> in1, String in1Name,
-     JavaPairRDD<?, ?> in2, String in2Name) throws DMLRuntimeException {
-
-     //first fetch the start lines and lengths of the input debug strings
-     String startLine1 = null;
-     String startLine2 = null;
-     int i1length = 0, i2length = 0;
-     if( in1 != null ) {
-         String[] lines = in1.toDebugString().split("\\r?\\n");
-         startLine1 = SparkUtils.getStartLineFromSparkDebugInfo(lines[0]);
-         i1length = lines.length;
-     }
-     if( in2 != null ) {
-         String[] lines = in2.toDebugString().split("\\r?\\n");
-         startLine2 = SparkUtils.getStartLineFromSparkDebugInfo(lines[0]);
-         i2length = lines.length;
-     }
-
-     //now process the output debug string and replace each input sub-lineage
-     //by the matrix variable name (string builder avoids quadratic concatenation)
-     StringBuilder outDebugString = new StringBuilder();
-     int skip = 0; //remaining lines of an already replaced input sub-lineage
-     String[] outLines = out.toDebugString().split("\\r?\\n");
-     for( String outLine : outLines ) {
-         if( skip > 0 ) {
-             skip--;
-         }
-         else if( startLine1 != null && outLine.contains(startLine1) ) {
-             String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLine);
-             outDebugString.append("\n").append(prefix).append("[[").append(in1Name).append("]]");
-             skip = i1length - 1;
-         }
-         else if( startLine2 != null && outLine.contains(startLine2) ) {
-             String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLine);
-             outDebugString.append("\n").append(prefix).append("[[").append(in2Name).append("]]");
-             skip = i2length - 1;
-         }
-         else {
-             outDebugString.append("\n").append(outLine);
-         }
-     }
-
-     MLContext mlContext = MLContextProxy.getActiveMLContext();
-     if( mlContext != null && mlContext.getMonitoringUtil() != null ) {
-         mlContext.getMonitoringUtil().setLineageInfo(inst, outDebugString.toString());
-     }
-     else {
-         throw new DMLRuntimeException("The method setLineageInfoForExplain should be called only through MLContext");
-     }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java
deleted file mode 100644
index a5c05e0..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.hops.Hop;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-
-
-/**
- * This is the base class for all data partitioners. It holds the partition
- * format and block size, decides on format and output-info adjustments, and
- * delegates the actual partitioning to the subclass via
- * {@link #partitionMatrix}.
- */
-public abstract class DataPartitioner
-{
-
-    protected static final Log LOG = LogFactory.getLog(DataPartitioner.class.getName());
-
-    //note: the following value has been empirically determined but might change in the future,
-    //MatrixBlockDSM.SPARCITY_TURN_POINT (with 0.4) was too high because we create 3-4 values per nnz and
-    //have some computation overhead for binary cell.
-    protected static final double SPARSITY_CELL_THRESHOLD = 0.1d;
-
-    //suffix appended to the input variable name to name the partitioned output
-    protected static final String NAME_SUFFIX = "_dp";
-
-    //instance variables
-    protected PDataPartitionFormat _format = null;
-    protected int _n = -1; //blocksize if applicable
-    protected boolean _allowBinarycell = true;
-
-    /**
-     * @param dpf data partition format
-     * @param n blocksize if applicable
-     */
-    protected DataPartitioner( PDataPartitionFormat dpf, int n )
-    {
-        _format = dpf;
-        _n = n;
-    }
-
-    /**
-     * Creates a partitioned matrix object without forcing the partition format.
-     *
-     * @param in input matrix object
-     * @param fnameNew file name of the partitioned output
-     * @return the partitioned matrix object, or the input if no partitioning was applied
-     * @throws DMLRuntimeException
-     */
-    public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew )
-        throws DMLRuntimeException
-    {
-        return createPartitionedMatrixObject(in, fnameNew, false);
-    }
-
-    /**
-     * Creates a new output matrix object (named after the input variable
-     * plus {@link #NAME_SUFFIX}) and partitions the input into it.
-     *
-     * @param in input matrix object
-     * @param fnameNew file name of the partitioned output
-     * @param force if true, the configured format is used without optimization
-     * @return the partitioned matrix object, or the input if no partitioning was applied
-     * @throws DMLRuntimeException
-     */
-    public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew, boolean force )
-        throws DMLRuntimeException
-    {
-        ValueType vt = in.getValueType();
-        String varname = in.getVarName();
-        MatrixObject out = new MatrixObject(vt, fnameNew );
-        out.setDataType( DataType.MATRIX );
-        out.setVarName( varname+NAME_SUFFIX );
-
-        return createPartitionedMatrixObject(in, out, force);
-    }
-
-    /**
-     * Creates a partitioned matrix object based on the given input matrix object,
-     * according to the specified split format. The input matrix can be in-memory
-     * or still on HDFS and the partitioned output matrix is written to HDFS. The
-     * created matrix object can be used transparently for obtaining the full matrix
-     * or reading 1 or multiple partitions based on given index ranges.
-     *
-     * @param in input matrix object
-     * @param out output matrix object; its file name determines the output location
-     * @param force if true, the configured format is used without optimization
-     * @return the partitioned matrix object, or the input if no partitioning was applied
-     * @throws DMLRuntimeException
-     */
-    public MatrixObject createPartitionedMatrixObject( MatrixObject in, MatrixObject out, boolean force )
-        throws DMLRuntimeException
-    {
-        //check for naive partitioning
-        if( _format == PDataPartitionFormat.NONE )
-            return in;
-
-        //analyze input matrix object
-        MatrixFormatMetaData meta = (MatrixFormatMetaData)in.getMetaData();
-        MatrixCharacteristics mc = meta.getMatrixCharacteristics();
-        InputInfo ii = meta.getInputInfo();
-        OutputInfo oi = meta.getOutputInfo();
-        long rows = mc.getRows();
-        long cols = mc.getCols();
-        int brlen = mc.getRowsPerBlock();
-        int bclen = mc.getColsPerBlock();
-        long nonZeros = mc.getNonZeros();
-        //number of cells computed in double to avoid long overflow of rows*cols
-        double sparsity = (nonZeros>=0 && rows>0 && cols>0)?
-            ((double)nonZeros)/((double)rows*cols) : 1.0;
-
-        if( !force ) //try to optimize, if format not forced
-        {
-            //check lower bound of useful data partitioning
-            if( rows < Hop.CPThreshold && cols < Hop.CPThreshold ) //or matrix already fits in mem
-            {
-                return in;
-            }
-
-            //check for changing to blockwise representations
-            if( _format == PDataPartitionFormat.ROW_WISE && cols < Hop.CPThreshold )
-            {
-                LOG.debug("Changing format from "+PDataPartitionFormat.ROW_WISE+" to "+PDataPartitionFormat.ROW_BLOCK_WISE+".");
-                _format = PDataPartitionFormat.ROW_BLOCK_WISE;
-            }
-            if( _format == PDataPartitionFormat.COLUMN_WISE && rows < Hop.CPThreshold )
-            {
-                //report the format that is actually set below
-                LOG.debug("Changing format from "+PDataPartitionFormat.COLUMN_WISE+" to "+PDataPartitionFormat.COLUMN_BLOCK_WISE+".");
-                _format = PDataPartitionFormat.COLUMN_BLOCK_WISE;
-            }
-        }
-
-        //check changing to binarycell in case of sparse cols (robustness)
-        boolean convertBlock2Cell = false;
-        if(    ii == InputInfo.BinaryBlockInputInfo
-            && _allowBinarycell
-            && _format == PDataPartitionFormat.COLUMN_WISE
-            && sparsity < SPARSITY_CELL_THRESHOLD )
-        {
-            LOG.debug("Changing partition outputinfo from binaryblock to binarycell due to sparsity="+sparsity);
-            oi = OutputInfo.BinaryCellOutputInfo;
-            convertBlock2Cell = true;
-        }
-
-        //prepare filenames and cleanup if required
-        String fnameNew = out.getFileName();
-        try{
-            MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-        }
-        catch(Exception ex){
-            throw new DMLRuntimeException( ex );
-        }
-
-        //core partitioning (depending on subclass)
-        partitionMatrix( in, fnameNew, ii, oi, rows, cols, brlen, bclen );
-
-        //create output matrix object
-        out.setPartitioned( _format, _n );
-
-        MatrixCharacteristics mcNew = new MatrixCharacteristics( rows, cols, brlen, bclen );
-        mcNew.setNonZeros( nonZeros );
-        if( convertBlock2Cell ) //keep input info consistent with the changed output info
-            ii = InputInfo.BinaryCellInputInfo;
-        MatrixFormatMetaData metaNew = new MatrixFormatMetaData(mcNew,oi,ii);
-        out.setMetaData(metaNew);
-
-        return out;
-    }
-
-    /**
-     * Disables the automatic change from binary block to binary cell output
-     * for sparse column-wise partitioning.
-     */
-    public void disableBinaryCell()
-    {
-        _allowBinarycell = false;
-    }
-
-    /**
-     * Core partitioning, to be implemented by the subclass.
-     *
-     * @param in input matrix object
-     * @param fnameNew file name of the partitioned output
-     * @param ii input info of the input matrix
-     * @param oi output info of the partitioned matrix
-     * @param rlen number of rows
-     * @param clen number of columns
-     * @param brlen number of rows per block
-     * @param bclen number of columns per block
-     * @throws DMLRuntimeException
-     */
-    protected abstract void partitionMatrix( MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen )
-        throws DMLRuntimeException;
-
-    /**
-     * Creates a matrix block intended for reuse across partitions of the
-     * given format.
-     *
-     * @param dpf data partition format
-     * @param rows number of rows (used for COLUMN_WISE)
-     * @param cols number of columns (used for ROW_WISE)
-     * @return a 1 x cols sparse block for ROW_WISE, a rows x 1 dense block
-     *         for COLUMN_WISE, and null for all other formats
-     */
-    public static MatrixBlock createReuseMatrixBlock( PDataPartitionFormat dpf, int rows, int cols )
-    {
-        MatrixBlock tmp = null;
-
-        switch( dpf )
-        {
-            case ROW_WISE:
-                //default assumption sparse, but reset per input block anyway
-                tmp = new MatrixBlock( 1, cols, true, (int)(cols*0.1) );
-                break;
-            case COLUMN_WISE:
-                //default dense because single column always below SKINNY_MATRIX_TURN_POINT
-                tmp = new MatrixBlock( rows, 1, false );
-                break;
-            default:
-                //do nothing
-        }
-
-        return tmp;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java
deleted file mode 100644
index b3a5075..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ /dev/null
@@ -1,888 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.Cell;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.StagingFileUtils;
-import com.ibm.bi.dml.runtime.io.MatrixReader;
-import com.ibm.bi.dml.runtime.matrix.data.IJV;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.SparseRowsIterator;
-import com.ibm.bi.dml.runtime.util.FastStringTokenizer;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-
-/**
- * Partitions a given matrix into row or column partitions with a two-pass approach.
- * In the first phase, the input matrix is read from HDFS and sorted into block partitions
- * in a staging area in the local file system according to the partition format.
- * In order to allow for scalable partitioning, we process one block at a time.
- * In the second phase, all blocks of a partition are appended to a sequence file
- * on HDFS. Block-wise partitioning and write-once semantics of sequence files require the
- * indirection over the local staging area. For scalable computation, we process one
- * sequence file at a time.
- *
- * NOTE: For the resulting partitioned matrix, we store block and cell indexes wrt partition boundaries.
- * This means that the partitioned matrix CANNOT be read as a traditional matrix because there are
- * for example multiple blocks with same index (while the actual index is encoded in the path).
- * In order to enable full read of partition matrices, data converter would need to handle negative
- * row/col offsets for partitioned read. Currently not done in order to avoid overhead from normal read
- * and since partitioning only applied if exclusively indexed access.
- *
- *
- */
-public class DataPartitionerLocal extends DataPartitioner
-{
-
- //if true, the second phase (staging area to HDFS) is executed by worker threads
- private static final boolean PARALLEL = true;
-
- //id sequence created in the constructor (its use is outside this excerpt)
- private IDSequence _seq = null;
- //reuse block, created in partitionBinaryBlock according to the partition format
- private MatrixBlock _reuseBlk = null;
-
- //upper bound for the number of worker threads (at least 1 after construction)
- private int _par = -1;
-
- /**
-  * Creates a local data partitioner for the given format.
-  *
-  * @param dpf data partition format
-  * @param n blocksize if applicable
-  * @param par -1 for serial otherwise number of threads, can be ignored by implementation
-  * @throws DMLRuntimeException if the format is ROW_BLOCK_WISE_N or
-  *         COLUMN_BLOCK_WISE_N, which are not supported by this partitioner
-  */
- public DataPartitionerLocal(PDataPartitionFormat dpf, int n, int par)
-     throws DMLRuntimeException
- {
-     super(dpf, n);
-
-     //blockwise-n formats are not yet supported by the local partitioner
-     if( dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N || dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N )
-         throw new DMLRuntimeException("Data partitioning format '"+dpf+"' not supported by DataPartitionerLocal" );
-
-     _seq = new IDSequence();
-     _par = (par > 0) ? par : 1; //any non-positive value means a single thread
- }
-
- /**
-  * Partitions the given matrix via a unique local staging directory,
-  * dispatching on the input and output format. The staging directory is
-  * removed afterwards, also if partitioning fails.
-  *
-  * @throws DMLRuntimeException if the input format, or the output format
-  *         for binary block input, is not supported, or if partitioning fails
-  */
- @Override
- protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen)
-     throws DMLRuntimeException
- {
-     //force writing to disk (typically not required since partitioning only applied if dataset exceeds CP size)
-     in.exportData(); //written to disk iff dirty
-
-     String fname = in.getFileName();
-     String fnameStaging = LocalFileUtils.getUniqueWorkingDir( LocalFileUtils.CATEGORY_PARTITIONING );
-
-     try
-     {
-         //partition according to input/output format
-         if( ii == InputInfo.TextCellInputInfo )
-             partitionTextCell( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-         else if( ii == InputInfo.BinaryCellInputInfo )
-             partitionBinaryCell( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-         else if( ii == InputInfo.BinaryBlockInputInfo )
-         {
-             if( oi == OutputInfo.BinaryBlockOutputInfo )
-                 partitionBinaryBlock( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-             else if ( oi == OutputInfo.BinaryCellOutputInfo )
-                 partitionBinaryBlock2BinaryCell( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-             else //fail instead of silently producing no partitions
-                 throw new DMLRuntimeException("Cannot create data partitions of binary block input with output format: "+oi);
-         }
-         else
-             throw new DMLRuntimeException("Cannot create data partitions of format: "+ii.toString());
-     }
-     finally
-     {
-         //always remove the staging area, even if partitioning failed
-         LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-     }
- }
-
-
-
-
- /**
-  * Partitions a text cell matrix: all cells are read from HDFS and appended
-  * (in buffered batches) to the local staging area, then each staged
-  * partition is written to HDFS as a text cell file.
-  *
-  * @param fname file name of the input matrix
-  * @param fnameStaging local staging directory
-  * @param fnameNew file name of the partitioned output
-  * @param rlen number of rows (used for error reporting)
-  * @param clen number of columns (used for error reporting)
-  * @param brlen number of rows per block
-  * @param bclen number of columns per block
-  * @throws DMLRuntimeException if partitioning fails; the original failure is attached as cause
-  */
- private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen )
-     throws DMLRuntimeException
- {
-     //last read cell position, kept for post-mortem bounds checking
-     long row = -1;
-     long col = -1;
-
-     try
-     {
-         //STEP 1: read matrix from HDFS and write blocks to local staging area
-         //check and add input path
-         JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-         Path path = new Path(fname);
-         FileInputFormat.addInputPath(job, path);
-         TextInputFormat informat = new TextInputFormat();
-         informat.configure(job);
-         InputSplit[] splits = informat.getSplits(job, 1);
-
-         LinkedList<Cell> buffer = new LinkedList<Cell>();
-         LongWritable key = new LongWritable();
-         Text value = new Text();
-         FastStringTokenizer st = new FastStringTokenizer(' ');
-
-         for(InputSplit split: splits)
-         {
-             RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
-             try
-             {
-                 while(reader.next(key, value))
-                 {
-                     st.reset( value.toString() ); //reset tokenizer
-                     row = st.nextLong();
-                     col = st.nextLong();
-                     double lvalue = st.nextDouble();
-                     Cell tmp = new Cell( row, col, lvalue );
-
-                     buffer.addLast( tmp );
-                     if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-                     {
-                         appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-                         buffer.clear();
-                     }
-                 }
-
-                 //final flush
-                 if( !buffer.isEmpty() )
-                 {
-                     appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-                     buffer.clear();
-                 }
-             }
-             finally
-             {
-                 if( reader != null )
-                     reader.close();
-             }
-         }
-
-         //STEP 2: read matrix blocks from staging area and write matrix to HDFS
-         String[] fnamesPartitions = new File(fnameStaging).list();
-         if(PARALLEL)
-         {
-             int len = Math.min(fnamesPartitions.length, _par);
-             //number of partitions per thread (last thread might get fewer)
-             int chunk = (int)Math.ceil(((double)fnamesPartitions.length)/len);
-             Thread[] threads = new Thread[len];
-             for( int i=0;i<len;i++ )
-             {
-                 int start = i*chunk;
-                 int end = Math.min((i+1)*chunk-1, fnamesPartitions.length-1);
-                 threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-                 threads[i].start();
-             }
-
-             for( Thread t : threads )
-                 t.join();
-         }
-         else
-         {
-             for( String pdir : fnamesPartitions )
-                 writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );
-         }
-     }
-     catch (Exception e)
-     {
-         //post-mortem error handling and bounds checking; the cause is always
-         //attached because row/col are still -1 for failures before the first
-         //cell was read, which would otherwise hide the actual error
-         if( row < 1 || row > rlen || col < 1 || col > clen )
-         {
-             throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
-                 "out of overall matrix range [1:"+rlen+",1:"+clen+"].", e);
-         }
-         else
-             throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
-     }
- }
-
- /**
-  * Partitions a binary cell matrix: all cells are read from the sequence
-  * files on HDFS and appended (in buffered batches) to the local staging
-  * area, then each staged partition is written to HDFS as a binary cell
-  * sequence file.
-  *
-  * @param fname file name of the input matrix
-  * @param fnameStaging local staging directory
-  * @param fnameNew file name of the partitioned output
-  * @param rlen number of rows (used for error reporting)
-  * @param clen number of columns (used for error reporting)
-  * @param brlen number of rows per block
-  * @param bclen number of columns per block
-  * @throws DMLRuntimeException if partitioning fails; the original failure is attached as cause
-  */
- @SuppressWarnings("deprecation")
- private void partitionBinaryCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen )
-     throws DMLRuntimeException
- {
-     //last read cell position, kept for post-mortem bounds checking
-     long row = -1;
-     long col = -1;
-
-     try
-     {
-         //STEP 1: read matrix from HDFS and write blocks to local staging area
-         //check and add input path
-         JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-         Path path = new Path(fname);
-         FileSystem fs = FileSystem.get(job);
-
-         //prepare sequence file reader, and write to local staging area
-         LinkedList<Cell> buffer = new LinkedList<Cell>();
-         MatrixIndexes key = new MatrixIndexes();
-         MatrixCell value = new MatrixCell();
-
-         for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
-         {
-             SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
-             try
-             {
-                 while(reader.next(key, value))
-                 {
-                     row = key.getRowIndex();
-                     col = key.getColumnIndex();
-                     Cell tmp = new Cell( row, col, value.getValue() );
-
-                     buffer.addLast( tmp );
-                     if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-                     {
-                         appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-                         buffer.clear();
-                     }
-                 }
-
-                 //final flush
-                 if( !buffer.isEmpty() )
-                 {
-                     appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-                     buffer.clear();
-                 }
-             }
-             finally
-             {
-                 if( reader != null )
-                     reader.close();
-             }
-         }
-
-         //STEP 2: read matrix blocks from staging area and write matrix to HDFS
-         String[] fnamesPartitions = new File(fnameStaging).list();
-         if(PARALLEL)
-         {
-             int len = Math.min(fnamesPartitions.length, _par);
-             //number of partitions per thread (last thread might get fewer)
-             int chunk = (int)Math.ceil(((double)fnamesPartitions.length)/len);
-             Thread[] threads = new Thread[len];
-             for( int i=0;i<len;i++ )
-             {
-                 int start = i*chunk;
-                 int end = Math.min((i+1)*chunk-1, fnamesPartitions.length-1);
-                 threads[i] = new Thread(new DataPartitionerWorkerBinaryCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-                 threads[i].start();
-             }
-
-             for( Thread t : threads )
-                 t.join();
-         }
-         else
-         {
-             for( String pdir : fnamesPartitions )
-                 writeBinaryCellSequenceFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );
-         }
-     }
-     catch (Exception e)
-     {
-         //post-mortem error handling and bounds checking; the cause is always
-         //attached because row/col are still -1 for failures before the first
-         //cell was read, which would otherwise hide the actual error
-         if( row < 1 || row > rlen || col < 1 || col > clen )
-         {
-             throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
-                 "out of overall matrix range [1:"+rlen+",1:"+clen+"].", e);
-         }
-         else
-             throw new DMLRuntimeException("Unable to partition binary cell matrix.", e);
-     }
- }
-
- /**
-  * Partitions a binary block matrix into binary block partitions: each
-  * block is read from the sequence files on HDFS, bounds-checked, and
-  * appended to the local staging area; afterwards each staged partition is
-  * written to HDFS as a binary block sequence file.
-  *
-  * @param fname file name of the input matrix
-  * @param fnameStaging local staging directory
-  * @param fnameNew file name of the partitioned output
-  * @param rlen number of rows
-  * @param clen number of columns
-  * @param brlen number of rows per block
-  * @param bclen number of columns per block
-  * @throws DMLRuntimeException if partitioning fails for any reason
-  */
- @SuppressWarnings("deprecation")
- private void partitionBinaryBlock( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen )
-     throws DMLRuntimeException
- {
-     try
-     {
-         //block object reused while appending to the staging area
-         _reuseBlk = DataPartitioner.createReuseMatrixBlock(_format, brlen, bclen);
-
-         //PHASE 1: hdfs input -> local staging area
-         JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-         Path inputPath = new Path(fname);
-         FileSystem fs = FileSystem.get(job);
-
-         //key/value objects reused across all records
-         MatrixIndexes ix = new MatrixIndexes();
-         MatrixBlock blk = new MatrixBlock();
-
-         for( Path seqFile : MatrixReader.getSequenceFilePaths(fs, inputPath) )
-         {
-             SequenceFile.Reader reader = new SequenceFile.Reader(fs,seqFile,job);
-             try
-             {
-                 while( reader.next(ix, blk) ) //for each block
-                 {
-                     long row_offset = (ix.getRowIndex()-1)*brlen;
-                     long col_offset = (ix.getColumnIndex()-1)*bclen;
-                     long rows = blk.getNumRows();
-                     long cols = blk.getNumColumns();
-
-                     //the block must end within the overall matrix dimensions
-                     boolean rowsInRange = row_offset + rows >= 1 && row_offset + rows <= rlen;
-                     boolean colsInRange = col_offset + cols >= 1 && col_offset + cols <= clen;
-                     if( !(rowsInRange && colsInRange) )
-                     {
-                         throw new IOException("Matrix block ["+(row_offset+1)+":"+(row_offset+rows)+","+(col_offset+1)+":"+(col_offset+cols)+"] " +
-                             "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-                     }
-
-                     appendBlockToStagingArea(fnameStaging, blk, row_offset, col_offset, brlen, bclen);
-                 }
-             }
-             finally
-             {
-                 reader.close();
-             }
-         }
-
-         //PHASE 2: local staging area -> hdfs output
-         String[] partDirs = new File(fnameStaging).list();
-         if( PARALLEL )
-         {
-             //assign a contiguous range of partitions to each worker thread
-             int numThreads = Math.min(partDirs.length, _par);
-             int chunk = (int)Math.ceil(((double)partDirs.length)/numThreads);
-             Thread[] workers = new Thread[numThreads];
-             for( int k=0; k<numThreads; k++ )
-             {
-                 int first = k*chunk;
-                 int last = Math.min((k+1)*chunk-1, partDirs.length-1);
-                 workers[k] = new Thread(new DataPartitionerWorkerBinaryBlock(job, fnameNew, fnameStaging, partDirs, first, last));
-                 workers[k].start();
-             }
-
-             //wait for all workers
-             for( Thread worker : workers )
-                 worker.join();
-         }
-         else
-         {
-             for( String partDir : partDirs )
-                 writeBinaryBlockSequenceFileToHDFS( job, fnameNew, fnameStaging+"/"+partDir, false );
-         }
-     }
-     catch (Exception e)
-     {
-         throw new DMLRuntimeException("Unable to partition binary block matrix.", e);
-     }
- }
-
- /**
-  * Partitions a binary block matrix into binary cell partitions: each
-  * block is read from the sequence files on HDFS, bounds-checked, and its
-  * non-zero cells are appended to the local staging area; afterwards each
-  * staged partition is written to HDFS as a binary cell sequence file.
-  *
-  * @param fname file name of the input matrix
-  * @param fnameStaging local staging directory
-  * @param fnameNew file name of the partitioned output
-  * @param rlen number of rows
-  * @param clen number of columns
-  * @param brlen number of rows per block
-  * @param bclen number of columns per block
-  * @throws DMLRuntimeException if partitioning fails for any reason
-  */
- @SuppressWarnings("deprecation")
- private void partitionBinaryBlock2BinaryCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen )
-     throws DMLRuntimeException
- {
-     try
-     {
-         //PHASE 1: hdfs input -> local staging area
-         JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-         Path inputPath = new Path(fname);
-         FileSystem fs = FileSystem.get(job);
-
-         //key/value objects and cell buffer reused across all records
-         MatrixIndexes ix = new MatrixIndexes();
-         MatrixBlock blk = new MatrixBlock();
-         LinkedList<Cell> cells = new LinkedList<Cell>();
-
-         for( Path seqFile : MatrixReader.getSequenceFilePaths(fs, inputPath) )
-         {
-             SequenceFile.Reader reader = new SequenceFile.Reader(fs,seqFile,job);
-             try
-             {
-                 while( reader.next(ix, blk) ) //for each block
-                 {
-                     long row_offset = (ix.getRowIndex()-1)*brlen;
-                     long col_offset = (ix.getColumnIndex()-1)*bclen;
-                     long rows = blk.getNumRows();
-                     long cols = blk.getNumColumns();
-
-                     //the block must end within the overall matrix dimensions
-                     boolean rowsInRange = row_offset + rows >= 1 && row_offset + rows <= rlen;
-                     boolean colsInRange = col_offset + cols >= 1 && col_offset + cols <= clen;
-                     if( !(rowsInRange && colsInRange) )
-                     {
-                         throw new IOException("Matrix block ["+(row_offset+1)+":"+(row_offset+rows)+","+(col_offset+1)+":"+(col_offset+cols)+"] " +
-                             "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-                     }
-
-                     //convert the block into cells with global 1-based indexes
-                     if( blk.isInSparseFormat() ) //SPARSE
-                     {
-                         SparseRowsIterator iter = blk.getSparseRowsIterator();
-                         while( iter.hasNext() )
-                         {
-                             IJV entry = iter.next();
-                             cells.addLast( new Cell( row_offset + entry.i + 1,
-                                                      col_offset + entry.j + 1,
-                                                      entry.v ) );
-                         }
-                     }
-                     else //DENSE
-                     {
-                         for( int i=0; i<rows; i++ )
-                             for( int j=0; j<cols; j++ )
-                             {
-                                 double cellValue = blk.getValueDenseUnsafe(i, j);
-                                 if( cellValue == 0 )
-                                     continue; //only non-zeros are materialized
-                                 cells.addLast( new Cell( row_offset + i + 1,
-                                                          col_offset + j + 1,
-                                                          cellValue ) );
-                             }
-                     }
-
-                     //flush the cells of this block
-                     appendCellBufferToStagingArea(fnameStaging, cells, brlen, bclen);
-                     cells.clear();
-                 }
-             }
-             finally
-             {
-                 reader.close();
-             }
-         }
-
-         //PHASE 2: local staging area -> hdfs output
-         String[] partDirs = new File(fnameStaging).list();
-         if( PARALLEL )
-         {
-             //assign a contiguous range of partitions to each worker thread
-             int numThreads = Math.min(partDirs.length, _par);
-             int chunk = (int)Math.ceil(((double)partDirs.length)/numThreads);
-             Thread[] workers = new Thread[numThreads];
-             for( int k=0; k<numThreads; k++ )
-             {
-                 int first = k*chunk;
-                 int last = Math.min((k+1)*chunk-1, partDirs.length-1);
-                 workers[k] = new Thread(new DataPartitionerWorkerBinaryCell(job, fnameNew, fnameStaging, partDirs, first, last));
-                 workers[k].start();
-             }
-
-             //wait for all workers
-             for( Thread worker : workers )
-                 worker.join();
-         }
-         else
-         {
-             for( String partDir : partDirs )
-                 writeBinaryCellSequenceFileToHDFS( job, fnameNew, fnameStaging+"/"+partDir );
-         }
-     }
-     catch (Exception e)
-     {
-         throw new DMLRuntimeException("Unable to partition binary block matrix.", e);
-     }
- }
-
-
- /**
- *
- * @param dir
- * @param mb
- * @param row_offset
- * @param col_offset
- * @param brlen
- * @param bclen
- * @throws DMLRuntimeException
- * @throws IOException
- * @throws DMLUnsupportedOperationException
- */
- private void appendBlockToStagingArea( String dir, MatrixBlock mb, long row_offset, long col_offset, long brlen, long bclen )
- throws DMLRuntimeException, IOException, DMLUnsupportedOperationException
- {
- //NOTE: for temporary block we always create dense representations
- boolean sparse = mb.isInSparseFormat();
- long nnz = mb.getNonZeros();
- long rows = mb.getNumRows();
- long cols = mb.getNumColumns();
- double sparsity = ((double)nnz)/(rows*cols);
-
- if( _format == PDataPartitionFormat.ROW_WISE )
- {
- _reuseBlk.reset( 1, (int)cols, sparse, (int) (cols*sparsity) );
- for( int i=0; i<rows; i++ )
- {
- String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(row_offset+1+i));
- String pfname = pdir+"/"+"block_"+(col_offset/bclen+1);
- mb.sliceOperations(i, i, 0, (int)(cols-1), _reuseBlk);
- LocalFileUtils.writeMatrixBlockToLocal(pfname, _reuseBlk);
- _reuseBlk.reset();
- }
- }
- else if( _format == PDataPartitionFormat.ROW_BLOCK_WISE )
- {
- String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(row_offset/brlen+1));
- String pfname = pdir+"/"+"block_"+(col_offset/bclen+1);
- LocalFileUtils.writeMatrixBlockToLocal(pfname, mb);
- }
- else if( _format == PDataPartitionFormat.COLUMN_WISE )
- {
- //create object for reuse
- _reuseBlk.reset( (int)rows, 1, false );
-
- for( int i=0; i<cols; i++ )
- {
- String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(col_offset+1+i));
- String pfname = pdir+"/"+"block_"+(row_offset/brlen+1);
- mb.sliceOperations(0, (int)(rows-1), i, i, _reuseBlk);
- LocalFileUtils.writeMatrixBlockToLocal(pfname, _reuseBlk);
- _reuseBlk.reset();
- }
- }
- else if( _format == PDataPartitionFormat.COLUMN_BLOCK_WISE )
- {
- String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(col_offset/bclen+1));
- String pfname = pdir+"/"+"block_"+(row_offset/brlen+1);
- LocalFileUtils.writeMatrixBlockToLocal(pfname, mb);
- }
- }
-
- /**
- *
- * @param dir
- * @param buffer
- * @param brlen
- * @param bclen
- * @throws DMLRuntimeException
- * @throws IOException
- */
- private void appendCellBufferToStagingArea( String dir, LinkedList<Cell> buffer, int brlen, int bclen )
- throws DMLRuntimeException, IOException
- {
- HashMap<Long,LinkedList<Cell>> sortedBuffer = new HashMap<Long,LinkedList<Cell>>();
-
- //sort cells in buffer wrt key
- long key = -1;
- for( Cell c : buffer )
- {
- switch(_format)
- {
- case ROW_WISE:
- key = c.getRow();
- c.setRow(1);
- break;
- case ROW_BLOCK_WISE:
- key = (c.getRow()-1)/brlen+1;
- c.setRow((c.getRow()-1)%brlen+1);
- break;
- case COLUMN_WISE:
- key = c.getCol();
- c.setCol(1);
- break;
- case COLUMN_BLOCK_WISE:
- key = (c.getCol()-1)/bclen+1;
- c.setCol((c.getCol()-1)%bclen+1);
- break;
- default:
- //do nothing
- }
-
- if( !sortedBuffer.containsKey(key) )
- sortedBuffer.put(key, new LinkedList<Cell>());
- sortedBuffer.get(key).addLast(c);
- }
-
- //write lists of cells to local files
- for( Entry<Long,LinkedList<Cell>> e : sortedBuffer.entrySet() )
- {
- String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+e.getKey());
- String pfname = pdir+"/"+"block_"+_seq.getNextID();
- StagingFileUtils.writeCellListToLocal(pfname, e.getValue());
- }
- }
-
-
- /////////////////////////////////////
- // Helper methods for HDFS //
- // read/write in different formats //
- /////////////////////////////////////
-
- @SuppressWarnings("deprecation")
- public void writeBinaryBlockSequenceFileToHDFS( JobConf job, String dir, String lpdir, boolean threadsafe )
- throws IOException
- {
- long key = getKeyFromFilePath(lpdir);
- FileSystem fs = FileSystem.get(job);
- Path path = new Path(dir+"/"+key);
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); //beware ca 50ms
-
- try
- {
- String[] fnameBlocks = new File( lpdir ).list();
- for( String fnameBlock : fnameBlocks )
- {
- long key2 = getKey2FromFileName(fnameBlock);
- MatrixBlock tmp = null;
- if( threadsafe )
- tmp = LocalFileUtils.readMatrixBlockFromLocal(lpdir+"/"+fnameBlock);
- else
- tmp = LocalFileUtils.readMatrixBlockFromLocal(lpdir+"/"+fnameBlock, _reuseBlk);
-
- if( _format == PDataPartitionFormat.ROW_WISE || _format == PDataPartitionFormat.ROW_BLOCK_WISE )
- {
- writer.append(new MatrixIndexes(1,key2), tmp);
- }
- else if( _format == PDataPartitionFormat.COLUMN_WISE || _format == PDataPartitionFormat.COLUMN_BLOCK_WISE )
- {
- writer.append(new MatrixIndexes(key2,1), tmp);
- }
- }
- }
- finally
- {
- if( writer != null )
- writer.close();
- }
- }
-
- @SuppressWarnings("deprecation")
- public void writeBinaryCellSequenceFileToHDFS( JobConf job, String dir, String lpdir )
- throws IOException
- {
- long key = getKeyFromFilePath(lpdir);
- FileSystem fs = FileSystem.get(job);
- Path path = new Path(dir+"/"+key);
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
-
- try
- {
- MatrixIndexes indexes = new MatrixIndexes();
- MatrixCell cell = new MatrixCell();
-
- String[] fnameBlocks = new File( lpdir ).list();
- for( String fnameBlock : fnameBlocks )
- {
- LinkedList<Cell> tmp = StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
- for( Cell c : tmp )
- {
- indexes.setIndexes(c.getRow(), c.getCol());
- cell.setValue(c.getValue());
- writer.append(indexes, cell);
- }
- }
- }
- finally
- {
- if( writer != null )
- writer.close();
- }
- }
-
- public void writeTextCellFileToHDFS( JobConf job, String dir, String lpdir )
- throws IOException
- {
- long key = getKeyFromFilePath(lpdir);
- FileSystem fs = FileSystem.get(job);
- Path path = new Path(dir+"/"+key);
- BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
- try
- {
- //for obj reuse and preventing repeated buffer re-allocations
- StringBuilder sb = new StringBuilder();
-
- String[] fnameBlocks = new File( lpdir ).list();
- for( String fnameBlock : fnameBlocks )
- {
- LinkedList<Cell> tmp = StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
- for( Cell c : tmp )
- {
- sb.append(c.getRow());
- sb.append(' ');
- sb.append(c.getCol());
- sb.append(' ');
- sb.append(c.getValue());
- sb.append('\n');
- out.write( sb.toString() );
- sb.setLength(0);
- }
- }
- }
- finally
- {
- if( out != null )
- out.close();
- }
- }
-
-
- /////////////////////////////////
- // Helper methods for local fs //
- // read/write //
- /////////////////////////////////
-
- /**
- *
- * @param dir
- * @return
- */
- private long getKeyFromFilePath( String dir )
- {
- String[] dirparts = dir.split("/");
- long key = Long.parseLong( dirparts[dirparts.length-1] );
- return key;
- }
-
- /**
- *
- * @param fname
- * @return
- */
- private long getKey2FromFileName( String fname )
- {
- return Long.parseLong( fname.split("_")[1] );
- }
-
- private abstract class DataPartitionerWorker implements Runnable
- {
- private JobConf _job = null;
- private String _fnameNew = null;
- private String _fnameStaging = null;
- private String[] _fnamesPartitions = null;
- private int _start = -1;
- private int _end = -1;
-
- public DataPartitionerWorker(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end)
- {
- _job = job;
- _fnameNew = fnameNew;
- _fnameStaging = fnameStaging;
- _fnamesPartitions = fnamesPartitions;
- _start = start;
- _end = end;
- }
-
- @Override
- public void run()
- {
- //read each input if required
- try
- {
- for( int i=_start; i<=_end; i++ )
- {
- String pdir = _fnamesPartitions[i];
- writeFileToHDFS( _job, _fnameNew, _fnameStaging+"/"+pdir );
- }
- }
- catch(Exception ex)
- {
- throw new RuntimeException("Failed on parallel data partitioning.", ex);
- }
- }
-
- public abstract void writeFileToHDFS( JobConf job, String fnameNew, String stagingDir )
- throws IOException;
- }
-
- private class DataPartitionerWorkerTextCell extends DataPartitionerWorker
- {
- public DataPartitionerWorkerTextCell(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end)
- {
- super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
- }
-
- @Override
- public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir)
- throws IOException
- {
- writeTextCellFileToHDFS( job, fnameNew, stagingDir );
- }
- }
-
- private class DataPartitionerWorkerBinaryCell extends DataPartitionerWorker
- {
- public DataPartitionerWorkerBinaryCell(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end)
- {
- super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
- }
-
- @Override
- public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir)
- throws IOException
- {
- writeBinaryCellSequenceFileToHDFS( job, fnameNew, stagingDir );
- }
- }
-
- private class DataPartitionerWorkerBinaryBlock extends DataPartitionerWorker
- {
- public DataPartitionerWorkerBinaryBlock(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end)
- {
- super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
- }
-
- @Override
- public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir)
- throws IOException
- {
- writeBinaryBlockSequenceFileToHDFS( job, fnameNew, stagingDir, true );
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
deleted file mode 100644
index d0cb7fc..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.conf.DMLConfig;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.PairWritableBlock;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.PairWritableCell;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-import com.ibm.bi.dml.utils.Statistics;
-import com.ibm.bi.dml.yarn.DMLAppMasterUtils;
-
-/**
- * MR job class for submitting parfor remote partitioning MR jobs.
- *
- */
-public class DataPartitionerRemoteMR extends DataPartitioner
-{
-
- private long _pfid = -1;
- private int _numReducers = -1;
- private int _replication = -1;
- //private int _max_retry = -1;
- private boolean _jvmReuse = false;
- private boolean _keepIndexes = false;
-
-
- public DataPartitionerRemoteMR(PDataPartitionFormat dpf, int n, long pfid, int numReducers, int replication, int max_retry, boolean jvmReuse, boolean keepIndexes)
- {
- super(dpf, n);
-
- _pfid = pfid;
- _numReducers = numReducers;
- _replication = replication;
- //_max_retry = max_retry;
- _jvmReuse = jvmReuse;
- _keepIndexes = keepIndexes;
- }
-
-
- @Override
- protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen)
- throws DMLRuntimeException
- {
- String jobname = "ParFor-DPMR";
- long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-
- JobConf job;
- job = new JobConf( DataPartitionerRemoteMR.class );
- if( _pfid >= 0 ) //use in parfor
- job.setJobName(jobname+_pfid);
- else //use for partition instruction
- job.setJobName("Partition-MR");
-
- //maintain dml script counters
- Statistics.incrementNoOfCompiledMRJobs();
-
- try
- {
- //force writing to disk (typically not required since partitioning only applied if dataset exceeds CP size)
- in.exportData(); //written to disk iff dirty
-
- Path path = new Path(in.getFileName());
-
- /////
- //configure the MR job
- MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, ii, oi, _format, _n, fnameNew, _keepIndexes);
-
- //set mappers, reducers, combiners
- job.setMapperClass(DataPartitionerRemoteMapper.class);
- job.setReducerClass(DataPartitionerRemoteReducer.class);
-
- if( oi == OutputInfo.TextCellOutputInfo )
- {
- //binary cell intermediates for reduced IO
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(PairWritableCell.class);
- }
- else if( oi == OutputInfo.BinaryCellOutputInfo )
- {
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(PairWritableCell.class);
- }
- else if ( oi == OutputInfo.BinaryBlockOutputInfo )
- {
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(PairWritableBlock.class);
-
- //check Alignment
- if( (_format == PDataPartitionFormat.ROW_BLOCK_WISE_N && rlen>_n && _n % brlen !=0)
- || (_format == PDataPartitionFormat.COLUMN_BLOCK_WISE_N && clen>_n && _n % bclen !=0) )
- {
- throw new DMLRuntimeException("Data partitioning format "+_format+" requires aligned blocks.");
- }
- }
-
- //set input format
- job.setInputFormat(ii.inputFormatClass);
-
- //set the input path and output path
- FileInputFormat.setInputPaths(job, path);
-
- //set output path
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
- //FileOutputFormat.setOutputPath(job, pathNew);
- job.setOutputFormat(NullOutputFormat.class);
-
- //////
- //set optimization parameters
-
- //set the number of mappers and reducers
- //job.setNumMapTasks( _numMappers ); //use default num mappers
- long reducerGroups = -1;
- switch( _format )
- {
- case ROW_WISE: reducerGroups = rlen; break;
- case COLUMN_WISE: reducerGroups = clen; break;
- case ROW_BLOCK_WISE: reducerGroups = (rlen/brlen)+((rlen%brlen==0)?0:1); break;
- case COLUMN_BLOCK_WISE: reducerGroups = (clen/bclen)+((clen%bclen==0)?0:1); break;
- case ROW_BLOCK_WISE_N: reducerGroups = (rlen/_n)+((rlen%_n==0)?0:1); break;
- case COLUMN_BLOCK_WISE_N: reducerGroups = (clen/_n)+((clen%_n==0)?0:1); break;
- default:
- //do nothing
- }
- job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) );
-
- //use FLEX scheduler configuration properties
- /*if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
- {
- job.setInt("flex.map.min", 0);
- job.setInt("flex.map.max", _numMappers);
- job.setInt("flex.reduce.min", 0);
- job.setInt("flex.reduce.max", _numMappers);
- }*/
-
- //disable automatic tasks timeouts and speculative task exec
- job.setInt("mapred.task.timeout", 0);
- job.setMapSpeculativeExecution(false);
-
- //set up preferred custom serialization framework for binary block format
- if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
- MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-
- //enables the reuse of JVMs (multiple tasks per MR task)
- if( _jvmReuse )
- job.setNumTasksToExecutePerJvm(-1); //unlimited
-
- //enables compression - not conclusive for different codecs (empirically good compression ratio, but significantly slower)
- //job.set("mapred.compress.map.output", "true");
- //job.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
-
- //set the replication factor for the results
- job.setInt("dfs.replication", _replication);
-
- //set up map/reduce memory configurations (if in AM context)
- DMLConfig config = ConfigurationManager.getConfig();
- DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
-
- //set the max number of retries per map task
- // disabled job-level configuration to respect cluster configuration
- // note: this refers to hadoop2, hence it never had effect on mr1
- //job.setInt("mapreduce.map.maxattempts", _max_retry);
-
- //set unique working dir
- MRJobConfiguration.setUniqueWorkingDir(job);
-
- /////
- // execute the MR job
- JobClient.runJob(job);
-
- //maintain dml script counters
- Statistics.incrementNoOfExecutedMRJobs();
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException(ex);
- }
-
- if( DMLScript.STATISTICS && _pfid >= 0 ){
- long t1 = System.nanoTime(); //only for parfor
- Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, t1-t0);
- }
- }
-
-}