You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2016/01/26 02:13:01 UTC
[37/55] [partial] incubator-systemml git commit: [SYSTEMML-482]
[SYSTEMML-480] Adding a Git attributes file to enforce Unix-style line
endings, and normalizing all of the line endings.
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
index d54ce97..cb9fd06 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParWorkerReducer.java
@@ -1,372 +1,372 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
-import org.apache.sysml.runtime.instructions.cp.IntObject;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.LocalFileUtils;
-import org.apache.sysml.utils.Statistics;
-
-/**
- *
- */
-public class RemoteDPParWorkerReducer extends ParWorker
- implements Reducer<LongWritable, Writable, Writable, Writable>
-{
-
- //MR data partitioning attributes
- private String _inputVar = null;
- private String _iterVar = null;
- private PDataPartitionFormat _dpf = null;
- private OutputInfo _info = null;
- private int _rlen = -1;
- private int _clen = -1;
- private int _brlen = -1;
- private int _bclen = -1;
-
- //reuse matrix partition
- private MatrixBlock _partition = null;
- private boolean _tSparseCol = false;
-
- //MR ParWorker attributes
- protected String _stringID = null;
- protected HashMap<String, String> _rvarFnames = null;
-
- //cached collector/reporter
- protected OutputCollector<Writable, Writable> _out = null;
- protected Reporter _report = null;
-
- /**
- *
- */
- public RemoteDPParWorkerReducer()
- {
-
- }
-
- @Override
- public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
- //cache collector/reporter (for write in close)
- _out = out;
- _report = reporter;
-
- //collect input partition
- if( _info == OutputInfo.BinaryBlockOutputInfo )
- _partition = collectBinaryBlock( valueList );
- else
- _partition = collectBinaryCellInput( valueList );
-
- //update in-memory matrix partition
- MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar );
- mo.setInMemoryPartition( _partition );
-
- //execute program
- LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" ("+_workerID+")");
- try {
- //create tasks for input data
- Task lTask = new Task(TaskType.SET);
- lTask.addIteration( new IntObject(_iterVar,key.get()) );
-
- //execute program
- executeTask( lTask );
- }
- catch(Exception ex)
- {
- throw new IOException("ParFOR: Failed to execute task.",ex);
- }
-
- //statistic maintenance (after final export)
- RemoteParForUtils.incrementParForMRCounters(_report, 1, 1);
- }
-
- /**
- *
- */
- @Override
- public void configure(JobConf job)
- {
- //Step 1: configure data partitioning information
- _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
- _clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
- _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
- _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
- _iterVar = MRJobConfiguration.getPartitioningItervar( job );
- _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
- _dpf = MRJobConfiguration.getPartitioningFormat( job );
- switch( _dpf ) { //create matrix partition for reuse
- case ROW_WISE: _rlen = 1; break;
- case COLUMN_WISE: _clen = 1; break;
- default: throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
- }
- _info = MRJobConfiguration.getPartitioningOutputInfo( job );
- _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job );
- if( _tSparseCol )
- _partition = new MatrixBlock((int)_clen, _rlen, true);
- else
- _partition = new MatrixBlock((int)_rlen, _clen, false);
-
- //Step 1: configure parworker
- String taskID = job.get("mapred.tip.id");
- LOG.trace("configure RemoteDPParWorkerReducer "+taskID);
-
- try
- {
- _stringID = taskID;
- _workerID = IDHandler.extractIntID(_stringID); //int task ID
-
- //use the given job configuration as source for all new job confs
- //NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar
- //which includes a core-default.xml configuration which hides the actual default cluster configuration
- //in the context of mr jobs (for example this config points to local fs instead of hdfs by default).
- if( !InfrastructureAnalyzer.isLocalMode(job) ) {
- ConfigurationManager.setCachedJobConf(job);
- }
-
- //create local runtime program
- String in = MRJobConfiguration.getProgramBlocks(job);
- ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
- _childBlocks = body.getChildBlocks();
- _ec = body.getEc();
- _resultVars = body.getResultVarNames();
-
- //init local cache manager
- if( !CacheableData.isCachingActive() ) {
- String uuid = IDHandler.createDistributedUniqueID();
- LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
- CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
- }
- if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
- CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
- }
-
- //ensure that resultvar files are not removed
- super.pinResultVariables();
-
- //enable/disable caching (if required)
- boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
- if( !cpCaching )
- CacheableData.disableCaching();
-
- _numTasks = 0;
- _numIters = 0;
- }
- catch(Exception ex)
- {
- throw new RuntimeException(ex);
- }
-
- //disable parfor stat monitoring, reporting execution times via counters not useful
- StatisticMonitor.disableStatMonitoring();
-
- //always reset stats because counters per map task (for case of JVM reuse)
- if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
- {
- CacheStatistics.reset();
- Statistics.reset();
- }
- }
-
- /**
- *
- */
- @Override
- public void close()
- throws IOException
- {
- try
- {
- //write output if required (matrix indexed write)
- RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out );
-
- //statistic maintenance (after final export)
- RemoteParForUtils.incrementParForMRCounters(_report, 0, 0);
-
- //print heaver hitter per task
- JobConf job = ConfigurationManager.getCachedJobConf();
- if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
- LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));
- }
- catch(Exception ex)
- {
- throw new IOException( ex );
- }
-
- //cleanup cache and local tmp dir
- RemoteParForUtils.cleanupWorkingDirectories();
-
- //ensure caching is not disabled for CP in local mode
- CacheableData.enableCaching();
- }
-
- /**
- * Collects a matrixblock partition from a given input iterator over
- * binary blocks.
- *
- * Note it reuses the instance attribute _partition - multiple calls
- * will overwrite the result.
- *
- * @param valueList
- * @return
- * @throws IOException
- */
- private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList )
- throws IOException
- {
- try
- {
- //reset reuse block, keep configured representation
- _partition.reset(_rlen, _clen);
-
- while( valueList.hasNext() )
- {
- PairWritableBlock pairValue = (PairWritableBlock)valueList.next();
- int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
- int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
- MatrixBlock block = pairValue.block;
- if( !_partition.isInSparseFormat() ) //DENSE
- {
- _partition.copy( row_offset, row_offset+block.getNumRows()-1,
- col_offset, col_offset+block.getNumColumns()-1,
- pairValue.block, false );
- }
- else //SPARSE
- {
- _partition.appendToSparse(pairValue.block, row_offset, col_offset);
- }
- }
-
- //final partition cleanup
- cleanupCollectedMatrixPartition( _partition.isInSparseFormat() );
- }
- catch(DMLRuntimeException ex)
- {
- throw new IOException(ex);
- }
-
- return _partition;
- }
-
-
- /**
- * Collects a matrixblock partition from a given input iterator over
- * binary cells.
- *
- * Note it reuses the instance attribute _partition - multiple calls
- * will overwrite the result.
- *
- * @param valueList
- * @return
- * @throws IOException
- */
- private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList )
- throws IOException
- {
- //reset reuse block, keep configured representation
- if( _tSparseCol )
- _partition.reset(_clen, _rlen);
- else
- _partition.reset(_rlen, _clen);
-
- switch( _dpf )
- {
- case ROW_WISE:
- while( valueList.hasNext() )
- {
- PairWritableCell pairValue = (PairWritableCell)valueList.next();
- if( pairValue.indexes.getColumnIndex()<0 )
- continue; //cells used to ensure empty partitions
- _partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue());
- }
- break;
- case COLUMN_WISE:
- while( valueList.hasNext() )
- {
- PairWritableCell pairValue = (PairWritableCell)valueList.next();
- if( pairValue.indexes.getRowIndex()<0 )
- continue; //cells used to ensure empty partitions
- if( _tSparseCol )
- _partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue());
- else
- _partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue());
- }
- break;
- default:
- throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf);
- }
-
- //final partition cleanup
- cleanupCollectedMatrixPartition(_tSparseCol);
-
- return _partition;
- }
-
- /**
- *
- * @param sort
- * @throws IOException
- */
- private void cleanupCollectedMatrixPartition(boolean sort)
- throws IOException
- {
- //sort sparse row contents if required
- if( _partition.isInSparseFormat() && sort )
- _partition.sortSparseRows();
-
- //ensure right number of nnz
- if( !_partition.isInSparseFormat() )
- _partition.recomputeNonZeros();
-
- //exam and switch dense/sparse representation
- try {
- _partition.examSparsity();
- }
- catch(Exception ex){
- throw new IOException(ex);
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.StatisticMonitor;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
+import org.apache.sysml.runtime.instructions.cp.IntObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
+ * Reducer-side parfor worker for fused partition-execute: each reduce call collects one matrix partition from its values and runs the parfor body on it.
+ */
+public class RemoteDPParWorkerReducer extends ParWorker
+ implements Reducer<LongWritable, Writable, Writable, Writable>
+{
+
+ //MR data partitioning attributes (all populated in configure)
+ private String _inputVar = null;
+ private String _iterVar = null;
+ private PDataPartitionFormat _dpf = null;
+ private OutputInfo _info = null;
+ private int _rlen = -1;
+ private int _clen = -1;
+ private int _brlen = -1;
+ private int _bclen = -1;
+
+ //reuse matrix partition (allocated once in configure, reset on every reduce call)
+ private MatrixBlock _partition = null;
+ private boolean _tSparseCol = false;
+
+ //MR ParWorker attributes
+ protected String _stringID = null;
+ protected HashMap<String, String> _rvarFnames = null;
+
+ //cached collector/reporter (set in reduce, read in close)
+ protected OutputCollector<Writable, Writable> _out = null;
+ protected Reporter _report = null;
+
+ /**
+ * Creates an unconfigured worker; all fields are populated later in configure(JobConf).
+ */
+ public RemoteDPParWorkerReducer()
+ {
+
+ }
+
+ @Override
+ public void reduce(LongWritable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
+ throws IOException
+ {
+ //cache collector/reporter (for write in close)
+ _out = out;
+ _report = reporter;
+
+ //collect input partition (binary block values vs. binary cell values, depending on _info)
+ if( _info == OutputInfo.BinaryBlockOutputInfo )
+ _partition = collectBinaryBlock( valueList );
+ else
+ _partition = collectBinaryCellInput( valueList );
+
+ //update in-memory matrix partition of the partitioned input variable
+ MatrixObject mo = (MatrixObject)_ec.getVariable( _inputVar );
+ mo.setInMemoryPartition( _partition );
+
+ //execute program
+ LOG.trace("execute RemoteDPParWorkerReducer "+_stringID+" ("+_workerID+")");
+ try {
+ //create a single-iteration task: the reduce key becomes the value of the iteration variable
+ Task lTask = new Task(TaskType.SET);
+ lTask.addIteration( new IntObject(_iterVar,key.get()) );
+
+ //execute program
+ executeTask( lTask );
+ }
+ catch(Exception ex)
+ {
+ throw new IOException("ParFOR: Failed to execute task.",ex);
+ }
+
+ //statistic maintenance (counters incremented by (1, 1) once per reduce call)
+ RemoteParForUtils.incrementParForMRCounters(_report, 1, 1);
+ }
+
+ /**
+ * Reads the partitioning metadata and the serialized parfor body from the job configuration and initializes this worker.
+ */
+ @Override
+ public void configure(JobConf job)
+ {
+ //Step 1: configure data partitioning information
+ _rlen = (int)MRJobConfiguration.getPartitioningNumRows( job );
+ _clen = (int)MRJobConfiguration.getPartitioningNumCols( job );
+ _brlen = MRJobConfiguration.getPartitioningBlockNumRows( job );
+ _bclen = MRJobConfiguration.getPartitioningBlockNumCols( job );
+ _iterVar = MRJobConfiguration.getPartitioningItervar( job );
+ _inputVar = MRJobConfiguration.getPartitioningMatrixvar( job );
+ _dpf = MRJobConfiguration.getPartitioningFormat( job );
+ switch( _dpf ) { //a partition is a single row (ROW_WISE) or a single column (COLUMN_WISE)
+ case ROW_WISE: _rlen = 1; break;
+ case COLUMN_WISE: _clen = 1; break;
+ default: throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+_dpf);
+ }
+ _info = MRJobConfiguration.getPartitioningOutputInfo( job );
+ _tSparseCol = MRJobConfiguration.getPartitioningTransposedCol( job );
+ if( _tSparseCol ) //create matrix partition for reuse: transposed dimensions, sparse
+ _partition = new MatrixBlock((int)_clen, _rlen, true);
+ else //create matrix partition for reuse: regular dimensions, dense
+ _partition = new MatrixBlock((int)_rlen, _clen, false);
+
+ //Step 2: configure parworker
+ String taskID = job.get("mapred.tip.id");
+ LOG.trace("configure RemoteDPParWorkerReducer "+taskID);
+
+ try
+ {
+ _stringID = taskID;
+ _workerID = IDHandler.extractIntID(_stringID); //int task ID
+
+ //use the given job configuration as source for all new job confs
+ //NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar
+ //which includes a core-default.xml configuration which hides the actual default cluster configuration
+ //in the context of mr jobs (for example this config points to local fs instead of hdfs by default).
+ if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+ ConfigurationManager.setCachedJobConf(job);
+ }
+
+ //create local runtime program from the serialized parfor body
+ String in = MRJobConfiguration.getProgramBlocks(job);
+ ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
+ _childBlocks = body.getChildBlocks();
+ _ec = body.getEc();
+ _resultVars = body.getResultVarNames();
+
+ //init local cache manager (only if caching is not already active)
+ if( !CacheableData.isCachingActive() ) {
+ String uuid = IDHandler.createDistributedUniqueID();
+ LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+ CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each task gets its own dir for simplified cleanup)
+ }
+ if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
+ CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID;
+ }
+
+ //ensure that resultvar files are not removed
+ super.pinResultVariables();
+
+ //enable/disable caching (if required)
+ boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
+ if( !cpCaching )
+ CacheableData.disableCaching();
+
+ _numTasks = 0;
+ _numIters = 0;
+ }
+ catch(Exception ex)
+ {
+ throw new RuntimeException(ex);
+ }
+
+ //disable parfor stat monitoring, reporting execution times via counters not useful
+ StatisticMonitor.disableStatMonitoring();
+
+ //always reset stats because counters are per task (for case of JVM reuse)
+ if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
+ {
+ CacheStatistics.reset();
+ Statistics.reset();
+ }
+ }
+
+ /**
+ * Exports the result variables via the cached collector, updates counters, optionally logs heavy hitters, and cleans up working directories.
+ */
+ @Override
+ public void close()
+ throws IOException
+ {
+ try
+ {
+ //write output if required (matrix indexed write)
+ RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _out );
+
+ //statistic maintenance (after final export)
+ RemoteParForUtils.incrementParForMRCounters(_report, 0, 0);
+
+ //print heavy hitters per task
+ JobConf job = ConfigurationManager.getCachedJobConf();
+ if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
+ LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));
+ }
+ catch(Exception ex)
+ {
+ throw new IOException( ex );
+ }
+
+ //cleanup cache and local tmp dir
+ RemoteParForUtils.cleanupWorkingDirectories();
+
+ //ensure caching is not disabled for CP in local mode
+ CacheableData.enableCaching();
+ }
+
+ /**
+ * Collects a matrixblock partition from a given input iterator over
+ * binary blocks.
+ *
+ * Note it reuses the instance attribute _partition - multiple calls
+ * will overwrite the result.
+ *
+ * @param valueList iterator over the PairWritableBlock values of one partition
+ * @return the reused instance attribute _partition, filled with the collected blocks
+ * @throws IOException if a DMLRuntimeException occurs while assembling the partition
+ */
+ private MatrixBlock collectBinaryBlock( Iterator<Writable> valueList )
+ throws IOException
+ {
+ try
+ {
+ //reset reuse block, keep configured representation; NOTE(review): unlike collectBinaryCellInput, dims are not swapped for _tSparseCol - confirm intended
+ _partition.reset(_rlen, _clen);
+
+ while( valueList.hasNext() )
+ {
+ PairWritableBlock pairValue = (PairWritableBlock)valueList.next();
+ int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen; //1-based block index to 0-based cell offset
+ int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
+ MatrixBlock block = pairValue.block;
+ if( !_partition.isInSparseFormat() ) //DENSE
+ {
+ _partition.copy( row_offset, row_offset+block.getNumRows()-1,
+ col_offset, col_offset+block.getNumColumns()-1,
+ pairValue.block, false );
+ }
+ else //SPARSE
+ {
+ _partition.appendToSparse(pairValue.block, row_offset, col_offset);
+ }
+ }
+
+ //final partition cleanup (request sorting only if the partition is sparse)
+ cleanupCollectedMatrixPartition( _partition.isInSparseFormat() );
+ }
+ catch(DMLRuntimeException ex)
+ {
+ throw new IOException(ex);
+ }
+
+ return _partition;
+ }
+
+
+ /**
+ * Collects a matrixblock partition from a given input iterator over
+ * binary cells.
+ *
+ * Note it reuses the instance attribute _partition - multiple calls
+ * will overwrite the result.
+ *
+ * @param valueList iterator over the PairWritableCell values of one partition
+ * @return the reused instance attribute _partition, filled with the collected cells
+ * @throws IOException if the partition format is unsupported or the final cleanup fails
+ */
+ private MatrixBlock collectBinaryCellInput( Iterator<Writable> valueList )
+ throws IOException
+ {
+ //reset reuse block, keep configured representation (transposed dimensions if _tSparseCol)
+ if( _tSparseCol )
+ _partition.reset(_clen, _rlen);
+ else
+ _partition.reset(_rlen, _clen);
+
+ switch( _dpf )
+ {
+ case ROW_WISE:
+ while( valueList.hasNext() )
+ {
+ PairWritableCell pairValue = (PairWritableCell)valueList.next();
+ if( pairValue.indexes.getColumnIndex()<0 )
+ continue; //cells used to ensure empty partitions
+ _partition.quickSetValue(0, (int)pairValue.indexes.getColumnIndex()-1, pairValue.cell.getValue());
+ }
+ break;
+ case COLUMN_WISE:
+ while( valueList.hasNext() )
+ {
+ PairWritableCell pairValue = (PairWritableCell)valueList.next();
+ if( pairValue.indexes.getRowIndex()<0 )
+ continue; //cells used to ensure empty partitions
+ if( _tSparseCol ) //transposed: the column is stored as row 0, values appended (sorted in cleanup)
+ _partition.appendValue(0,(int)pairValue.indexes.getRowIndex()-1, pairValue.cell.getValue());
+ else
+ _partition.quickSetValue((int)pairValue.indexes.getRowIndex()-1, 0, pairValue.cell.getValue());
+ }
+ break;
+ default:
+ throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf);
+ }
+
+ //final partition cleanup (sort only needed for appended values in the transposed sparse case)
+ cleanupCollectedMatrixPartition(_tSparseCol);
+
+ return _partition;
+ }
+
+ /**
+ * Finalizes the collected partition: optional sparse-row sort, nnz recomputation for dense blocks, and a sparsity check.
+ * @param sort if true and the partition is in sparse format, its sparse rows are sorted
+ * @throws IOException if the sparsity examination fails
+ */
+ private void cleanupCollectedMatrixPartition(boolean sort)
+ throws IOException
+ {
+ //sort sparse row contents if required
+ if( _partition.isInSparseFormat() && sort )
+ _partition.sortSparseRows();
+
+ //ensure right number of nnz
+ if( !_partition.isInSparseFormat() )
+ _partition.recomputeNonZeros();
+
+ //exam and switch dense/sparse representation
+ try {
+ _partition.examSparsity();
+ }
+ catch(Exception ex){
+ throw new IOException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 2426bc2..67beff6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -1,266 +1,266 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import scala.Tuple2;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
-import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
-import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
-import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.util.LocalFileUtils;
-import org.apache.sysml.utils.Statistics;
-
-/**
- * Common functionalities for parfor workers in MR jobs. Used by worker wrappers in
- * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor)
- *
- */
-public class RemoteParForUtils
-{
-
- /**
- *
- * @param reporter
- * @param deltaTasks
- * @param deltaIterations
- */
- public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
- {
- //report parfor counters
- if( deltaTasks>0 )
- reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
- if( deltaIterations>0 )
- reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations);
-
- JobConf job = ConfigurationManager.getCachedJobConf();
- if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
- {
- //report cache statistics
- reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
- reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
- reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
- reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());
-
- //reset cache statistics to prevent overlapping reporting
- CacheStatistics.reset();
- }
- }
-
- /**
- *
- * @param workerID
- * @param vars
- * @param resultVars
- * @param out
- * @throws DMLRuntimeException
- * @throws IOException
- */
- public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out )
- throws DMLRuntimeException, IOException
- {
- exportResultVariables(workerID, vars, resultVars, null, out);
- }
-
- /**
- * For remote MR parfor workers.
- *
- * @param workerID
- * @param vars
- * @param resultVars
- * @param rvarFnames
- * @param out
- * @throws DMLRuntimeException
- * @throws IOException
- */
- public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars,
- HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out )
- throws DMLRuntimeException, IOException
- {
- //create key and value for reuse
- LongWritable okey = new LongWritable( workerID );
- Text ovalue = new Text();
-
- //foreach result variables probe if export necessary
- for( String rvar : resultVars )
- {
- Data dat = vars.get( rvar );
-
- //export output variable to HDFS (see RunMRJobs)
- if ( dat != null && dat.getDataType() == DataType.MATRIX )
- {
- MatrixObject mo = (MatrixObject) dat;
- if( mo.isDirty() )
- {
- if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null )
- {
- String fname = rvarFnames.get( rvar );
- if( fname!=null )
- mo.setFileName( fname );
-
- //export result var (iff actually modified in parfor)
- mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
- rvarFnames.put(rvar, mo.getFileName());
- }
- else
- {
- //export result var (iff actually modified in parfor)
- mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
- }
-
- //pass output vars (scalars by value, matrix by ref) to result
- //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
- String datStr = ProgramConverter.serializeDataObject(rvar, mo);
- ovalue.set( datStr );
- out.collect( okey, ovalue );
- }
- }
- }
- }
-
- /**
- * For remote Spark parfor workers. This is a simplified version compared to MR.
- *
- * @param workerID
- * @param vars
- * @param resultVars
- * @param rvarFnames
- * @throws DMLRuntimeException
- * @throws IOException
- */
- public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars)
- throws DMLRuntimeException, IOException
- {
- ArrayList<String> ret = new ArrayList<String>();
-
- //foreach result variables probe if export necessary
- for( String rvar : resultVars )
- {
- Data dat = vars.get( rvar );
-
- //export output variable to HDFS (see RunMRJobs)
- if ( dat != null && dat.getDataType() == DataType.MATRIX )
- {
- MatrixObject mo = (MatrixObject) dat;
- if( mo.isDirty() )
- {
- //export result var (iff actually modified in parfor)
- mo.exportData();
-
-
- //pass output vars (scalars by value, matrix by ref) to result
- //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
- ret.add( ProgramConverter.serializeDataObject(rvar, mo) );
- }
- }
- }
-
- return ret;
- }
-
-
- /**
- * Cleanup all temporary files created by this SystemML process
- * instance.
- *
- */
- public static void cleanupWorkingDirectories()
- {
- //use the given job configuration for infrastructure analysis (see configure);
- //this is important for robustness w/ misconfigured classpath which also contains
- //core-default.xml and hence hides the actual cluster configuration; otherwise
- //there is missing cleanup of working directories
- JobConf job = ConfigurationManager.getCachedJobConf();
-
- if( !InfrastructureAnalyzer.isLocalMode(job) )
- {
- //delete cache files
- CacheableData.cleanupCacheDir();
- //disable caching (prevent dynamic eviction)
- CacheableData.disableCaching();
- //cleanup working dir (e.g., of CP_FILE instructions)
- LocalFileUtils.cleanupWorkingDirectory();
- }
- }
-
- /**
- *
- * @param out
- * @return
- * @throws DMLRuntimeException
- * @throws IOException
- */
- public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG )
- throws DMLRuntimeException
- {
- HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>();
-
- int countAll = 0;
- for( Tuple2<Long,String> entry : out )
- {
- Long key = entry._1();
- String val = entry._2();
- if( !tmp.containsKey( key ) )
- tmp.put(key, new LocalVariableMap ());
- Object[] dat = ProgramConverter.parseDataObject( val );
- tmp.get(key).put((String)dat[0], (Data)dat[1]);
- countAll++;
- }
-
- if( LOG != null ) {
- LOG.debug("Num remote worker results (before deduplication): "+countAll);
- LOG.debug("Num remote worker results: "+tmp.size());
- }
-
- //create return array
- return tmp.values().toArray(new LocalVariableMap[0]);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import scala.Tuple2;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
+import org.apache.sysml.runtime.instructions.cp.Data;
+import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.utils.Statistics;
+
+/**
+ * Common functionalities for parfor workers in MR jobs. Used by worker wrappers in
+ * mappers (base RemoteParFor) and reducers (fused data partitioning and parfor)
+ * Also contains a simplified result export variant for remote Spark parfor workers.
+ */
+public class RemoteParForUtils
+{
+
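+ /**
+ * Reports parfor task and iteration counters; if statistics are enabled and the job is not in local mode, also reports JIT/GC and cache statistics and then resets the cache statistics.
+ * @param reporter reporter whose counters are incremented
+ * @param deltaTasks number of tasks to add to the task counter (only reported if greater than 0)
+ * @param deltaIterations number of iterations to add to the iteration counter (only reported if greater than 0)
+ */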
+ public static void incrementParForMRCounters(Reporter reporter, long deltaTasks, long deltaIterations)
+ {
+ //report parfor counters
+ if( deltaTasks>0 )
+ reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMTASKS.toString(), deltaTasks);
+ if( deltaIterations>0 )
+ reporter.incrCounter(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_NUMITERS.toString(), deltaIterations);
+
+ JobConf job = ConfigurationManager.getCachedJobConf();
+ if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
+ {
+ //report JVM (JIT compile, GC) and cache statistics
+ reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JITCOMPILE.toString(), Statistics.getJITCompileTime());
+ reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_COUNT.toString(), Statistics.getJVMgcCount());
+ reporter.incrCounter( ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME, Stat.PARFOR_JVMGC_TIME.toString(), Statistics.getJVMgcTime());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_MEM.toString(), CacheStatistics.getMemHits());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString(), CacheStatistics.getFSBuffHits());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_FS.toString(), CacheStatistics.getFSHits());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_HITS_HDFS.toString(), CacheStatistics.getHDFSHits());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString(), CacheStatistics.getFSBuffWrites());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_FS.toString(), CacheStatistics.getFSWrites());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_WRITES_HDFS.toString(), CacheStatistics.getHDFSWrites());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQR.toString(), CacheStatistics.getAcquireRTime());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_ACQM.toString(), CacheStatistics.getAcquireMTime());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_RLS.toString(), CacheStatistics.getReleaseTime());
+ reporter.incrCounter( CacheableData.CACHING_COUNTER_GROUP_NAME, CacheStatistics.Stat.CACHE_TIME_EXP.toString(), CacheStatistics.getExportTime());
+
+ //reset cache statistics to prevent overlapping reporting
+ CacheStatistics.reset();
+ }
+ }
+
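+ /**
+ * Convenience variant without result file names; delegates to the five-argument exportResultVariables with rvarFnames=null.
+ * @param workerID ID of the worker, used as output key
+ * @param vars variable map that is probed for the result variables
+ * @param resultVars names of the result variables
+ * @param out output collector that receives the serialized result variables
+ * @throws DMLRuntimeException
+ * @throws IOException
+ */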
+ public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars, OutputCollector<Writable, Writable> out )
+ throws DMLRuntimeException, IOException
+ {
+ exportResultVariables(workerID, vars, resultVars, null, out);
+ }
+
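+ /**
+ * For remote MR parfor workers.
+ * Exports every dirty matrix result variable and emits its serialized form to the collector, keyed by the worker ID.
+ * @param workerID ID of the worker, used as output key
+ * @param vars variable map that is probed for the result variables
+ * @param resultVars names of the result variables
+ * @param rvarFnames map from result variable name to file name, or null; if non-null and worker reuse is allowed, a recorded file name is set before export and the map is updated afterwards
+ * @param out output collector that receives the serialized result variables
+ * @throws DMLRuntimeException
+ * @throws IOException
+ */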
+ public static void exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars,
+ HashMap<String,String> rvarFnames, OutputCollector<Writable, Writable> out )
+ throws DMLRuntimeException, IOException
+ {
+ //create key and value for reuse
+ LongWritable okey = new LongWritable( workerID );
+ Text ovalue = new Text();
+
+ //foreach result variables probe if export necessary
+ for( String rvar : resultVars )
+ {
+ Data dat = vars.get( rvar );
+
+ //export output variable to HDFS (see RunMRJobs)
+ if ( dat != null && dat.getDataType() == DataType.MATRIX )
+ {
+ MatrixObject mo = (MatrixObject) dat;
+ if( mo.isDirty() )
+ {
+ if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER && rvarFnames!=null )
+ {
+ String fname = rvarFnames.get( rvar );
+ if( fname!=null )
+ mo.setFileName( fname );
+
+ //export result var (iff actually modified in parfor)
+ mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
+ rvarFnames.put(rvar, mo.getFileName());
+ }
+ else
+ {
+ //export result var (iff actually modified in parfor)
+ mo.exportData(); //note: this is equivalent to doing it in close (currently not required because 1 Task=1Map tasks, hence only one map invocation)
+ }
+
+ //pass output vars (scalars by value, matrix by ref) to result
+ //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
+ String datStr = ProgramConverter.serializeDataObject(rvar, mo);
+ ovalue.set( datStr );
+ out.collect( okey, ovalue );
+ }
+ }
+ }
+ }
+
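+ /**
+ * For remote Spark parfor workers. This is a simplified version compared to MR.
+ * Exports every dirty matrix result variable and collects its serialized form in the returned list.
+ * @param workerID ID of the worker (not used by this variant)
+ * @param vars variable map that is probed for the result variables
+ * @param resultVars names of the result variables
+ * @return serialized data objects of all exported result variables (empty if none was dirty)
+ * @throws DMLRuntimeException
+ * @throws IOException
+ */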
+ public static ArrayList<String> exportResultVariables( long workerID, LocalVariableMap vars, ArrayList<String> resultVars)
+ throws DMLRuntimeException, IOException
+ {
+ ArrayList<String> ret = new ArrayList<String>();
+
+ //foreach result variables probe if export necessary
+ for( String rvar : resultVars )
+ {
+ Data dat = vars.get( rvar );
+
+ //export output variable to HDFS (see RunMRJobs)
+ if ( dat != null && dat.getDataType() == DataType.MATRIX )
+ {
+ MatrixObject mo = (MatrixObject) dat;
+ if( mo.isDirty() )
+ {
+ //export result var (iff actually modified in parfor)
+ mo.exportData();
+
+
+ //pass output vars (scalars by value, matrix by ref) to result
+ //(only if actually exported, hence in check for dirty, otherwise potential problems in result merge)
+ ret.add( ProgramConverter.serializeDataObject(rvar, mo) );
+ }
+ }
+ }
+
+ return ret;
+ }
+
+
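+ /**
+ * Cleanup all temporary files created by this SystemML process 
+ * instance (cache directory and local working directory); caching is disabled as well.
+ * Nothing is cleaned up if the cached job configuration indicates local mode.
+ */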
+ public static void cleanupWorkingDirectories()
+ {
+ //use the given job configuration for infrastructure analysis (see configure);
+ //this is important for robustness w/ misconfigured classpath which also contains
+ //core-default.xml and hence hides the actual cluster configuration; otherwise
+ //there is missing cleanup of working directories
+ JobConf job = ConfigurationManager.getCachedJobConf();
+
+ if( !InfrastructureAnalyzer.isLocalMode(job) )
+ {
+ //delete cache files
+ CacheableData.cleanupCacheDir();
+ //disable caching (prevent dynamic eviction)
+ CacheableData.disableCaching();
+ //cleanup working dir (e.g., of CP_FILE instructions)
+ LocalFileUtils.cleanupWorkingDirectory();
+ }
+ }
+
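+ /**
+ * Parses the serialized worker outputs and groups the resulting data objects into one variable map per worker ID.
+ * @param out list of (worker ID, serialized data object) pairs
+ * @param LOG logger for debug output of the result counts; may be null
+ * @return one variable map per distinct worker ID found in out
+ * @throws DMLRuntimeException
+ */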
+ public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG )
+ throws DMLRuntimeException
+ {
+ HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>();
+
+ int countAll = 0;
+ for( Tuple2<Long,String> entry : out )
+ {
+ Long key = entry._1();
+ String val = entry._2();
+ if( !tmp.containsKey( key ) )
+ tmp.put(key, new LocalVariableMap ());
+ Object[] dat = ProgramConverter.parseDataObject( val );
+ tmp.get(key).put((String)dat[0], (Data)dat[1]);
+ countAll++;
+ }
+
+ if( LOG != null ) {
+ LOG.debug("Num remote worker results (before deduplication): "+countAll);
+ LOG.debug("Num remote worker results: "+tmp.size());
+ }
+
+ //create return array
+ return tmp.values().toArray(new LocalVariableMap[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
index 6f478bc..e6556fa 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/PerfTestToolRegression.dml
@@ -1,58 +1,58 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-
-#PerfTestTool: DML template for estimation cost functions.
-
-dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer n)
-return (Matrix[Double] D)
-implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem")
-
-dynWrite = externalFunction(Matrix[Double] R, String fname)
-return (Matrix[Double] D)
-implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem")
-
-solve = externalFunction(Matrix[Double] A, Matrix[Double] y)
-return (Matrix[Double] b)
-implemented in (classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem")
-
-k = %numModels%;
-m = -1;
-n = -1;
-
-dummy = matrix(1,rows=1,cols=1);
-
-for( i in 1:k, par=8, mode=LOCAL )
-{
- sin1 = "./conf/PerfTestTool/"+i+"_in1.csv";
- sin2 = "./conf/PerfTestTool/"+i+"_in2.csv";
-
- D = dynRead( dummy, sin1, m, n );
- y = dynRead( dummy, sin2, m, 1 );
-
- A = t(D) %*% D; # X'X
- b = t(D) %*% y; # X'y
- beta = solve(A,b);
-
- sout = "./conf/PerfTestTool/"+i+"_out.csv";
-
- X=dynWrite( beta, sout );
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+
+#PerfTestTool: DML template for estimation cost functions.
+
+dynRead = externalFunction(Matrix[Double] d, String fname, Integer m, Integer n)
+return (Matrix[Double] D)
+implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicReadMatrix2DCP",exectype="mem")
+
+dynWrite = externalFunction(Matrix[Double] R, String fname)
+return (Matrix[Double] D)
+implemented in (classname="org.apache.sysml.runtime.controlprogram.parfor.test.dml.DynamicWriteMatrix2DCP",exectype="mem")
+
+solve = externalFunction(Matrix[Double] A, Matrix[Double] y)
+return (Matrix[Double] b)
+implemented in (classname="org.apache.sysml.packagesupport.LinearSolverWrapperCP",exectype="mem")
+
+k = %numModels%;
+m = -1;
+n = -1;
+
+dummy = matrix(1,rows=1,cols=1);
+
+for( i in 1:k, par=8, mode=LOCAL )
+{
+ sin1 = "./conf/PerfTestTool/"+i+"_in1.csv";
+ sin2 = "./conf/PerfTestTool/"+i+"_in2.csv";
+
+ D = dynRead( dummy, sin1, m, n );
+ y = dynRead( dummy, sin2, m, 1 );
+
+ A = t(D) %*% D; # X'X
+ b = t(D) %*% y; # X'y
+ beta = solve(A,b);
+
+ sout = "./conf/PerfTestTool/"+i+"_out.csv";
+
+ X=dynWrite( beta, sout );
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
index 1dd419f..13c68e2 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/BroadcastObject.java
@@ -1,64 +1,64 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.lang.ref.SoftReference;
-
-import org.apache.spark.broadcast.Broadcast;
-
-public class BroadcastObject extends LineageObject
-{
- //soft reference storage for graceful cleanup in case of memory pressure
- private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
-
- public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
- {
- _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
- _varName = varName;
- }
-
- /**
- *
- * @return
- */
- public PartitionedBroadcastMatrix getBroadcast()
- {
- return _bcHandle.get();
- }
-
- /**
- *
- * @return
- */
- public boolean isValid()
- {
- //check for evicted soft reference
- PartitionedBroadcastMatrix pbm = _bcHandle.get();
- if( pbm == null )
- return false;
-
- //check for validity of individual broadcasts
- Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
- for( Broadcast<PartitionedMatrixBlock> bc : tmp )
- if( !bc.isValid() )
- return false;
- return true;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.lang.ref.SoftReference;
+
+import org.apache.spark.broadcast.Broadcast;
+
+public class BroadcastObject extends LineageObject
+{
+ //soft reference storage for graceful cleanup in case of memory pressure
+ private SoftReference<PartitionedBroadcastMatrix> _bcHandle = null;
+
+ public BroadcastObject( PartitionedBroadcastMatrix bvar, String varName )
+ {
+ _bcHandle = new SoftReference<PartitionedBroadcastMatrix>(bvar);
+ _varName = varName;
+ }
+
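+ /**
+ * Returns the partitioned broadcast held by the soft reference.
+ * @return the partitioned broadcast matrix, or null if the soft reference has been cleared
+ */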
+ public PartitionedBroadcastMatrix getBroadcast()
+ {
+ return _bcHandle.get();
+ }
+
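+ /**
+ * Checks whether the partitioned broadcast is still referenced and all of its individual broadcasts are valid.
+ * @return false if the soft reference has been cleared or any individual broadcast is invalid, true otherwise
+ */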
+ public boolean isValid()
+ {
+ //check for evicted soft reference
+ PartitionedBroadcastMatrix pbm = _bcHandle.get();
+ if( pbm == null )
+ return false;
+
+ //check for validity of individual broadcasts
+ Broadcast<PartitionedMatrixBlock>[] tmp = pbm.getBroadcasts();
+ for( Broadcast<PartitionedMatrixBlock> bc : tmp )
+ if( !bc.isValid() )
+ return false;
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
index b2bb62c..bcf37bb 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/LineageObject.java
@@ -1,83 +1,83 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-
-public abstract class LineageObject
-{
-
- //basic lineage information
- protected int _numRef = -1;
- protected List<LineageObject> _childs = null;
- protected String _varName = null;
-
- //N:1 back reference to matrix object
- protected MatrixObject _mo = null;
-
- protected LineageObject()
- {
- _numRef = 0;
- _childs = new ArrayList<LineageObject>();
- }
-
- public String getVarName() {
- return _varName;
- }
-
- public int getNumReferences()
- {
- return _numRef;
- }
-
- public void setBackReference(MatrixObject mo)
- {
- _mo = mo;
- }
-
- public boolean hasBackReference()
- {
- return (_mo != null);
- }
-
- public void incrementNumReferences()
- {
- _numRef++;
- }
-
- public void decrementNumReferences()
- {
- _numRef--;
- }
-
- public List<LineageObject> getLineageChilds()
- {
- return _childs;
- }
-
- public void addLineageChild(LineageObject lob)
- {
- lob.incrementNumReferences();
- _childs.add( lob );
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+
+public abstract class LineageObject
+{
+
+ //basic lineage information
+ protected int _numRef = -1;
+ protected List<LineageObject> _childs = null;
+ protected String _varName = null;
+
+ //N:1 back reference to matrix object
+ protected MatrixObject _mo = null;
+
+ protected LineageObject()
+ {
+ _numRef = 0;
+ _childs = new ArrayList<LineageObject>();
+ }
+
+ public String getVarName() {
+ return _varName;
+ }
+
+ public int getNumReferences()
+ {
+ return _numRef;
+ }
+
+ public void setBackReference(MatrixObject mo)
+ {
+ _mo = mo;
+ }
+
+ public boolean hasBackReference()
+ {
+ return (_mo != null);
+ }
+
+ public void incrementNumReferences()
+ {
+ _numRef++;
+ }
+
+ public void decrementNumReferences()
+ {
+ _numRef--;
+ }
+
+ public List<LineageObject> getLineageChilds()
+ {
+ return _childs;
+ }
+
+ public void addLineageChild(LineageObject lob)
+ {
+ lob.incrementNumReferences();
+ _childs.add( lob );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
index fb7e773..605e7ca 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
@@ -1,124 +1,124 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.instructions.spark.data;
-
-import org.apache.spark.api.java.JavaPairRDD;
-
-public class RDDObject extends LineageObject
-{
-
- private JavaPairRDD<?,?> _rddHandle = null;
-
- //meta data on origin of given rdd handle
- private boolean _checkpointed = false; //created via checkpoint instruction
- private boolean _hdfsfile = false; //created from hdfs file
- private String _hdfsFname = null; //hdfs filename, if created from hdfs.
-
- public RDDObject( JavaPairRDD<?,?> rddvar, String varName)
- {
- _rddHandle = rddvar;
- _varName = varName;
- }
-
- /**
- *
- * @return
- */
- public JavaPairRDD<?,?> getRDD()
- {
- return _rddHandle;
- }
-
- public void setCheckpointRDD( boolean flag )
- {
- _checkpointed = flag;
- }
-
- public boolean isCheckpointRDD()
- {
- return _checkpointed;
- }
-
- public void setHDFSFile( boolean flag ) {
- _hdfsfile = flag;
- }
-
- public void setHDFSFilename( String fname ) {
- _hdfsFname = fname;
- }
-
- public boolean isHDFSFile() {
- return _hdfsfile;
- }
-
- public String getHDFSFilename() {
- return _hdfsFname;
- }
-
-
- /**
- * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
- * in both cases, we can directly read the file instead of collecting
- * the given rdd.
- *
- * @return
- */
- public boolean allowsShortCircuitRead()
- {
- boolean ret = isHDFSFile();
-
- if( isCheckpointRDD() && getLineageChilds().size() == 1 ) {
- LineageObject lo = getLineageChilds().get(0);
- ret = ( lo instanceof RDDObject && ((RDDObject)lo).isHDFSFile() );
- }
-
- return ret;
- }
-
- /**
- *
- * @return
- */
- public boolean allowsShortCircuitCollect()
- {
- return ( isCheckpointRDD() && getLineageChilds().size() == 1
- && getLineageChilds().get(0) instanceof RDDObject );
- }
-
- /**
- *
- * @return
- */
- public boolean rHasCheckpointRDDChilds()
- {
- //probe for checkpoint rdd
- if( _checkpointed )
- return true;
-
- //process childs recursively
- boolean ret = false;
- for( LineageObject lo : getLineageChilds() ) {
- if( lo instanceof RDDObject )
- ret |= ((RDDObject)lo).rHasCheckpointRDDChilds();
- }
-
- return ret;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.data;
+
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class RDDObject extends LineageObject
+{
+
+ private JavaPairRDD<?,?> _rddHandle = null;
+
+ //meta data on origin of given rdd handle
+ private boolean _checkpointed = false; //created via checkpoint instruction
+ private boolean _hdfsfile = false; //created from hdfs file
+ private String _hdfsFname = null; //hdfs filename, if created from hdfs.
+
+ public RDDObject( JavaPairRDD<?,?> rddvar, String varName)
+ {
+ _rddHandle = rddvar;
+ _varName = varName;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public JavaPairRDD<?,?> getRDD()
+ {
+ return _rddHandle;
+ }
+
+ public void setCheckpointRDD( boolean flag )
+ {
+ _checkpointed = flag;
+ }
+
+ public boolean isCheckpointRDD()
+ {
+ return _checkpointed;
+ }
+
+ public void setHDFSFile( boolean flag ) {
+ _hdfsfile = flag;
+ }
+
+ public void setHDFSFilename( String fname ) {
+ _hdfsFname = fname;
+ }
+
+ public boolean isHDFSFile() {
+ return _hdfsfile;
+ }
+
+ public String getHDFSFilename() {
+ return _hdfsFname;
+ }
+
+
+ /**
+ * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
+ * in both cases, we can directly read the file instead of collecting
+ * the given rdd.
+ *
+ * @return
+ */
+ public boolean allowsShortCircuitRead()
+ {
+ boolean ret = isHDFSFile();
+
+ if( isCheckpointRDD() && getLineageChilds().size() == 1 ) {
+ LineageObject lo = getLineageChilds().get(0);
+ ret = ( lo instanceof RDDObject && ((RDDObject)lo).isHDFSFile() );
+ }
+
+ return ret;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public boolean allowsShortCircuitCollect()
+ {
+ return ( isCheckpointRDD() && getLineageChilds().size() == 1
+ && getLineageChilds().get(0) instanceof RDDObject );
+ }
+
+ /**
+ *
+ * @return
+ */
+ public boolean rHasCheckpointRDDChilds()
+ {
+ //probe for checkpoint rdd
+ if( _checkpointed )
+ return true;
+
+ //process childs recursively
+ boolean ret = false;
+ for( LineageObject lo : getLineageChilds() ) {
+ if( lo instanceof RDDObject )
+ ret |= ((RDDObject)lo).rHasCheckpointRDDChilds();
+ }
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
index e561f3c..6cb0830 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/GroupedAggMRCombiner.java
@@ -1,167 +1,167 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.mapred;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.sysml.runtime.functionobjects.CM;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
-import org.apache.sysml.runtime.instructions.cp.KahanObject;
-import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
-import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.WeightedCell;
-import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
-import org.apache.sysml.runtime.matrix.operators.CMOperator;
-import org.apache.sysml.runtime.matrix.operators.Operator;
-
-
-public class GroupedAggMRCombiner extends ReduceBase
- implements Reducer<TaggedMatrixIndexes, WeightedCell, TaggedMatrixIndexes, WeightedCell>
-{
- //grouped aggregate instructions
- private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = new HashMap<Byte, GroupedAggregateInstruction>();
-
- //reused intermediate objects
- private CM_COV_Object cmObj = new CM_COV_Object();
- private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>();
- private WeightedCell outCell = new WeightedCell();
-
- @Override
- public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> values,
- OutputCollector<TaggedMatrixIndexes, WeightedCell> out, Reporter reporter)
- throws IOException
- {
- long start = System.currentTimeMillis();
-
- //get aggregate operator
- GroupedAggregateInstruction ins = grpaggInstructions.get(key.getTag());
- Operator op = ins.getOperator();
- boolean isPartialAgg = true;
-
- //combine iterator to single value
- try
- {
- if(op instanceof CMOperator) //everything except sum
- {
- if( ((CMOperator) op).isPartialAggregateOperator() )
- {
- cmObj.reset();
- CM lcmFn = cmFn.get(key.getTag());
-
- //partial aggregate cm operator
- while( values.hasNext() )
- {
- WeightedCell value=values.next();
- lcmFn.execute(cmObj, value.getValue(), value.getWeight());
- }
-
- outCell.setValue(cmObj.getRequiredPartialResult(op));
- outCell.setWeight(cmObj.getWeight());
- }
- else //forward tuples to reducer
- {
- isPartialAgg = false;
- while( values.hasNext() )
- out.collect(key, values.next());
- }
- }
- else if(op instanceof AggregateOperator) //sum
- {
- AggregateOperator aggop=(AggregateOperator) op;
-
- if( aggop.correctionExists )
- {
- KahanObject buffer=new KahanObject(aggop.initialValue, 0);
-
- KahanPlus.getKahanPlusFnObject();
-
- //partial aggregate with correction
- while( values.hasNext() )
- {
- WeightedCell value=values.next();
- aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight());
- }
-
- outCell.setValue(buffer._sum);
- outCell.setWeight(1);
- }
- else //no correction
- {
- double v = aggop.initialValue;
-
- //partial aggregate without correction
- while(values.hasNext())
- {
- WeightedCell value=values.next();
- v=aggop.increOp.fn.execute(v, value.getValue()*value.getWeight());
- }
-
- outCell.setValue(v);
- outCell.setWeight(1);
- }
- }
- else
- throw new IOException("Unsupported operator in instruction: " + ins);
- }
- catch(Exception ex)
- {
- throw new IOException(ex);
- }
-
- //collect the output (to reducer)
- if( isPartialAgg )
- out.collect(key, outCell);
-
- reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, System.currentTimeMillis()-start);
- }
-
- @Override
- public void configure(JobConf job)
- {
- try
- {
- GroupedAggregateInstruction[] grpaggIns = MRJobConfiguration.getGroupedAggregateInstructions(job);
- if( grpaggIns != null )
- for(GroupedAggregateInstruction ins : grpaggIns)
- {
- grpaggInstructions.put(ins.output, ins);
- if( ins.getOperator() instanceof CMOperator )
- cmFn.put(ins.output, CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType()));
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close()
- {
- //do nothing, overrides unnecessary handling in superclass
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.mapred;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.sysml.runtime.functionobjects.CM;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
+import org.apache.sysml.runtime.matrix.data.TaggedMatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.WeightedCell;
+import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysml.runtime.matrix.operators.CMOperator;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+
+
+public class GroupedAggMRCombiner extends ReduceBase
+ implements Reducer<TaggedMatrixIndexes, WeightedCell, TaggedMatrixIndexes, WeightedCell>
+{
+ //grouped aggregate instructions
+ private HashMap<Byte, GroupedAggregateInstruction> grpaggInstructions = new HashMap<Byte, GroupedAggregateInstruction>();
+
+ //reused intermediate objects
+ private CM_COV_Object cmObj = new CM_COV_Object();
+ private HashMap<Byte, CM> cmFn = new HashMap<Byte, CM>();
+ private WeightedCell outCell = new WeightedCell();
+
+ @Override
+ public void reduce(TaggedMatrixIndexes key, Iterator<WeightedCell> values,
+ OutputCollector<TaggedMatrixIndexes, WeightedCell> out, Reporter reporter)
+ throws IOException
+ {
+ long start = System.currentTimeMillis();
+
+ //get aggregate operator
+ GroupedAggregateInstruction ins = grpaggInstructions.get(key.getTag());
+ Operator op = ins.getOperator();
+ boolean isPartialAgg = true;
+
+ //combine iterator to single value
+ try
+ {
+ if(op instanceof CMOperator) //everything except sum
+ {
+ if( ((CMOperator) op).isPartialAggregateOperator() )
+ {
+ cmObj.reset();
+ CM lcmFn = cmFn.get(key.getTag());
+
+ //partial aggregate cm operator
+ while( values.hasNext() )
+ {
+ WeightedCell value=values.next();
+ lcmFn.execute(cmObj, value.getValue(), value.getWeight());
+ }
+
+ outCell.setValue(cmObj.getRequiredPartialResult(op));
+ outCell.setWeight(cmObj.getWeight());
+ }
+ else //forward tuples to reducer
+ {
+ isPartialAgg = false;
+ while( values.hasNext() )
+ out.collect(key, values.next());
+ }
+ }
+ else if(op instanceof AggregateOperator) //sum
+ {
+ AggregateOperator aggop=(AggregateOperator) op;
+
+ if( aggop.correctionExists )
+ {
+ KahanObject buffer=new KahanObject(aggop.initialValue, 0);
+
+ KahanPlus.getKahanPlusFnObject();
+
+ //partial aggregate with correction
+ while( values.hasNext() )
+ {
+ WeightedCell value=values.next();
+ aggop.increOp.fn.execute(buffer, value.getValue()*value.getWeight());
+ }
+
+ outCell.setValue(buffer._sum);
+ outCell.setWeight(1);
+ }
+ else //no correction
+ {
+ double v = aggop.initialValue;
+
+ //partial aggregate without correction
+ while(values.hasNext())
+ {
+ WeightedCell value=values.next();
+ v=aggop.increOp.fn.execute(v, value.getValue()*value.getWeight());
+ }
+
+ outCell.setValue(v);
+ outCell.setWeight(1);
+ }
+ }
+ else
+ throw new IOException("Unsupported operator in instruction: " + ins);
+ }
+ catch(Exception ex)
+ {
+ throw new IOException(ex);
+ }
+
+ //collect the output (to reducer)
+ if( isPartialAgg )
+ out.collect(key, outCell);
+
+ reporter.incrCounter(Counters.COMBINE_OR_REDUCE_TIME, System.currentTimeMillis()-start);
+ }
+
+ @Override
+ public void configure(JobConf job)
+ {
+ try
+ {
+ GroupedAggregateInstruction[] grpaggIns = MRJobConfiguration.getGroupedAggregateInstructions(job);
+ if( grpaggIns != null )
+ for(GroupedAggregateInstruction ins : grpaggIns)
+ {
+ grpaggInstructions.put(ins.output, ins);
+ if( ins.getOperator() instanceof CMOperator )
+ cmFn.put(ins.output, CM.getCMFnObject(((CMOperator)ins.getOperator()).getAggOpType()));
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ //do nothing, overrides unnecessary handling in superclass
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/816e2db8/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
index a08631d..fa9843a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/IndexSortComparable.java
@@ -1,84 +1,84 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.matrix.sort;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparable;
-
-@SuppressWarnings("rawtypes")
-public class IndexSortComparable implements WritableComparable
-{
-
- protected DoubleWritable _dval = null;
- protected LongWritable _lval = null;
-
- public IndexSortComparable()
- {
- _dval = new DoubleWritable();
- _lval = new LongWritable();
- }
-
- public void set(double dval, long lval)
- {
- _dval.set(dval);
- _lval.set(lval);
- }
-
- @Override
- public void readFields(DataInput arg0)
- throws IOException
- {
- _dval.readFields(arg0);
- _lval.readFields(arg0);
- }
-
- @Override
- public void write(DataOutput arg0)
- throws IOException
- {
- _dval.write(arg0);
- _lval.write(arg0);
- }
-
- @Override
- public int compareTo(Object o)
- {
- //compare only double value (e.g., for partitioner)
- if( o instanceof DoubleWritable ) {
- return _dval.compareTo((DoubleWritable) o);
- }
- //compare double value and index (e.g., for stable sort)
- else if( o instanceof IndexSortComparable) {
- IndexSortComparable that = (IndexSortComparable)o;
- int tmp = _dval.compareTo(that._dval);
- if( tmp==0 ) //secondary sort
- tmp = _lval.compareTo(that._lval);
- return tmp;
- }
- else {
- throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName());
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.matrix.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+@SuppressWarnings("rawtypes")
+public class IndexSortComparable implements WritableComparable
+{
+
+ protected DoubleWritable _dval = null;
+ protected LongWritable _lval = null;
+
+ public IndexSortComparable()
+ {
+ _dval = new DoubleWritable();
+ _lval = new LongWritable();
+ }
+
+ public void set(double dval, long lval)
+ {
+ _dval.set(dval);
+ _lval.set(lval);
+ }
+
+ @Override
+ public void readFields(DataInput arg0)
+ throws IOException
+ {
+ _dval.readFields(arg0);
+ _lval.readFields(arg0);
+ }
+
+ @Override
+ public void write(DataOutput arg0)
+ throws IOException
+ {
+ _dval.write(arg0);
+ _lval.write(arg0);
+ }
+
+ @Override
+ public int compareTo(Object o)
+ {
+ //compare only double value (e.g., for partitioner)
+ if( o instanceof DoubleWritable ) {
+ return _dval.compareTo((DoubleWritable) o);
+ }
+ //compare double value and index (e.g., for stable sort)
+ else if( o instanceof IndexSortComparable) {
+ IndexSortComparable that = (IndexSortComparable)o;
+ int tmp = _dval.compareTo(that._dval);
+ if( tmp==0 ) //secondary sort
+ tmp = _lval.compareTo(that._lval);
+ return tmp;
+ }
+ else {
+ throw new RuntimeException("Unsupported comparison involving class: "+o.getClass().getName());
+ }
+ }
+}