Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:34 UTC
[03/78] [abbrv] [partial] incubator-systemml git commit: Move files to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
deleted file mode 100644
index 9d6f320..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.runtime.controlprogram.LocalVariableMap;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheStatistics;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheableData;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.StatisticMonitor;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDHandler;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * Remote ParWorker implementation, realized as MR mapper.
- *
- * NOTE: In a cluster setup, JVM reuse will not lead to reusing JVMs across different jobs or different
- * task types because JVM max heap sizes are specified per job for map/reduce tasks.
- *
- */
-public class RemoteParWorkerMapper extends ParWorker  //MapReduceBase not required (no-op implementations of configure, close)
- implements Mapper<LongWritable, Text, Writable, Writable>
-{
-
- //cache for future reuse (in case of JVM reuse)
- //NOTE: HashMap to support multiple parfor MR jobs in local mode and in case of JVM reuse across jobs
- private static HashMap<String,RemoteParWorkerMapper> _sCache = null;
-
- //MR ParWorker attributes
- protected String _stringID = null;
- protected HashMap<String, String> _rvarFnames = null;
-
- static
- {
- //init cache (once per JVM)
- _sCache = new HashMap<String, RemoteParWorkerMapper>();
- }
-
-
- public RemoteParWorkerMapper( )
- {
- //only used if JVM reuse is enabled in order to ensure consistent output
- //filenames across tasks of one reused worker (preaggregation)
- _rvarFnames = new HashMap<String, String>();
- }
-
- /**
- * Parses the input task, executes it, and exports the result variables.
- */
- @Override
- public void map(LongWritable key, Text value, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
- LOG.trace("execute RemoteParWorkerMapper "+_stringID+" ("+_workerID+")");
-
- //state for jvm reuse and multiple iterations
- long numIters = getExecutedIterations();
-
- try
- {
- //parse input task
- Task lTask = Task.parseCompactString( value.toString() );
-
- //execute task (on error: re-try via Hadoop)
- executeTask( lTask );
-
- //write output if required (matrix indexed write)
- RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _rvarFnames, out );
- }
- catch(Exception ex)
- {
- //throw IO exception to adhere to API specification
- throw new IOException("ParFOR: Failed to execute task.",ex);
- }
-
- //statistics maintenance
- RemoteParForUtils.incrementParForMRCounters(reporter, 1, getExecutedIterations()-numIters);
-
- //print heavy hitters per task
- JobConf job = ConfigurationManager.getCachedJobConf();
- if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
- LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));
- }
-
- /**
- * Configures the worker, probing the JVM-reuse cache before full initialization.
- */
- @Override
- public void configure(JobConf job)
- {
- boolean requiresConfigure = true;
- String jobID = job.get("mapred.job.id");
-
- //probe cache for existing worker (parfor body, symbol table, etc)
- if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER )
- {
- synchronized( _sCache ) //for multiple jobs in local mode
- {
- if( _sCache.containsKey(jobID) )
- {
- RemoteParWorkerMapper tmp = _sCache.get(jobID);
-
- _stringID = tmp._stringID;
- _workerID = tmp._workerID;
-
- _childBlocks = tmp._childBlocks;
- _resultVars = tmp._resultVars;
- _ec = tmp._ec;
-
- _numIters = tmp._numIters;
- _numTasks = tmp._numTasks;
-
- _rvarFnames = tmp._rvarFnames;
-
- requiresConfigure = false;
- }
- }
- }
-
- if( requiresConfigure )
- {
- LOG.trace("configure RemoteParWorkerMapper "+job.get("mapred.tip.id"));
-
- try
- {
- _stringID = job.get("mapred.tip.id"); //task ID
- _workerID = IDHandler.extractIntID(_stringID); //int task ID
-
- //use the given job configuration as source for all new job confs
- //NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar,
- //whose bundled core-default.xml configuration hides the actual default cluster configuration
- //in the context of mr jobs (for example, that config points to the local fs instead of hdfs by default).
- if( !InfrastructureAnalyzer.isLocalMode(job) ) {
- ConfigurationManager.setCachedJobConf(job);
- }
-
- //create local runtime program
- String in = MRJobConfiguration.getProgramBlocks(job);
- ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
- _childBlocks = body.getChildBlocks();
- _ec = body.getEc();
- _resultVars = body.getResultVarNames();
-
- //init local cache manager
- if( !CacheableData.isCachingActive() ) {
- String uuid = IDHandler.createDistributedUniqueID();
- LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
- CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
- }
- if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
- CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
- }
-
- //ensure that resultvar files are not removed
- super.pinResultVariables();
-
- //enable/disable caching (if required)
- boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
- if( !cpCaching )
- CacheableData.disableCaching();
-
- _numTasks = 0;
- _numIters = 0;
-
- }
- catch(Exception ex)
- {
- throw new RuntimeException(ex);
- }
-
- //disable stat monitoring; reporting execution times via counters is not useful
- StatisticMonitor.disableStatMonitoring();
-
- //put into cache if required
- if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER )
- synchronized( _sCache ){ //for multiple jobs in local mode
- _sCache.put(jobID, this);
- }
- }
- else
- {
- LOG.trace("reuse configured RemoteParWorkerMapper "+_stringID);
- }
-
- //always reset stats because counters are maintained per map task (relevant in case of JVM reuse)
- if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
- {
- CacheStatistics.reset();
- Statistics.reset();
- }
- }
-
- /**
- * Cleans up the working directories and resets the cache status of reused workers.
- */
- @Override
- public void close()
- throws IOException
- {
- //cleanup cache and local tmp dir
- RemoteParForUtils.cleanupWorkingDirectories();
-
- //change cache status for jvm_reuse (setting the status to empty allows us
- //to reuse in-memory objects if still present, and to re-load from HDFS
- //if evicted by the garbage collector; without this approach, we
- //could not clean up the local working dir, because that would
- //delete evicted matrices as well).
- if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER )
- {
- for( RemoteParWorkerMapper pw : _sCache.values() )
- {
- LocalVariableMap vars = pw._ec.getVariables();
- for( String varName : vars.keySet() )
- {
- Data dat = vars.get(varName);
- if( dat instanceof MatrixObject )
- ((MatrixObject)dat).setEmptyStatus();
- }
- }
- }
-
- //ensure caching is not disabled for CP in local mode
- CacheableData.enableCaching();
- }
-
-}
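
The configure() method above follows a probe-or-configure scheme for JVM reuse:
expensive worker setup (parsing the program blocks, initializing caching) runs once
per job, and later tasks of the same job fetch the cached worker. A minimal,
self-contained sketch of that scheme, with illustrative names rather than the actual
SystemML classes:

    import java.util.HashMap;

    public class WorkerCacheSketch {
        //cache for future reuse, keyed by job ID (multiple entries only for
        //JVM reuse across jobs or multiple jobs in local mode)
        private static final HashMap<String, WorkerCacheSketch> CACHE =
            new HashMap<String, WorkerCacheSketch>();

        private String state; //stands in for parfor body, symbol table, etc.

        public static WorkerCacheSketch getOrCreate(String jobID) {
            synchronized (CACHE) { //for multiple jobs in local mode
                WorkerCacheSketch w = CACHE.get(jobID);
                if (w == null) { //expensive one-time configuration
                    w = new WorkerCacheSketch();
                    w.state = "parsed program for " + jobID;
                    CACHE.put(jobID, w);
                }
                return w; //subsequent tasks of the same job reuse this instance
            }
        }

        public static void main(String[] args) {
            WorkerCacheSketch a = getOrCreate("job_1");
            WorkerCacheSketch b = getOrCreate("job_1");
            System.out.println(a == b); //true: second task reuses the configured worker
        }
    }
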
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java
deleted file mode 100644
index c4ff3c5..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-
-/**
- * Due to the independence of all iterations, any result variable has the following properties:
- * (1) it is a non-local variable, (2) it is a matrix object, and (3) its partial results are
- * completely independent. These properties allow us to realize result merging in parallel
- * without any synchronization.
- *
- */
-public abstract class ResultMerge
-{
-
- protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName());
-
- protected static final String NAME_SUFFIX = "_rm";
-
- //inputs to result merge
- protected MatrixObject _output = null;
- protected MatrixObject[] _inputs = null;
- protected String _outputFName = null;
-
- protected ResultMerge( )
- {
-
- }
-
- public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename )
- {
- _output = out;
- _inputs = in;
- _outputFName = outputFilename;
- }
-
- /**
- * Merge all given input matrices sequentially into the given output matrix.
- * The required in-memory space is the size of the output matrix plus the size
- * of one input matrix at a time.
- *
- * @return output (merged) matrix
- * @throws DMLRuntimeException
- */
- public abstract MatrixObject executeSerialMerge()
- throws DMLRuntimeException;
-
- /**
- * Merge all given input matrices in parallel into the given output matrix.
- * The required in-memory space is the size of the output matrix plus the size
- * of all input matrices.
- *
- * @param par degree of parallelism
- * @return output (merged) matrix
- * @throws DMLRuntimeException
- */
- public abstract MatrixObject executeParallelMerge( int par )
- throws DMLRuntimeException;
-
- /**
- * Merges an input block into the given output block without a compare matrix.
- *
- * @param out initially empty output block
- * @param in input block to merge
- * @param appendOnly if true, values are appended (requires a final sort of sparse rows)
- * @throws DMLRuntimeException
- */
- protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly )
- throws DMLRuntimeException
- {
- //pass through to matrix block operations
- out.merge(in, appendOnly);
- }
-
- /**
- * NOTE: append-only is not applicable for merge with compare because the output must be
- * populated with the initial state of the matrix; with append, this would result in duplicates.
- *
- * @param out output block, pre-populated with the original (compare) values
- * @param in input block to merge
- * @throws DMLRuntimeException
- */
- protected void mergeWithComp( MatrixBlock out, MatrixBlock in, double[][] compare )
- throws DMLRuntimeException
- {
- //Notes for result correctness:
- // * Always iterate over entire block in order to compare all values
- // (using sparse iterator would miss values set to 0)
- // * Explicit NaN awareness is required for cases where the original matrix contains
- //   NaNs; since NaN != NaN, a plain comparison would otherwise potentially overwrite results
-
- if( in.isInSparseFormat() ) //sparse input format
- {
- int rows = in.getNumRows();
- int cols = in.getNumColumns();
- for( int i=0; i<rows; i++ )
- for( int j=0; j<cols; j++ )
- {
- double value = in.getValueSparseUnsafe(i,j); //input value
- if( (value != compare[i][j] && !Double.isNaN(value) ) //for new values only (div)
- || Double.isNaN(value) != Double.isNaN(compare[i][j]) ) //NaN awareness
- {
- out.quickSetValue( i, j, value );
- }
- }
- }
- else //dense input format
- {
- //for a merge this case will seldom happen, as each input MatrixObject
- //has at most 1/numThreads of all values in it.
- int rows = in.getNumRows();
- int cols = in.getNumColumns();
- for( int i=0; i<rows; i++ )
- for( int j=0; j<cols; j++ )
- {
- double value = in.getValueDenseUnsafe(i,j); //input value
- if( (value != compare[i][j] && !Double.isNaN(value) ) //for new values only (div)
- || Double.isNaN(value) != Double.isNaN(compare[i][j]) ) //NaN awareness
- {
- out.quickSetValue( i, j, value );
- }
- }
- }
- }
-
- protected long computeNonZeros( MatrixObject out, ArrayList<MatrixObject> in )
- {
- MatrixCharacteristics mc = out.getMatrixCharacteristics();
- long outNNZ = mc.getNonZeros();
- long ret = outNNZ;
- for( MatrixObject tmp : in )
- {
- MatrixCharacteristics tmpmc = tmp.getMatrixCharacteristics();
- long inNNZ = tmpmc.getNonZeros();
- ret += (inNNZ - outNNZ);
- }
-
- return ret;
- }
-
- /**
- *
- * @param in array of input matrix objects
- * @return the inputs as a list of matrix objects
- */
- protected ArrayList<MatrixObject> convertToList(MatrixObject[] in)
- {
- ArrayList<MatrixObject> ret = new ArrayList<MatrixObject>();
- for( MatrixObject mo : in )
- ret.add( mo );
-
- return ret;
- }
-}
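
The NaN-aware predicate in mergeWithComp above is worth spelling out: because
NaN != NaN evaluates to true, a naive (value != compare) test would classify an
unchanged NaN cell as a new value, so a worker that never touched the cell could
overwrite another worker's result. A minimal, self-contained sketch of the predicate
(illustrative class name, values chosen for demonstration):

    public class NaNMergePredicateSketch {
        //same predicate as in mergeWithComp: copy only genuinely changed cells
        static boolean isChanged(double value, double compare) {
            return (value != compare && !Double.isNaN(value))      //new non-NaN value
                || (Double.isNaN(value) != Double.isNaN(compare)); //NaN <-> non-NaN transition
        }

        public static void main(String[] args) {
            double nan = Double.NaN;
            System.out.println(isChanged(nan, nan)); //false: unchanged NaN must not overwrite
            System.out.println(isChanged(7.0, nan)); //true:  worker replaced NaN with 7.0
            System.out.println(isChanged(nan, 7.0)); //true:  worker wrote NaN over 7.0
            System.out.println(isChanged(7.0, 7.0)); //false: unchanged value
            //a naive (value != compare) test returns true for (nan, nan), letting an
            //unchanged NaN from one worker overwrite another worker's merged result
        }
    }
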
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
deleted file mode 100644
index 745ae08..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.OptimizerRuleBased;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.Timing;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-
-/**
- * Local result merge that automatically chooses between the in-memory and the
- * file-based implementation, based on the output size and the local memory budget.
- */
-public class ResultMergeLocalAutomatic extends ResultMerge
-{
-
- private ResultMerge _rm = null;
-
- public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename )
- {
- super( out, in, outputFilename );
- }
-
- @Override
- public MatrixObject executeSerialMerge()
- throws DMLRuntimeException
- {
- Timing time = new Timing(true);
-
- MatrixCharacteristics mc = _output.getMatrixCharacteristics();
- long rows = mc.getRows();
- long cols = mc.getCols();
-
- if( OptimizerRuleBased.isInMemoryResultMerge(rows, cols, OptimizerUtils.getLocalMemBudget()) )
- _rm = new ResultMergeLocalMemory( _output, _inputs, _outputFName );
- else
- _rm = new ResultMergeLocalFile( _output, _inputs, _outputFName );
-
- MatrixObject ret = _rm.executeSerialMerge();
-
- LOG.trace("Automatic result merge ("+_rm.getClass().getName()+") executed in "+time.stop()+"ms.");
-
- return ret;
- }
-
- @Override
- public MatrixObject executeParallelMerge(int par)
- throws DMLRuntimeException
- {
- MatrixCharacteristics mc = _output.getMatrixCharacteristics();
- long rows = mc.getRows();
- long cols = mc.getCols();
-
- if( OptimizerRuleBased.isInMemoryResultMerge(par * rows, cols, OptimizerUtils.getLocalMemBudget()) )
- _rm = new ResultMergeLocalMemory( _output, _inputs, _outputFName );
- else
- _rm = new ResultMergeLocalFile( _output, _inputs, _outputFName );
-
- return _rm.executeParallelMerge(par);
- }
-}
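
For intuition on the dispatch above: a dense double matrix needs roughly
rows*cols*8 bytes, and executeParallelMerge sizes the check against par*rows because
the parallel in-memory merge must hold the output plus all concurrently merged inputs
(cf. the ResultMerge contract). A rough sizing sketch under that dense-double
assumption; the actual decision is made by OptimizerRuleBased.isInMemoryResultMerge,
whose heuristic may differ:

    public class MergeSizingSketch {
        //assumption: dense doubles at 8 bytes per cell
        static boolean fitsInMemory(long rows, long cols, double budgetBytes) {
            double estBytes = (double) rows * cols * 8;
            return estBytes < budgetBytes;
        }

        public static void main(String[] args) {
            double budget = 2L * 1024 * 1024 * 1024; //e.g., a 2 GB local budget
            //10,000 x 10,000 dense doubles ~ 800 MB -> in-memory merge
            System.out.println(fitsInMemory(10000, 10000, budget));      //true
            //parallel merge with par=4 is sized against par*rows ~ 3.2 GB -> file-based merge
            System.out.println(fitsInMemory(4 * 10000L, 10000, budget)); //false
        }
    }
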
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
deleted file mode 100644
index 8020719..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ /dev/null
@@ -1,1174 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.Cell;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.StagingFileUtils;
-import com.ibm.bi.dml.runtime.io.MatrixReader;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.IJV;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.SparseRowsIterator;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-import com.ibm.bi.dml.runtime.util.FastStringTokenizer;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-
-/**
- *
- * TODO potential extension: parallel merge (create individual staging files concurrently)
- *
- * NOTE: the file-based merge is typically used due to memory constraints; a parallel merge
- * would increase the memory consumption again.
- */
-public class ResultMergeLocalFile extends ResultMerge
-{
-
- //NOTE: if we allow simple copies, this might result in a scattered file and many MR tasks for subsequent jobs
- public static final boolean ALLOW_COPY_CELLFILES = false;
-
- //internal ID sequence for unique staging file names
- private IDSequence _seq = null;
-
- public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename )
- {
- super( out, in, outputFilename );
-
- _seq = new IDSequence();
- }
-
-
- @Override
- public MatrixObject executeSerialMerge()
- throws DMLRuntimeException
- {
- MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-
- //Timing time = null;
- LOG.trace("ResultMerge (local, file): Execute serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
- // time = new Timing();
- // time.start();
-
-
- try
- {
-
-
- //collect all relevant inputs
- ArrayList<MatrixObject> inMO = new ArrayList<MatrixObject>();
- for( MatrixObject in : _inputs )
- {
- //check for empty inputs (no iterations executed)
- if( in !=null && in != _output )
- {
- //ensure that input file resides on disk
- in.exportData();
-
- //add to merge list
- inMO.add( in );
- }
- }
-
- if( !inMO.isEmpty() )
- {
- //ensure that outputfile (for comparison) resides on disk
- _output.exportData();
-
- //actual merge
- merge( _outputFName, _output, inMO );
-
- //create new output matrix (e.g., to prevent a potential export<->read file access conflict)
- moNew = createNewMatrixObject( _output, inMO );
- }
- else
- {
- moNew = _output; //return old matrix, to prevent copy
- }
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException(ex);
- }
-
- //LOG.trace("ResultMerge (local, file): Executed serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+") in "+time.stop()+"ms");
-
- return moNew;
- }
-
- @Override
- public MatrixObject executeParallelMerge(int par)
- throws DMLRuntimeException
- {
- //graceful degradation to serial merge
- return executeSerialMerge();
- }
-
- /**
- *
- * @param output original output matrix object
- * @param inMO merged input matrix objects (used to update the nnz metadata)
- * @return new matrix object with a deep copy of the updated metadata
- * @throws DMLRuntimeException
- */
- private MatrixObject createNewMatrixObject(MatrixObject output, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- String varName = _output.getVarName();
- ValueType vt = _output.getValueType();
- MatrixFormatMetaData metadata = (MatrixFormatMetaData) _output.getMetaData();
-
- MatrixObject moNew = new MatrixObject( vt, _outputFName );
- moNew.setVarName( varName.contains(NAME_SUFFIX) ? varName : varName+NAME_SUFFIX );
- moNew.setDataType( DataType.MATRIX );
-
- //create deep copy of metadata obj
- MatrixCharacteristics mcOld = metadata.getMatrixCharacteristics();
- OutputInfo oiOld = metadata.getOutputInfo();
- InputInfo iiOld = metadata.getInputInfo();
- MatrixCharacteristics mc = new MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
- mcOld.getRowsPerBlock(),mcOld.getColsPerBlock());
- mc.setNonZeros( computeNonZeros(output, inMO) );
- MatrixFormatMetaData meta = new MatrixFormatMetaData(mc,oiOld,iiOld);
- moNew.setMetaData( meta );
-
- return moNew;
- }
-
- /**
- *
- * @param fnameNew file name of the merged output
- * @param outMo original output matrix object (determines format and compare semantics)
- * @param inMO input matrix objects to merge
- * @throws DMLRuntimeException
- */
- private void merge( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- OutputInfo oi = ((MatrixFormatMetaData)outMo.getMetaData()).getOutputInfo();
- boolean withCompare = ( outMo.getNnz() != 0 ); //if nnz exists or is unknown (-1)
-
- if( oi == OutputInfo.TextCellOutputInfo )
- {
- if(withCompare)
- mergeTextCellWithComp(fnameNew, outMo, inMO);
- else
- mergeTextCellWithoutComp( fnameNew, outMo, inMO );
- }
- else if( oi == OutputInfo.BinaryCellOutputInfo )
- {
- if(withCompare)
- mergeBinaryCellWithComp(fnameNew, outMo, inMO);
- else
- mergeBinaryCellWithoutComp( fnameNew, outMo, inMO );
- }
- else if( oi == OutputInfo.BinaryBlockOutputInfo )
- {
- if(withCompare)
- mergeBinaryBlockWithComp( fnameNew, outMo, inMO );
- else
- mergeBinaryBlockWithoutComp( fnameNew, outMo, inMO );
- }
- }
-
- /**
- *
- * @param fnameNew
- * @param outMo
- * @param inMO
- * @throws DMLRuntimeException
- */
- private void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- try
- {
- //delete target file if already exists
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-
- if( ALLOW_COPY_CELLFILES )
- {
- copyAllFiles(fnameNew, inMO);
- return; //we're done
- }
-
- //actual merge
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- FileSystem fs = FileSystem.get(job);
- Path path = new Path( fnameNew );
- BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-
- String valueStr = null;
-
- try
- {
- for( MatrixObject in : inMO ) //read/write all inputs
- {
- LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via stream merge");
-
- JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
- Path tmpPath = new Path(in.getFileName());
- FileInputFormat.addInputPath(tmpJob, tmpPath);
- TextInputFormat informat = new TextInputFormat();
- informat.configure(tmpJob);
- InputSplit[] splits = informat.getSplits(tmpJob, 1);
-
- LongWritable key = new LongWritable();
- Text value = new Text();
-
- for(InputSplit split: splits)
- {
- RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
- try
- {
- while(reader.next(key, value))
- {
- valueStr = value.toString().trim();
- out.write( valueStr+"\n" );
- }
- }
- finally
- {
- if( reader != null )
- reader.close();
- }
- }
- }
- }
- finally
- {
- if( out != null )
- out.close();
- }
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException("Unable to merge text cell results.", ex);
- }
- }
-
- /**
- *
- * @param fnameNew
- * @param outMo
- * @param inMO
- * @throws DMLRuntimeException
- */
- private void mergeTextCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
- String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-
- try
- {
- //delete target file if already exists
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-
- //Step 0) write compare blocks to staging area (if necessary)
- LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "+outMo.getVarName()+" (fname="+outMo.getFileName()+")");
- createTextCellStagingFile(fnameStagingCompare, outMo, 0);
-
- //Step 1) read and write blocks to staging area
- for( MatrixObject in : inMO )
- {
- LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
-
- long ID = _seq.getNextID();
- createTextCellStagingFile( fnameStaging, in, ID );
- }
-
- //Step 2) read blocks, consolidate, and write to HDFS
- createTextCellResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), true);
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException("Unable to merge text cell results.", ex);
- }
-
- LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
- LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
- }
-
- /**
- *
- * @param fnameNew
- * @param outMo
- * @param inMO
- * @throws DMLRuntimeException
- */
- @SuppressWarnings("deprecation")
- private void mergeBinaryCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- try
- {
- //delete target file if already exists
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-
- if( ALLOW_COPY_CELLFILES )
- {
- copyAllFiles(fnameNew, inMO);
- return; //we're done
- }
-
- //actual merge
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- FileSystem fs = FileSystem.get(job);
- Path path = new Path( fnameNew );
- SequenceFile.Writer out = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
-
- MatrixIndexes key = new MatrixIndexes();
- MatrixCell value = new MatrixCell();
-
- try
- {
- for( MatrixObject in : inMO ) //read/write all inputs
- {
- LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via stream merge");
-
- JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
- Path tmpPath = new Path(in.getFileName());
-
- for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath) )
- {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
- try
- {
- while(reader.next(key, value))
- {
- out.append(key, value);
- }
- }
- finally
- {
- if( reader != null )
- reader.close();
- }
- }
- }
- }
- finally
- {
- if( out != null )
- out.close();
- }
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException("Unable to merge binary cell results.", ex);
- }
- }
-
- /**
- *
- * @param fnameNew
- * @param outMo
- * @param inMO
- * @throws DMLRuntimeException
- */
- private void mergeBinaryCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
- String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-
- try
- {
- //delete target file if already exists
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-
- //Step 0) write compare blocks to staging area (if necessary)
- LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "+outMo.getVarName()+" (fname="+outMo.getFileName()+")");
- createBinaryCellStagingFile(fnameStagingCompare, outMo, 0);
-
- //Step 1) read and write blocks to staging area
- for( MatrixObject in : inMO )
- {
- LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
-
- long ID = _seq.getNextID();
- createBinaryCellStagingFile( fnameStaging, in, ID );
- }
-
- //Step 2) read blocks, consolidate, and write to HDFS
- createBinaryCellResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), true);
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException("Unable to merge binary cell results.", ex);
- }
-
- LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
- LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
- }
-
- /**
- *
- * @param fnameNew
- * @param outMo
- * @param inMO
- * @throws DMLRuntimeException
- */
- private void mergeBinaryBlockWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-
- try
- {
- //delete target file if already exists
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-
- //Step 1) read and write blocks to staging area
- for( MatrixObject in : inMO )
- {
- LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
-
- createBinaryBlockStagingFile( fnameStaging, in );
- }
-
- //Step 2) read blocks, consolidate, and write to HDFS
- createBinaryBlockResultFile(fnameStaging, null, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), false);
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException("Unable to merge binary block results.", ex);
- }
-
- LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
- }
-
- /**
- *
- * @param fnameNew
- * @param outMo
- * @param inMO
- * @throws DMLRuntimeException
- */
- private void mergeBinaryBlockWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
- throws DMLRuntimeException
- {
- String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
- String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-
- try
- {
- //delete target file if already exists
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-
- //Step 0) write compare blocks to staging area (if necessary)
- LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "+outMo.getVarName()+" (fname="+outMo.getFileName()+")");
-
- createBinaryBlockStagingFile(fnameStagingCompare, outMo);
-
- //Step 1) read and write blocks to staging area
- for( MatrixObject in : inMO )
- {
- LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
- createBinaryBlockStagingFile( fnameStaging, in );
- }
-
- //Step 2) read blocks, consolidate, and write to HDFS
- createBinaryBlockResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), true);
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException("Unable to merge binary block results.", ex);
- }
-
- LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
- LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
- }
-
- /**
- *
- * @param fnameStaging
- * @param mo
- * @throws IOException
- */
- @SuppressWarnings("deprecation")
- private void createBinaryBlockStagingFile( String fnameStaging, MatrixObject mo )
- throws IOException
- {
- MatrixIndexes key = new MatrixIndexes();
- MatrixBlock value = new MatrixBlock();
-
- JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
- FileSystem fs = FileSystem.get(tmpJob);
- Path tmpPath = new Path(mo.getFileName());
-
- for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath))
- {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
- try
- {
- while(reader.next(key, value)) //for each block
- {
- String lname = key.getRowIndex()+"_"+key.getColumnIndex();
- String dir = fnameStaging+"/"+lname;
- if( value.getNonZeros()>0 ) //write only non-empty blocks
- {
- LocalFileUtils.checkAndCreateStagingDir( dir );
- LocalFileUtils.writeMatrixBlockToLocal(dir+"/"+_seq.getNextID(), value);
- }
- }
- }
- finally
- {
- if( reader != null )
- reader.close();
- }
- }
- }
-
- /**
- *
- * @param fnameStaging
- * @param mo
- * @param ID
- * @throws IOException
- * @throws DMLRuntimeException
- */
-
- private void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID )
- throws IOException, DMLRuntimeException
- {
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- Path path = new Path(mo.getFileName());
- FileInputFormat.addInputPath(job, path);
- TextInputFormat informat = new TextInputFormat();
- informat.configure(job);
- InputSplit[] splits = informat.getSplits(job, 1);
-
- LinkedList<Cell> buffer = new LinkedList<Cell>();
- LongWritable key = new LongWritable();
- Text value = new Text();
-
- MatrixCharacteristics mc = mo.getMatrixCharacteristics();
- int brlen = mc.getRowsPerBlock();
- int bclen = mc.getColsPerBlock();
- //long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively
- //NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation
- // errors during runtime; experienced under WINDOWS, Intel x86-64, IBM JDK 64bit/32bit.
- // It works fine with int row, col but we require long for larger matrices.
- //       Since textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode: binarycell),
- //       we just propose to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0)
-
- FastStringTokenizer st = new FastStringTokenizer(' ');
-
- for(InputSplit split : splits)
- {
- RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
- try
- {
- while(reader.next(key, value))
- {
- st.reset( value.toString() ); //reset tokenizer
- long row = st.nextLong();
- long col = st.nextLong();
- double lvalue = Double.parseDouble( st.nextToken() );
-
- Cell tmp = new Cell( row, col, lvalue );
-
- buffer.addLast( tmp );
- if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
- {
- appendCellBufferToStagingArea(fnameStaging, ID, buffer, brlen, bclen);
- buffer.clear();
- }
- }
-
- //final flush
- if( !buffer.isEmpty() )
- {
- appendCellBufferToStagingArea(fnameStaging, ID, buffer, brlen, bclen);
- buffer.clear();
- }
- }
- finally
- {
- if( reader != null )
- reader.close();
- }
- }
- }
-
- /**
- *
- * @param fnameStaging
- * @param mo
- * @param ID
- * @throws IOException
- * @throws DMLRuntimeException
- */
- @SuppressWarnings("deprecation")
- private void createBinaryCellStagingFile( String fnameStaging, MatrixObject mo, long ID )
- throws IOException, DMLRuntimeException
- {
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- Path path = new Path(mo.getFileName());
- FileSystem fs = FileSystem.get(job);
-
- LinkedList<Cell> buffer = new LinkedList<Cell>();
- MatrixIndexes key = new MatrixIndexes();
- MatrixCell value = new MatrixCell();
-
- MatrixCharacteristics mc = mo.getMatrixCharacteristics();
- int brlen = mc.getRowsPerBlock();
- int bclen = mc.getColsPerBlock();
-
- for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path))
- {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
- try
- {
- while(reader.next(key, value))
- {
- Cell tmp = new Cell( key.getRowIndex(), key.getColumnIndex(), value.getValue() );
-
- buffer.addLast( tmp );
- if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
- {
- appendCellBufferToStagingArea(fnameStaging, ID, buffer, brlen, bclen);
- buffer.clear();
- }
- }
-
- //final flush
- if( !buffer.isEmpty() )
- {
- appendCellBufferToStagingArea(fnameStaging, ID, buffer, brlen, bclen);
- buffer.clear();
- }
- }
- finally
- {
- if( reader != null )
- reader.close();
- }
- }
- }
-
- /**
- * @param fnameStaging
- * @param ID
- * @param buffer
- * @param brlen
- * @param bclen
- * @throws DMLRuntimeException
- * @throws IOException
- */
- private void appendCellBufferToStagingArea( String fnameStaging, long ID, LinkedList<Cell> buffer, int brlen, int bclen )
- throws DMLRuntimeException, IOException
- {
- HashMap<Long,HashMap<Long,LinkedList<Cell>>> sortedBuffer = new HashMap<Long, HashMap<Long,LinkedList<Cell>>>();
- long brow, bcol, row_offset, col_offset;
-
- for( Cell c : buffer )
- {
- brow = (c.getRow()-1)/brlen + 1;
- bcol = (c.getCol()-1)/bclen + 1;
- row_offset = (brow-1)*brlen + 1;
- col_offset = (bcol-1)*bclen + 1;
-
- c.setRow( c.getRow() - row_offset);
- c.setCol(c.getCol() - col_offset);
-
- if( !sortedBuffer.containsKey(brow) )
- sortedBuffer.put(brow, new HashMap<Long,LinkedList<Cell>>());
- if( !sortedBuffer.get(brow).containsKey(bcol) )
- sortedBuffer.get(brow).put(bcol, new LinkedList<Cell>());
- sortedBuffer.get(brow).get(bcol).addLast(c);
- }
-
- //write lists of cells to local files
- for( Entry<Long,HashMap<Long,LinkedList<Cell>>> e : sortedBuffer.entrySet() )
- {
- brow = e.getKey();
- for( Entry<Long,LinkedList<Cell>> e2 : e.getValue().entrySet() )
- {
- bcol = e2.getKey();
- String lname = brow+"_"+bcol;
- String dir = fnameStaging+"/"+lname;
- LocalFileUtils.checkAndCreateStagingDir( dir );
- StagingFileUtils.writeCellListToLocal(dir+"/"+ID, e2.getValue());
- }
- }
- }
-
- /**
- *
- * @param fnameStaging
- * @param fnameStagingCompare
- * @param fnameNew
- * @param metadata
- * @param withCompare
- * @throws IOException
- * @throws DMLRuntimeException
- */
- @SuppressWarnings("deprecation")
- private void createBinaryBlockResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MatrixFormatMetaData metadata, boolean withCompare )
- throws IOException, DMLRuntimeException
- {
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- FileSystem fs = FileSystem.get(job);
- Path path = new Path( fnameNew );
-
- MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
- long rlen = mc.getRows();
- long clen = mc.getCols();
- int brlen = mc.getRowsPerBlock();
- int bclen = mc.getColsPerBlock();
-
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); //beware ca 50ms
- try
- {
- MatrixIndexes indexes = new MatrixIndexes();
- for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)brlen); brow++)
- for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)bclen); bcol++)
- {
- File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
- File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
- MatrixBlock mb = null;
-
- if( dir.exists() )
- {
- if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
- {
- //copy only values that are different from the original
- String[] lnames2 = dir2.list();
- if( lnames2.length != 1 ) //there should be exactly 1 compare block
- throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
- mb = LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] );
- boolean appendOnly = mb.isInSparseFormat();
- double[][] compare = DataConverter.convertToDoubleMatrix(mb);
-
- String[] lnames = dir.list();
- for( String lname : lnames )
- {
- MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
- mergeWithComp(mb, tmp, compare);
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- mb.sortSparseRows();
-
- //change sparsity if required after
- mb.examSparsity();
- }
- else //WITHOUT COMPARE BLOCK
- {
- //copy all non-zeros from all workers
- String[] lnames = dir.list();
- boolean appendOnly = false;
- for( String lname : lnames )
- {
- if( mb == null )
- {
- mb = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
- appendOnly = mb.isInSparseFormat();
- }
- else
- {
- MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
- mergeWithoutComp(mb, tmp, appendOnly);
- }
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- mb.sortSparseRows();
-
- //change sparsity if required after
- mb.examSparsity();
- }
- }
- else
- {
- //NOTE: whenever the runtime no longer needs all blocks, this can be removed
- int maxRow = (int)(((brow-1)*brlen + brlen < rlen) ? brlen : rlen - (brow-1)*brlen);
- int maxCol = (int)(((bcol-1)*bclen + bclen < clen) ? bclen : clen - (bcol-1)*bclen);
-
- mb = new MatrixBlock(maxRow, maxCol, true);
- }
-
- //mb.examSparsity(); //done on write anyway and mb not reused
- indexes.setIndexes(brow, bcol);
- writer.append(indexes, mb);
- }
- }
- finally
- {
- if( writer != null )
- writer.close();
- }
- }
-
- /**
- *
- * @param fnameStaging
- * @param fnameStagingCompare
- * @param fnameNew
- * @param metadata
- * @param withCompare
- * @throws IOException
- * @throws DMLRuntimeException
- */
- private void createTextCellResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MatrixFormatMetaData metadata, boolean withCompare )
- throws IOException, DMLRuntimeException
- {
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- FileSystem fs = FileSystem.get(job);
- Path path = new Path( fnameNew );
-
- MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
- long rlen = mc.getRows();
- long clen = mc.getCols();
- int brlen = mc.getRowsPerBlock();
- int bclen = mc.getColsPerBlock();
-
- BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
- try
- {
- //for obj reuse and preventing repeated buffer re-allocations
- StringBuilder sb = new StringBuilder();
-
- boolean written=false;
- for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)brlen); brow++)
- for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)bclen); bcol++)
- {
- File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
- File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
- MatrixBlock mb = null;
-
- long row_offset = (brow-1)*brlen + 1;
- long col_offset = (bcol-1)*bclen + 1;
-
-
- if( dir.exists() )
- {
- if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
- {
- //copy only values that are different from the original
- String[] lnames2 = dir2.list();
- if( lnames2.length != 1 ) //there should be exactly 1 compare block
- throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
- mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen );
- boolean appendOnly = mb.isInSparseFormat();
- double[][] compare = DataConverter.convertToDoubleMatrix(mb);
-
- String[] lnames = dir.list();
- for( String lname : lnames )
- {
- MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
- mergeWithComp(mb, tmp, compare);
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- mb.sortSparseRows();
-
- //change sparsity if required after
- mb.examSparsity();
- }
- else //WITHOUT COMPARE BLOCK
- {
- //copy all non-zeros from all workers
- String[] lnames = dir.list();
- boolean appendOnly = false;
- for( String lname : lnames )
- {
- if( mb == null )
- {
- mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
- appendOnly = mb.isInSparseFormat();
- }
- else
- {
- MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
- mergeWithoutComp(mb, tmp, appendOnly);
- }
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- mb.sortSparseRows();
-
- //change sparsity if required after
- mb.examSparsity();
- }
- }
-
- //write the block to text cell
- if( mb!=null )
- {
- if( mb.isInSparseFormat() )
- {
- SparseRowsIterator iter = mb.getSparseRowsIterator();
- while( iter.hasNext() )
- {
- IJV lcell = iter.next();
- sb.append(row_offset+lcell.i);
- sb.append(' ');
- sb.append(col_offset+lcell.j);
- sb.append(' ');
- sb.append(lcell.v);
- sb.append('\n');
- out.write( sb.toString() );
- sb.setLength(0);
- written = true;
- }
- }
- else
- {
- for( int i=0; i<brlen; i++ )
- for( int j=0; j<bclen; j++ )
- {
- double lvalue = mb.getValueDenseUnsafe(i, j);
- if( lvalue != 0 ) //for nnz
- {
- sb.append(row_offset+i);
- sb.append(' ');
- sb.append(col_offset+j);
- sb.append(' ');
- sb.append(lvalue);
- sb.append('\n');
- out.write( sb.toString() );
- sb.setLength(0);
- written = true;
- }
- }
- }
- }
- }
-
- if( !written )
- out.write("1 1 0\n"); //ensure non-empty output file (single zero cell)
- }
- finally
- {
- if( out != null )
- out.close();
- }
- }
-
- /**
- *
- * @param fnameStaging
- * @param fnameStagingCompare
- * @param fnameNew
- * @param metadata
- * @param withCompare
- * @throws IOException
- * @throws DMLRuntimeException
- */
- @SuppressWarnings("deprecation")
- private void createBinaryCellResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MatrixFormatMetaData metadata, boolean withCompare )
- throws IOException, DMLRuntimeException
- {
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- FileSystem fs = FileSystem.get(job);
- Path path = new Path( fnameNew );
-
- MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
- long rlen = mc.getRows();
- long clen = mc.getCols();
- int brlen = mc.getRowsPerBlock();
- int bclen = mc.getColsPerBlock();
-
-
- MatrixIndexes indexes = new MatrixIndexes(1,1);
- MatrixCell cell = new MatrixCell(0);
-
- SequenceFile.Writer out = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
- try
- {
- boolean written=false;
- for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)brlen); brow++)
- for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)bclen); bcol++)
- {
- File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
- File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
- MatrixBlock mb = null;
-
- long row_offset = (brow-1)*brlen + 1;
- long col_offset = (bcol-1)*bclen + 1;
-
-
- if( dir.exists() )
- {
- if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
- {
- //copy only values that are different from the original
- String[] lnames2 = dir2.list();
- if( lnames2.length != 1 ) //there should be exactly 1 compare block
- throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
- mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen );
- boolean appendOnly = mb.isInSparseFormat();
- double[][] compare = DataConverter.convertToDoubleMatrix(mb);
-
- String[] lnames = dir.list();
- for( String lname : lnames )
- {
- MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
- mergeWithComp(mb, tmp, compare);
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- mb.sortSparseRows();
-
- //change sparsity if required after
- mb.examSparsity();
- }
- else //WITHOUT COMPARE BLOCK
- {
- //copy all non-zeros from all workers
- String[] lnames = dir.list();
- boolean appendOnly = false;
- for( String lname : lnames )
- {
- if( mb == null )
- {
- mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
- appendOnly = mb.isInSparseFormat();
- }
- else
- {
- MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
- mergeWithoutComp(mb, tmp, appendOnly);
- }
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- mb.sortSparseRows();
-
- //change sparsity if required after
- mb.examSparsity();
- }
- }
-
- //write the block to binary cell
- if( mb!=null )
- {
- if( mb.isInSparseFormat() )
- {
- SparseRowsIterator iter = mb.getSparseRowsIterator();
- while( iter.hasNext() )
- {
- IJV lcell = iter.next();
- indexes.setIndexes(row_offset+lcell.i, col_offset+lcell.j);
- cell.setValue(lcell.v);
- out.append(indexes,cell);
- written = true;
- }
- }
- else
- {
- for( int i=0; i<brlen; i++ )
- for( int j=0; j<bclen; j++ )
- {
- double lvalue = mb.getValueDenseUnsafe(i, j);
- if( lvalue != 0 ) //for nnz
- {
- indexes.setIndexes(row_offset+i, col_offset+j);
- cell.setValue(lvalue);
- out.append(indexes,cell);
- written = true;
- }
- }
- }
- }
- }
-
- if( !written )
- out.append(indexes,cell); //ensure non-empty output file (single zero cell at (1,1))
- }
- finally
- {
- if( out != null )
- out.close();
- }
- }
-
- /**
- *
- * @param fnameNew
- * @param inMO
- * @throws CacheException
- * @throws IOException
- */
- private void copyAllFiles( String fnameNew, ArrayList<MatrixObject> inMO )
- throws CacheException, IOException
- {
- JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
- FileSystem fs = FileSystem.get(job);
- Path path = new Path( fnameNew );
-
- //create output dir
- fs.mkdirs(path);
-
- //merge in all input matrix objects
- IDSequence seq = new IDSequence();
- for( MatrixObject in : inMO )
- {
- LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via file rename.");
-
- //copy over files (just rename file or entire dir)
- Path tmpPath = new Path(in.getFileName());
- String lname = tmpPath.getName();
- fs.rename(tmpPath, new Path(fnameNew+"/"+lname+seq.getNextID()));
- }
- }
-
-}
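
The staging-file code above (appendCellBufferToStagingArea, the staging-file creators,
and the result-file writers) shares one 1-based blocking scheme: block index
= (cellIndex-1)/blockSize + 1 and block offset = (blockIndex-1)*blockSize + 1. A small
worked example of that arithmetic; the block size of 1000 is illustrative:

    public class BlockIndexSketch {
        public static void main(String[] args) {
            int brlen = 1000; //illustrative rows-per-block
            long row = 2500;  //1-based global row index

            long brow = (row - 1) / brlen + 1;       //block index -> 3
            long rowOffset = (brow - 1) * brlen + 1; //first global row of block 3 -> 2001
            long localRow = row - rowOffset;         //0-based row within the block -> 499
            System.out.println(brow + " " + rowOffset + " " + localRow); //3 2001 499

            //boundary blocks (as in createBinaryBlockResultFile): the last block
            //holds only the remainder, e.g. rlen=2500 -> block 3 has 500 rows
            long rlen = 2500;
            int maxRow = (int) (((brow - 1) * brlen + brlen < rlen) ?
                brlen : rlen - (brow - 1) * brlen);
            System.out.println(maxRow); //500
        }
    }
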
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
deleted file mode 100644
index 77b3bb5..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-
-/**
- * Local in-memory realization of result merge. If the resulting matrix is
- * small enough to fit into the JVM memory, this class can be used for efficient
- * serial or multi-threaded merge.
- *
- *
- */
-public class ResultMergeLocalMemory extends ResultMerge
-{
-
- //internal comparison matrix
- private double[][] _compare = null;
-
- public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, String outputFilename )
- {
- super( out, in, outputFilename );
- }
-
- @Override
- public MatrixObject executeSerialMerge()
- throws DMLRuntimeException
- {
- MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-
- LOG.trace("ResultMerge (local, in-memory): Execute serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-
- try
- {
- //get matrix blocks through caching
- MatrixBlock outMB = _output.acquireRead();
-
- //create new output matrix block with the characteristics of the old output
- int estnnz = outMB.getNumRows()*outMB.getNumColumns();
- MatrixBlock outMBNew = new MatrixBlock(outMB.getNumRows(), outMB.getNumColumns(),
- outMB.isInSparseFormat(), estnnz);
- boolean appendOnly = outMBNew.isInSparseFormat();
-
- //create compare matrix if required (existing data in result)
- _compare = createCompareMatrix(outMB);
- if( _compare != null )
- outMBNew.copy(outMB);
-
- //serial merge all inputs
- boolean flagMerged = false;
- for( MatrixObject in : _inputs )
- {
- //check for empty inputs (no iterations executed)
- if( in !=null && in != _output )
- {
- LOG.trace("ResultMerge (local, in-memory): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
-
- //read/pin input_i
- MatrixBlock inMB = in.acquireRead();
-
- //core merge
- merge( outMBNew, inMB, appendOnly );
-
- //unpin and clear in-memory input_i
- in.release();
- in.clearData();
- flagMerged = true;
-
- //determine need for sparse2dense change during merge
- boolean sparseToDense = appendOnly && !MatrixBlock.evalSparseFormatInMemory(
- outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros());
- if( sparseToDense ) {
- outMBNew.sortSparseRows(); //sort sparse due to append-only
- outMBNew.examSparsity(); //sparse-dense representation change
- appendOnly = false; //change merge state for subsequent inputs
- }
- }
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- outMBNew.sortSparseRows();
-
- //change sparsity if required after
- outMBNew.examSparsity();
-
- //create output
- if( flagMerged )
- {
- //create new output matrix
- //(e.g., to prevent potential export<->read file access conflict in specific cases of
- // local-remote nested parfor)
- moNew = createNewMatrixObject( outMBNew );
- }
- else
- {
- moNew = _output; //return old matrix, to prevent copy
- }
-
- //release old output, and all inputs
- _output.release();
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException(ex);
- }
-
- //LOG.trace("ResultMerge (local, in-memory): Executed serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+") in "+time.stop()+"ms");
-
- return moNew;
- }
-
- @Override
- public MatrixObject executeParallelMerge( int par )
- throws DMLRuntimeException
- {
- MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-
- //Timing time = null;
- LOG.trace("ResultMerge (local, in-memory): Execute parallel (par="+par+") merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
- // time = new Timing();
- // time.start();
-
-
- try
- {
- //get matrix blocks through caching
- MatrixBlock outMB = _output.acquireRead();
- ArrayList<MatrixObject> inMO = new ArrayList<MatrixObject>();
- for( MatrixObject in : _inputs )
- {
- //check for empty inputs (no iterations executed)
- if( in !=null && in != _output )
- inMO.add( in );
- }
-
- if( !inMO.isEmpty() ) //if there is something to merge
- {
- //get old output matrix from cache for compare
- //NOTE: always in dense representation in order to allow for parallel unsynchronized access
- long rows = outMB.getNumRows();
- long cols = outMB.getNumColumns();
- MatrixBlock outMBNew = new MatrixBlock((int)rows, (int)cols, false);
- outMBNew.allocateDenseBlockUnsafe((int)rows, (int)cols);
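- //(writes to a pre-allocated dense array need no synchronization, whereas
- // concurrent insertion into a sparse representation would require locking)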
-
- //create compare matrix if required (existing data in result)
- _compare = createCompareMatrix(outMB);
- if( _compare != null )
- outMBNew.copy(outMB);
-
- //parallel merge of all inputs
-
- int numThreads = Math.min(par, inMO.size()); //number of inputs can be lower than par
- numThreads = Math.min(numThreads, InfrastructureAnalyzer.getLocalParallelism()); //ensure robustness for remote exec
- Thread[] threads = new Thread[ numThreads ];
- for( int k=0; k<inMO.size(); k+=numThreads ) //multiple waves if necessary
- {
- //create and start threads (the last wave may contain fewer inputs)
- int lnumThreads = Math.min(numThreads, inMO.size()-k);
- for( int i=0; i<lnumThreads; i++ )
- {
- ResultMergeWorker rmw = new ResultMergeWorker(inMO.get(k+i), outMBNew);
- threads[i] = new Thread(rmw);
- threads[i].setPriority(Thread.MAX_PRIORITY);
- threads[i].start(); // start execution
- }
- //wait for all workers of the current wave to finish
- for( int i=0; i<lnumThreads; i++ )
- {
- threads[i].join();
- }
- }
-
- //create new output matrix
- //(e.g., to prevent potential export<->read file access conflict in specific cases of
- // local-remote nested parfor)
- moNew = createNewMatrixObject( outMBNew );
- }
- else
- {
- moNew = _output; //return old matrix, to prevent copy
- }
-
- //release old output, and all inputs
- _output.release();
- //_output.clearData(); //safe, since it respects pin/unpin
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException(ex);
- }
-
- //LOG.trace("ResultMerge (local, in-memory): Executed parallel (par="+par+") merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+") in "+time.stop()+"ms");
-
- return moNew;
- }
-
- /**
- * Creates a dense compare matrix from the old output if it contains
- * existing data; otherwise no compare matrix is required.
- *
- * @param output the old output matrix block
- * @return dense compare matrix, or null if output contains no non-zeros
- */
- private double[][] createCompareMatrix( MatrixBlock output )
- {
- double[][] ret = null;
-
- //create compare matrix only if required
- if( output.getNonZeros() > 0 )
- {
- ret = DataConverter.convertToDoubleMatrix( output );
- }
-
- return ret;
- }
-
- /**
- * Creates a new matrix object for the merged result, including a deep
- * copy of the old output's metadata.
- *
- * @param data the merged result matrix block
- * @return new matrix object holding the merged result
- * @throws DMLRuntimeException
- */
- private MatrixObject createNewMatrixObject( MatrixBlock data )
- throws DMLRuntimeException
- {
- String varName = _output.getVarName();
- ValueType vt = _output.getValueType();
- MatrixFormatMetaData metadata = (MatrixFormatMetaData) _output.getMetaData();
-
- MatrixObject moNew = new MatrixObject( vt, _outputFName );
- moNew.setVarName( varName.contains(NAME_SUFFIX) ? varName : varName+NAME_SUFFIX );
- moNew.setDataType( DataType.MATRIX );
-
- //create deep copy of metadata obj
- MatrixCharacteristics mcOld = metadata.getMatrixCharacteristics();
- OutputInfo oiOld = metadata.getOutputInfo();
- InputInfo iiOld = metadata.getInputInfo();
- MatrixCharacteristics mc = new MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
- mcOld.getRowsPerBlock(),mcOld.getColsPerBlock());
- mc.setNonZeros(data.getNonZeros());
- MatrixFormatMetaData meta = new MatrixFormatMetaData(mc,oiOld,iiOld);
- moNew.setMetaData( meta );
-
- //adjust dense/sparse representation
- data.examSparsity();
-
- //release new output
- moNew.acquireModify(data);
- moNew.release();
-
- return moNew;
- }
-
-
- /**
- * Merges <code>in</code> into <code>out</code> by inserting all non-zeros of <code>in</code>
- * into <code>out</code> at their given positions. This is an update-in-place operation.
- *
- * NOTE: similar to converters, but not directly applicable because we are interested
- * in combining two objects with each other, not in a unary transformation.
- *
- * @param out target matrix block (updated in place)
- * @param in source matrix block
- * @param appendOnly if true, non-zeros are appended to the sparse target and sorted later
- * @throws DMLRuntimeException
- */
- private void merge( MatrixBlock out, MatrixBlock in, boolean appendOnly )
- throws DMLRuntimeException
- {
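- //Example (illustrative): without compare, merging in=[[0,0],[0,5]] into
- //out=[[1,0],[0,0]] yields [[1,0],[0,5]], since only non-zeros of in are
- //copied; with compare, a cell is copied whenever it differs from the
- //original output, which also allows resetting non-zero cells to zero.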
- if( _compare == null )
- mergeWithoutComp(out, in, appendOnly);
- else
- mergeWithComp(out, in, _compare);
- }
-
-
- /**
- * NOTE: only used if the target matrix is in dense representation.
- */
- private class ResultMergeWorker implements Runnable
- {
- private MatrixObject _inMO = null;
- private MatrixBlock _outMB = null;
-
- public ResultMergeWorker(MatrixObject inMO, MatrixBlock outMB)
- {
- _inMO = inMO;
- _outMB = outMB;
- }
-
- @Override
- public void run()
- {
- //read each input if required
- try
- {
- LOG.trace("ResultMerge (local, in-memory): Merge input "+_inMO.getVarName()+" (fname="+_inMO.getFileName()+")");
-
- MatrixBlock inMB = _inMO.acquireRead(); //incl. implicit read from HDFS
- merge( _outMB, inMB, false );
- _inMO.release();
- _inMO.clearData();
- }
- catch(Exception ex)
- {
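- //note: this exception is raised on the worker thread; Thread.join alone
- //does not propagate it to the coordinating merge thread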
- throw new RuntimeException("Failed to merge result in parallel.", ex);
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
deleted file mode 100644
index 041b0bd..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-public class ResultMergeRemoteGrouping extends WritableComparator
-{
-
- protected ResultMergeRemoteGrouping()
- {
- super(ResultMergeTaggedMatrixIndexes.class,true);
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public int compare(WritableComparable k1, WritableComparable k2)
- {
- ResultMergeTaggedMatrixIndexes key1 = (ResultMergeTaggedMatrixIndexes)k1;
- ResultMergeTaggedMatrixIndexes key2 = (ResultMergeTaggedMatrixIndexes)k2;
-
- //group by matrix indexes only, ignoring the tag (all tags fall into one group)
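- //e.g., keys with indexes (1,2) and tags 'c' and 'd' reach the same reduce group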
- return key1.getIndexes().compareTo(key2.getIndexes());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
deleted file mode 100644
index 0a406e3..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.StagingFileUtils;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixCell;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * MR job class for submitting parfor result merge MR jobs.
- *
- */
-public class ResultMergeRemoteMR extends ResultMerge
-{
-
- public static final byte COMPARE_TAG = 'c';
- public static final byte DATA_TAG = 'd';
-
- private long _pfid = -1;
- private int _numMappers = -1;
- private int _numReducers = -1;
- private int _replication = -1;
- //private int _max_retry = -1;
- private boolean _jvmReuse = false;
-
- public ResultMergeRemoteMR(MatrixObject out, MatrixObject[] in, String outputFilename, long pfid, int numMappers, int numReducers, int replication, int max_retry, boolean jvmReuse)
- {
- super(out, in, outputFilename);
-
- _pfid = pfid;
- _numMappers = numMappers;
- _numReducers = numReducers;
- _replication = replication;
- //_max_retry = max_retry;
- _jvmReuse = jvmReuse;
- }
-
- @Override
- public MatrixObject executeSerialMerge()
- throws DMLRuntimeException
- {
- //graceful degradation to parallel merge
- return executeParallelMerge( _numMappers );
- }
-
- @Override
- public MatrixObject executeParallelMerge(int par)
- throws DMLRuntimeException
- {
- MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-
- //Timing time = null;
- LOG.trace("ResultMerge (remote, mr): Execute serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
- // time = new Timing();
- // time.start();
-
-
- try
- {
- //collect all relevant inputs
- Collection<String> srcFnames = new LinkedList<String>();
- ArrayList<MatrixObject> inMO = new ArrayList<MatrixObject>();
- for( MatrixObject in : _inputs )
- {
- //check for empty inputs (no iterations executed)
- if( in !=null && in != _output )
- {
- //ensure that input file resides on disk
- in.exportData();
-
- //add to merge list
- srcFnames.add( in.getFileName() );
- inMO.add(in);
- }
- }
-
- if( !srcFnames.isEmpty() )
- {
- //ensure that the output file (for comparison) resides on disk
- _output.exportData();
-
- //actual merge
- MatrixFormatMetaData metadata = (MatrixFormatMetaData) _output.getMetaData();
- MatrixCharacteristics mcOld = metadata.getMatrixCharacteristics();
-
- String fnameCompare = _output.getFileName();
- if( mcOld.getNonZeros()==0 )
- fnameCompare = null; //no compare required
-
- executeMerge(fnameCompare, _outputFName, srcFnames.toArray(new String[0]),
- metadata.getInputInfo(),metadata.getOutputInfo(), mcOld.getRows(), mcOld.getCols(),
- mcOld.getRowsPerBlock(), mcOld.getColsPerBlock());
-
- //create new output matrix (e.g., to prevent potential export<->read file access conflict)
- String varName = _output.getVarName();
- ValueType vt = _output.getValueType();
- moNew = new MatrixObject( vt, _outputFName );
- moNew.setVarName( varName.contains(NAME_SUFFIX) ? varName : varName+NAME_SUFFIX );
- moNew.setDataType( DataType.MATRIX );
- OutputInfo oiOld = metadata.getOutputInfo();
- InputInfo iiOld = metadata.getInputInfo();
- MatrixCharacteristics mc = new MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
- mcOld.getRowsPerBlock(),mcOld.getColsPerBlock());
- mc.setNonZeros( computeNonZeros(_output, inMO) );
- MatrixFormatMetaData meta = new MatrixFormatMetaData(mc,oiOld,iiOld);
- moNew.setMetaData( meta );
- }
- else
- {
- moNew = _output; //return old matrix, to prevent copy
- }
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException(ex);
- }
-
- //LOG.trace("ResultMerge (local, file): Executed serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+") in "+time.stop()+"ms");
-
- return moNew;
- }
-
- /**
- * Configures and submits the result merge MR job.
- *
- * @param fname compare file name; null if no comparison required
- * @param fnameNew output file name of the merged result
- * @param srcFnames file names of the partial results to merge
- * @param ii input info of the inputs
- * @param oi output info of the merged output
- * @param rlen number of rows
- * @param clen number of columns
- * @param brlen number of rows per block
- * @param bclen number of columns per block
- * @throws DMLRuntimeException
- */
- @SuppressWarnings({ "unused", "deprecation" })
- protected void executeMerge(String fname, String fnameNew, String[] srcFnames, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen)
- throws DMLRuntimeException
- {
- String jobname = "ParFor-RMMR";
- long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-
- JobConf job;
- job = new JobConf( ResultMergeRemoteMR.class );
- job.setJobName(jobname+_pfid);
-
- //maintain dml script counters
- Statistics.incrementNoOfCompiledMRJobs();
-
- //warning for textcell/binarycell without compare
- boolean withCompare = (fname!=null);
- if( (oi == OutputInfo.TextCellOutputInfo || oi == OutputInfo.BinaryCellOutputInfo) && !withCompare && ResultMergeLocalFile.ALLOW_COPY_CELLFILES )
- LOG.warn("Result merge for "+OutputInfo.outputInfoToString(oi)+" without compare can be realized more efficiently with LOCAL_FILE than REMOTE_MR.");
-
- try
- {
- Path pathCompare = null;
- Path pathNew = new Path(fnameNew);
-
- /////
- //configure the MR job
- if( withCompare ) {
- pathCompare = new Path(fname).makeQualified(FileSystem.get(job));
- MRJobConfiguration.setResultMergeInfo(job, pathCompare.toString(), ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, brlen, bclen);
- }
- else
- MRJobConfiguration.setResultMergeInfo(job, "null", ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, bclen, bclen);
-
-
- //set mappers, reducers, combiners
- job.setMapperClass(ResultMergeRemoteMapper.class);
- job.setReducerClass(ResultMergeRemoteReducer.class);
-
- if( oi == OutputInfo.TextCellOutputInfo )
- {
- job.setMapOutputKeyClass(MatrixIndexes.class);
- job.setMapOutputValueClass(TaggedMatrixCell.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
- }
- else if( oi == OutputInfo.BinaryCellOutputInfo )
- {
- job.setMapOutputKeyClass(MatrixIndexes.class);
- job.setMapOutputValueClass(TaggedMatrixCell.class);
- job.setOutputKeyClass(MatrixIndexes.class);
- job.setOutputValueClass(MatrixCell.class);
- }
- else if ( oi == OutputInfo.BinaryBlockOutputInfo )
- {
- //setup partitioning, grouping, sorting for composite key (old API)
- job.setPartitionerClass(ResultMergeRemotePartitioning.class); //partitioning
- job.setOutputValueGroupingComparator(ResultMergeRemoteGrouping.class); //grouping
- job.setOutputKeyComparatorClass(ResultMergeRemoteSorting.class); //sorting
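- //note: this realizes a secondary-sort pattern; partitioning and grouping
- //consider only the matrix indexes, while sorting additionally orders by tag,
- //so the compare block ('c' < 'd') arrives first in each reduce group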
-
- job.setMapOutputKeyClass(ResultMergeTaggedMatrixIndexes.class);
- job.setMapOutputValueClass(TaggedMatrixBlock.class);
- job.setOutputKeyClass(MatrixIndexes.class);
- job.setOutputValueClass(MatrixBlock.class);
- }
-
- //set input format
- job.setInputFormat(ii.inputFormatClass);
-
- //set the input path
- Path[] paths = null;
- if( withCompare ) {
- paths= new Path[ srcFnames.length+1 ];
- paths[0] = pathCompare;
- for(int i=1; i<paths.length; i++)
- paths[i] = new Path( srcFnames[i-1] );
- }
- else {
- paths= new Path[ srcFnames.length ];
- for(int i=0; i<paths.length; i++)
- paths[i] = new Path( srcFnames[i] );
- }
- FileInputFormat.setInputPaths(job, paths);
-
- //set output format
- job.setOutputFormat(oi.outputFormatClass);
-
- //set output path
- MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
- FileOutputFormat.setOutputPath(job, pathNew);
-
-
- //////
- //set optimization parameters
-
- //set the number of mappers and reducers
- //job.setNumMapTasks( _numMappers ); //use default num mappers
- long reducerGroups = _numReducers;
- if( oi == OutputInfo.BinaryBlockOutputInfo )
- reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1);
- else //textcell/binarycell
- reducerGroups = Math.max((rlen*clen)/StagingFileUtils.CELL_BUFFER_SIZE, 1);
- job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) );
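- //e.g., a 10,000 x 10,000 binary block matrix with 1,000 x 1,000 blocks
- //yields 10*10=100 reducer groups, capped by the configured _numReducers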
-
- //use FLEX scheduler configuration properties
- if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
- {
- job.setInt("flex.map.min", 0);
- job.setInt("flex.map.max", _numMappers);
- job.setInt("flex.reduce.min", 0);
- job.setInt("flex.reduce.max", _numMappers);
- }
-
- //disable automatic task timeouts and speculative task execution
- job.setInt("mapred.task.timeout", 0);
- job.setMapSpeculativeExecution(false);
-
- //set up preferred custom serialization framework for binary block format
- if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
- MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-
- //enable the reuse of JVMs (multiple tasks per JVM)
- if( _jvmReuse )
- job.setNumTasksToExecutePerJvm(-1); //unlimited
-
- //enables compression - not conclusive for different codecs (empirically good compression ratio, but significantly slower)
- //job.set("mapred.compress.map.output", "true");
- //job.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
-
- //set the replication factor for the results
- job.setInt("dfs.replication", _replication);
-
- //set the max number of retries per map task
- // disabled job-level configuration to respect cluster configuration
- // note: this refers to hadoop2, hence it never had effect on mr1
- //job.setInt("mapreduce.map.maxattempts", _max_retry);
-
- //set unique working dir
- MRJobConfiguration.setUniqueWorkingDir(job);
-
- /////
- // execute the MR job
-
- JobClient.runJob(job);
-
- //maintain dml script counters
- Statistics.incrementNoOfExecutedMRJobs();
- }
- catch(Exception ex)
- {
- throw new DMLRuntimeException(ex);
- }
-
- if( DMLScript.STATISTICS ){
- long t1 = System.nanoTime();
- Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, t1-t0);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
deleted file mode 100644
index 52690ee..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixCell;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.FastStringTokenizer;
-import com.ibm.bi.dml.runtime.util.UtilFunctions;
-
-/**
- * Remote resultmerge mapper implementation that does the preprocessing
- * in terms of tagging .
- *
- */
-public class ResultMergeRemoteMapper
- implements Mapper<Writable, Writable, Writable, Writable>
-{
-
- private ResultMergeMapper _mapper;
-
- public void map(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
- //tag and pass-through matrix values
- _mapper.processKeyValue(key, value, out, reporter);
- }
-
- public void configure(JobConf job)
- {
- InputInfo ii = MRJobConfiguration.getResultMergeInputInfo(job);
- long[] tmp = MRJobConfiguration.getResultMergeMatrixCharacteristics( job );
- String compareFname = MRJobConfiguration.getResultMergeInfoCompareFilename(job);
- String currentFname = job.get("map.input.file");
-
- byte tag = 0;
- //startsWith comparison in order to account for part names in currentFname
- if( currentFname.startsWith(compareFname) )
- tag = ResultMergeRemoteMR.COMPARE_TAG;
- else
- tag = ResultMergeRemoteMR.DATA_TAG;
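- //e.g., with compareFname "hdfs:/tmp/out" (hypothetical path), a split from
- //"hdfs:/tmp/out/part-00000" is tagged as compare, while splits of the
- //partial results are tagged as data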
-
- if( ii == InputInfo.TextCellInputInfo )
- _mapper = new ResultMergeMapperTextCell(tag);
- else if( ii == InputInfo.BinaryCellInputInfo )
- _mapper = new ResultMergeMapperBinaryCell(tag);
- else if( ii == InputInfo.BinaryBlockInputInfo )
- _mapper = new ResultMergeMapperBinaryBlock(tag, tmp[0], tmp[1], tmp[2], tmp[3]);
- else
- throw new RuntimeException("Unable to configure mapper with unknown input info: "+ii.toString());
- }
-
- @Override
- public void close() throws IOException
- {
- //do nothing
- }
-
- private static abstract class ResultMergeMapper
- {
- protected byte _tag = 0;
-
- protected ResultMergeMapper( byte tag )
- {
- _tag = tag;
- }
-
- protected abstract void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException;
- }
-
- protected static class ResultMergeMapperTextCell extends ResultMergeMapper
- {
- private MatrixIndexes _objKey;
- private MatrixCell _objValueHelp;
- private TaggedMatrixCell _objValue;
- private FastStringTokenizer _st;
-
- protected ResultMergeMapperTextCell(byte tag)
- {
- super(tag);
- _objKey = new MatrixIndexes();
- _objValueHelp = new MatrixCell();
- _objValue = new TaggedMatrixCell();
- _objValue.setTag( _tag );
-
- _st = new FastStringTokenizer(' ');
- }
-
- @Override
- protected void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
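- //text cell format: one "<row> <col> <value>" triple per line,
- //e.g., the line "1 2 3.5" yields row=1, col=2, lvalue=3.5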
- _st.reset( value.toString() ); //reset tokenizer
- long row = _st.nextLong();
- long col = _st.nextLong();
- double lvalue = _st.nextDouble();
-
- _objKey.setIndexes(row,col);
- _objValueHelp.setValue(lvalue);
- _objValue.setBaseObject(_objValueHelp);
-
- out.collect(_objKey, _objValue);
- }
- }
-
- protected static class ResultMergeMapperBinaryCell extends ResultMergeMapper
- {
- private TaggedMatrixCell _objValue;
-
- protected ResultMergeMapperBinaryCell(byte tag)
- {
- super(tag);
- _objValue = new TaggedMatrixCell();
- _objValue.setTag( _tag );
- }
-
- @Override
- protected void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
- _objValue.setBaseObject((MatrixCell)value);
- out.collect(key, _objValue);
- }
- }
-
- protected static class ResultMergeMapperBinaryBlock extends ResultMergeMapper
- {
- private ResultMergeTaggedMatrixIndexes _objKey;
- private TaggedMatrixBlock _objValue;
- private long _rlen = -1;
- private long _clen = -1;
- private long _brlen = -1;
- private long _bclen = -1;
-
- protected ResultMergeMapperBinaryBlock(byte tag, long rlen, long clen, long brlen, long bclen)
- {
- super(tag);
- _objKey = new ResultMergeTaggedMatrixIndexes();
- _objValue = new TaggedMatrixBlock();
- _objKey.setTag( _tag );
- _objValue.setTag( _tag );
-
- _rlen = rlen;
- _clen = clen;
- _brlen = brlen;
- _bclen = bclen;
- }
-
- @Override
- protected void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
- MatrixIndexes inkey = (MatrixIndexes)key;
- MatrixBlock inval = (MatrixBlock)value;
-
- //check valid block sizes
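- //e.g., for rlen=2,500 and brlen=1,000 the row block with index 3 must have 500 rows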
- if( inval.getNumRows() != UtilFunctions.computeBlockSize(_rlen, inkey.getRowIndex(), _brlen) )
- throw new IOException("Invalid number of rows for block "+inkey+": "+inval.getNumRows());
- if( inval.getNumColumns() != UtilFunctions.computeBlockSize(_clen, inkey.getColumnIndex(), _bclen) )
- throw new IOException("Invalid number of columns for block "+inkey+": "+inval.getNumColumns());
-
- //pass-through matrix blocks
- _objKey.getIndexes().setIndexes( inkey );
- _objValue.setBaseObject( inval );
- out.collect(_objKey, _objValue);
- }
- }
-}