You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:33 UTC
[02/78] [abbrv] [partial] incubator-systemml git commit: Move files
to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java
deleted file mode 100644
index d93949f..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
-
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-
-/**
- * Partitioner of the remote result merge job. Blocks are assigned to reduce
- * partitions by their linearized block position instead of the hash code of
- * the key.
- */
-public class ResultMergeRemotePartitioning implements Partitioner<ResultMergeTaggedMatrixIndexes, TaggedMatrixBlock>
-{
-    //number of column blocks of the result matrix (-1 until configure() was called)
-    private long _numColBlocks = -1;
-
-    /**
-     * Computes the partition of a block from its row and column block index.
-     *
-     * @param key tagged matrix indexes of the block
-     * @param val block value (not used for partitioning)
-     * @param numPartitions number of reduce partitions
-     * @return linearized block id modulo numPartitions
-     */
-    @Override
-    public int getPartition(ResultMergeTaggedMatrixIndexes key, TaggedMatrixBlock val, int numPartitions)
-    {
-        //MB: Result merge might deal with lots of data but only few
-        //different indexes (many worker result blocks for one final
-        //result block). Hence, balanced partitioning is even more important
-        //and unfortunately, our default hash function results in significant
-        //load imbalance for those cases. Based on the known result dimensions
-        //we can create a better partitioning scheme. However, it still makes
-        //the assumption that there is no sparsity skew between blocks.
-
-        MatrixIndexes ix = key.getIndexes();
-
-        //linearize in long arithmetic and apply the modulo BEFORE narrowing to
-        //int: casting the block id to int first overflows for results with more
-        //than Integer.MAX_VALUE blocks and could yield a negative partition
-        long blockid = ix.getRowIndex() * _numColBlocks + ix.getColumnIndex();
-        return (int) (blockid % numPartitions);
-    }
-
-    /**
-     * Derives the number of column blocks from the result matrix
-     * characteristics stored in the job configuration (entry 1 is used as
-     * number of columns, entry 3 as column block size).
-     *
-     * @param job job configuration of the result merge job
-     */
-    @Override
-    public void configure(JobConf job)
-    {
-        long[] tmp = MRJobConfiguration.getResultMergeMatrixCharacteristics( job );
-        long clen = tmp[1];
-        int bclen = (int) tmp[3];
-
-        //ceil(clen/bclen): a partially filled last block counts as a block
-        _numColBlocks = clen/bclen + ((clen%bclen!=0)? 1 : 0);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
deleted file mode 100644
index 5066a82..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixCell;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-
-/**
- * Remote result merge reducer that receives all worker results partitioned by
- * cell index or blockindex and merges all results. Due to missing resettable iterators
- * in the old mapred API we need to spill parts of the value list to disk before merging
- * in case of binaryblock.
- *
- */
-public class ResultMergeRemoteReducer
- implements Reducer<Writable, Writable, Writable, Writable>
-{
-
- private ResultMergeReducer _reducer = null;
-
-    /**
-     * Default constructor; it leaves the reducer delegate unset.
-     */
-    public ResultMergeRemoteReducer()
-    {
-        //nothing to initialize
-    }
-
-    /**
-     * Forwards all values of one key to the format-specific reducer delegate.
-     *
-     * @param key key shared by all values of the list
-     * @param valueList values of this key
-     * @param out collector for the merged output
-     * @param reporter reporter of the reduce task
-     * @throws IOException if the delegate fails
-     */
-    @Override
-    public void reduce(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-        throws IOException
-    {
-        final ResultMergeReducer delegate = _reducer;
-        delegate.processKeyValueList(key, valueList, out, reporter);
-    }
-
-    /**
-     * Creates the format-specific reducer delegate according to the input
-     * info and compare filename stored in the job configuration.
-     *
-     * @param job job configuration of the result merge job
-     * @throws RuntimeException if the input info is not textcell, binarycell, or binaryblock
-     */
-    @Override
-    public void configure(JobConf job)
-    {
-        InputInfo ii = MRJobConfiguration.getResultMergeInputInfo(job);
-        String compareFname = MRJobConfiguration.getResultMergeInfoCompareFilename(job);
-
-        //determine compare required (the literal string "null" marks 'no compare file')
-        boolean requiresCompare = !compareFname.equals("null");
-
-        if( ii == InputInfo.TextCellInputInfo )
-            _reducer = new ResultMergeReducerTextCell(requiresCompare);
-        else if( ii == InputInfo.BinaryCellInputInfo )
-            _reducer = new ResultMergeReducerBinaryCell(requiresCompare);
-        else if( ii == InputInfo.BinaryBlockInputInfo )
-            _reducer = new ResultMergeReducerBinaryBlock(requiresCompare);
-        else //string concatenation is null-safe, an explicit toString() is not
-            throw new RuntimeException("Unable to configure reducer with unknown input info: "+ii);
-    }
-
-    /**
-     * Closes the reducer; this implementation performs no action.
-     *
-     * @throws IOException never thrown by this implementation
-     */
-    @Override
-    public void close() throws IOException
-    {
-        //intentionally empty
-    }
-
-
- private interface ResultMergeReducer //interface in order to allow ResultMergeReducerBinaryBlock to inherit from ResultMerge
- {
- void processKeyValueList( Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter )
- throws IOException;
- }
-
- private static class ResultMergeReducerTextCell implements ResultMergeReducer
- {
- private boolean _requiresCompare;
- private StringBuilder _sb = null;
- private Text _objValue = null;
-
- public ResultMergeReducerTextCell(boolean requiresCompare)
- {
- _requiresCompare = requiresCompare;
- _sb = new StringBuilder();
- _objValue = new Text();
- }
-
- @Override
- public void processKeyValueList(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
- //with compare
- if( _requiresCompare )
- {
- // NOTES MB:
- // 1) the old mapred api does not support multiple scans (reset/mark),
- // once we switch to the new api, we can use the resetableiterator for doing
- // the two required scans (finding the compare obj, compare all other objs)
- // 2) for 'textcell' we assume that the entire valueList fits into main memory
- // this is valid as we group by cells, i.e., we would need millions of input files
- // to exceed the usual 100-600MB per reduce task.
-
- //scan for compare object (incl result merge if compare available)
- MatrixIndexes key2 = (MatrixIndexes) key;
- Double cellCompare = null;
- Collection<Double> cellList = new LinkedList<Double>();
- boolean found = false;
- while( valueList.hasNext() ) {
- TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next();
- double lvalue = ((MatrixCell)tVal.getBaseObject()).getValue();
- if( tVal.getTag()==ResultMergeRemoteMR.COMPARE_TAG )
- cellCompare = lvalue;
- else
- {
- if( cellCompare == null )
- cellList.add( lvalue );
- else if( cellCompare.doubleValue()!=lvalue ) //compare on the fly
- {
- _sb.append(key2.getRowIndex());
- _sb.append(' ');
- _sb.append(key2.getColumnIndex());
- _sb.append(' ');
- _sb.append(lvalue);
- _objValue.set( _sb.toString() );
- _sb.setLength(0);
- out.collect(NullWritable.get(), _objValue );
- found = true;
- break; //only one write per cell possible (independence)
- }// note: objs with equal value are directly discarded
- }
- }
-
- //result merge for objs before compare
- if( !found )
- for( Double c : cellList )
- if( !c.equals( cellCompare ) )
- {
- _sb.append(key2.getRowIndex());
- _sb.append(' ');
- _sb.append(key2.getColumnIndex());
- _sb.append(' ');
- _sb.append(c.doubleValue());
- _objValue.set( _sb.toString() );
- _sb.setLength(0);
- out.collect(NullWritable.get(), _objValue );
- break; //only one write per cell possible (independence)
- }
- }
- //without compare
- else
- {
- MatrixIndexes key2 = (MatrixIndexes) key;
- while( valueList.hasNext() )
- {
- TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next();
- MatrixCell value = (MatrixCell) tVal.getBaseObject();
-
- _sb.append(key2.getRowIndex());
- _sb.append(' ');
- _sb.append(key2.getColumnIndex());
- _sb.append(' ');
- _sb.append(value.getValue());
- _objValue.set( _sb.toString() );
- _sb.setLength(0);
- out.collect(NullWritable.get(), _objValue );
- break; //only one write per cell possible (independence)
- }
- }
-
-
- }
- }
-
-    /**
-     * Reducer delegate for binarycell results: writes at most one cell
-     * per cell key.
-     */
-    private static class ResultMergeReducerBinaryCell implements ResultMergeReducer
-    {
-        private boolean _requiresCompare;
-        private MatrixCell _objValue;
-
-        public ResultMergeReducerBinaryCell(boolean requiresCompare)
-        {
-            _requiresCompare = requiresCompare;
-            _objValue = new MatrixCell();
-        }
-
-        @Override
-        public void processKeyValueList(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-            throws IOException
-        {
-            //without compare: only one write per cell possible (independence)
-            if( !_requiresCompare )
-            {
-                if( valueList.hasNext() )
-                {
-                    TaggedMatrixCell tagged = (TaggedMatrixCell) valueList.next();
-                    out.collect((MatrixIndexes)key, (MatrixCell)tagged.getBaseObject());
-                }
-                return;
-            }
-
-            //with compare
-            // NOTES MB:
-            // 1) the old mapred api does not support multiple scans (reset/mark),
-            //    once we switch to the new api, we can use the resetableiterator for doing
-            //    the two required scans (finding the compare obj, compare all other objs)
-            // 2) for 'binarycell' we assume that the entire valueList fits into main memory
-            //    this is valid as we group by cells, i.e., we would need millions of input files
-            //    to exceed the usual 100-600MB per reduce task.
-
-            //scan for compare object (incl result merge if compare available)
-            Double compareValue = null;
-            Collection<Double> pending = new LinkedList<Double>();
-            boolean written = false;
-            while( !written && valueList.hasNext() )
-            {
-                TaggedMatrixCell tagged = (TaggedMatrixCell) valueList.next();
-                MatrixCell cell = (MatrixCell) tagged.getBaseObject();
-
-                if( tagged.getTag()==ResultMergeRemoteMR.COMPARE_TAG ) {
-                    compareValue = cell.getValue();
-                }
-                else if( compareValue == null ) {
-                    //compare object not seen yet, keep the value for later
-                    pending.add( cell.getValue() );
-                }
-                else if( compareValue.doubleValue() != cell.getValue() ) {
-                    //compare on the fly, only one write per cell possible (independence)
-                    out.collect(key, cell );
-                    written = true;
-                }
-                //note: objs with equal value are directly discarded
-            }
-            if( written )
-                return;
-
-            //result merge for objs before compare
-            for( Double candidate : pending )
-            {
-                if( candidate.equals( compareValue ) )
-                    continue;
-                _objValue.setValue(candidate.doubleValue());
-                out.collect(key, _objValue );
-                break; //only one write per cell possible (independence)
-            }
-        }
-    }
-
- private static class ResultMergeReducerBinaryBlock extends ResultMerge implements ResultMergeReducer
- {
- private boolean _requiresCompare;
-
- public ResultMergeReducerBinaryBlock(boolean requiresCompare)
- {
- _requiresCompare = requiresCompare;
- }
-
- @Override
- public MatrixObject executeParallelMerge(int par)
- throws DMLRuntimeException
- {
- throw new DMLRuntimeException("Unsupported operation.");
- }
-
- @Override
- public MatrixObject executeSerialMerge()
- throws DMLRuntimeException
- {
- throw new DMLRuntimeException("Unsupported operation.");
- }
-
- @Override
- public void processKeyValueList(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
- throws IOException
- {
- try
- {
- MatrixIndexes ixOut = ((ResultMergeTaggedMatrixIndexes)key).getIndexes();
- MatrixBlock mbOut = null;
- double[][] aCompare = null;
- boolean appendOnly = false;
-
- //get and prepare compare block if required
- if( _requiresCompare )
- {
- TaggedMatrixBlock tVal = (TaggedMatrixBlock) valueList.next();
- MatrixBlock bVal = (MatrixBlock) tVal.getBaseObject();
- if( tVal.getTag()!=ResultMergeRemoteMR.COMPARE_TAG )
- throw new IOException("Failed to read compare block at expected first position.");
- aCompare = DataConverter.convertToDoubleMatrix(bVal);
- }
-
- //merge all result blocks into final result block
- while( valueList.hasNext() )
- {
- TaggedMatrixBlock tVal = (TaggedMatrixBlock) valueList.next();
- MatrixBlock bVal = (MatrixBlock) tVal.getBaseObject();
-
- if( mbOut == null ) //copy first block
- {
- mbOut = new MatrixBlock();
- mbOut.copy( bVal );
- appendOnly = mbOut.isInSparseFormat();
- }
- else //merge remaining blocks
- {
- if( _requiresCompare )
- mergeWithComp(mbOut, bVal, aCompare);
- else
- mergeWithoutComp(mbOut, bVal, appendOnly);
- }
- }
-
- //sort sparse due to append-only
- if( appendOnly )
- mbOut.sortSparseRows();
-
- //change sparsity if required after
- mbOut.examSparsity();
-
- out.collect(ixOut, mbOut);
- }
- catch( Exception ex )
- {
- throw new IOException(ex);
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
deleted file mode 100644
index 4f81764..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- * Comparator for tagged matrix indexes that orders keys by matrix indexes
- * first and by tag second.
- */
-public class ResultMergeRemoteSorting extends WritableComparator
-{
-    protected ResultMergeRemoteSorting()
-    {
-        super(ResultMergeTaggedMatrixIndexes.class, true);
-    }
-
-    /**
-     * Compares two tagged matrix indexes.
-     *
-     * @param k1 first key
-     * @param k2 second key
-     * @return result of the index comparison, or -1/0/1 by tag for equal indexes
-     */
-    @SuppressWarnings("rawtypes")
-    @Override
-    public int compare(WritableComparable k1, WritableComparable k2)
-    {
-        ResultMergeTaggedMatrixIndexes left = (ResultMergeTaggedMatrixIndexes)k1;
-        ResultMergeTaggedMatrixIndexes right = (ResultMergeTaggedMatrixIndexes)k2;
-
-        //primary sort on matrix indexes
-        int byIndexes = left.getIndexes().compareTo(right.getIndexes());
-        if( byIndexes != 0 )
-            return byIndexes;
-
-        //same indexes, secondary sort on tag
-        byte leftTag = left.getTag();
-        byte rightTag = right.getTag();
-        if( leftTag == rightTag )
-            return 0;
-        return (leftTag < rightTag) ? -1 : 1;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
deleted file mode 100644
index 6765bc3..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-
-import org.apache.spark.api.java.JavaPairRDD;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
-import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
-import com.ibm.bi.dml.runtime.instructions.spark.data.RDDObject;
-import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * Result merge for parfor that merges the worker results as Spark RDDs.
- *
- */
-public class ResultMergeRemoteSpark extends ResultMerge
-{
-
- private ExecutionContext _ec = null;
- private int _numMappers = -1;
- private int _numReducers = -1;
-
-    /**
-     * Creates a Spark result merge for the given output and worker results.
-     *
-     * @param out output matrix object
-     * @param in worker result matrix objects
-     * @param outputFilename file name of the merged output
-     * @param ec execution context (cast to SparkExecutionContext on merge)
-     * @param numMappers number of mappers, used as parallelism of the serial merge fallback
-     * @param numReducers upper bound for the number of reducers
-     */
-    public ResultMergeRemoteSpark(MatrixObject out, MatrixObject[] in, String outputFilename, ExecutionContext ec, int numMappers, int numReducers)
-    {
-        super(out, in, outputFilename);
-
-        _numReducers = numReducers;
-        _numMappers = numMappers;
-        _ec = ec;
-    }
-
-    /**
-     * Serial merge is not implemented separately; it gracefully degrades to
-     * the parallel merge with the configured number of mappers.
-     */
-    @Override
-    public MatrixObject executeSerialMerge()
-        throws DMLRuntimeException
-    {
-        final int par = _numMappers;
-        return executeParallelMerge( par );
-    }
-
-    /**
-     * Merges all inputs and returns a new matrix object that carries the rdd
-     * handle of the merged result; without inputs the old output is returned.
-     *
-     * @param par degree of parallelism (not used by this implementation)
-     * @return new matrix object, or the old output if there are no inputs
-     * @throws DMLRuntimeException if the merge fails
-     */
-    @Override
-    public MatrixObject executeParallelMerge(int par)
-        throws DMLRuntimeException
-    {
-        LOG.trace("ResultMerge (remote, spark): Execute serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-
-        try
-        {
-            //no inputs: return old matrix, to prevent copy
-            if( _inputs == null || _inputs.length == 0 )
-                return _output;
-
-            //prepare compare (only required if the old output has non-zeros)
-            MatrixFormatMetaData oldMeta = (MatrixFormatMetaData) _output.getMetaData();
-            MatrixCharacteristics oldMc = oldMeta.getMatrixCharacteristics();
-            MatrixObject compare = null;
-            if( oldMc.getNonZeros() != 0 )
-                compare = _output;
-
-            //actual merge
-            RDDObject mergedRdd = executeMerge(compare, _inputs, _output.getVarName(),
-                oldMc.getRows(), oldMc.getCols(), oldMc.getRowsPerBlock(), oldMc.getColsPerBlock());
-
-            //always create new matrix object (required for nested parallelism and
-            //e.g., to prevent potential export<->read file access conflict)
-            String varName = _output.getVarName();
-            ValueType vt = _output.getValueType();
-            MatrixObject merged = new MatrixObject( vt, _outputFName );
-            if( varName.contains(NAME_SUFFIX) )
-                merged.setVarName( varName );
-            else
-                merged.setVarName( varName+NAME_SUFFIX );
-            merged.setDataType( DataType.MATRIX );
-
-            //new meta data with old dimensions and formats but recomputed non-zeros
-            OutputInfo oiOld = oldMeta.getOutputInfo();
-            InputInfo iiOld = oldMeta.getInputInfo();
-            MatrixCharacteristics newMc = new MatrixCharacteristics(oldMc.getRows(),oldMc.getCols(),
-                oldMc.getRowsPerBlock(),oldMc.getColsPerBlock());
-            newMc.setNonZeros( computeNonZeros(_output, convertToList(_inputs)) );
-            merged.setMetaData( new MatrixFormatMetaData(newMc,oiOld,iiOld) );
-            merged.setRDDHandle( mergedRdd );
-
-            return merged;
-        }
-        catch(Exception ex)
-        {
-            throw new DMLRuntimeException(ex);
-        }
-    }
-
-    /**
-     * Merges the given worker results into a single RDD: all input RDDs are
-     * unioned and then merged per block index, either against the compare
-     * matrix or directly.
-     *
-     * @param compare matrix to compare against, null if no comparison required
-     * @param inputs worker result matrices to merge, must not be null or empty
-     * @param varname variable name of the created output rdd handle
-     * @param rlen number of rows
-     * @param clen number of columns
-     * @param brlen number of rows per block
-     * @param bclen number of columns per block
-     * @return output rdd handle with the rdd handles of all inputs as lineage children
-     * @throws DMLRuntimeException if called without inputs or if the merge fails
-     */
-    @SuppressWarnings("unchecked")
-    protected RDDObject executeMerge(MatrixObject compare, MatrixObject[] inputs, String varname, long rlen, long clen, int brlen, int bclen)
-        throws DMLRuntimeException
-    {
-        //sanity check for empty src files (before any other work)
-        if( inputs == null || inputs.length==0 )
-            throw new DMLRuntimeException("Execute merge should never be called with no inputs.");
-
-        String jobname = "ParFor-RMSP";
-        long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-
-        SparkExecutionContext sec = (SparkExecutionContext)_ec;
-        boolean withCompare = (compare!=null);
-
-        RDDObject ret = null;
-
-        //determine degree of parallelism
-        int numRed = determineNumReducers(rlen, clen, brlen, bclen, _numReducers);
-
-        try
-        {
-            //Step 1: union over all results (of the passed inputs, which were
-            //validated above, instead of the _inputs field)
-            JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
-                sec.getRDDHandleForMatrixObject(inputs[0], InputInfo.BinaryBlockInputInfo);
-            for( int i=1; i<inputs.length; i++ ) {
-                JavaPairRDD<MatrixIndexes, MatrixBlock> rdd2 = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
-                    sec.getRDDHandleForMatrixObject(inputs[i], InputInfo.BinaryBlockInputInfo);
-                rdd = rdd.union(rdd2);
-            }
-
-            //Step 2a: merge with compare
-            JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
-            if( withCompare )
-            {
-                JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
-                    sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
-
-                //merge values which differ from compare values
-                ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare();
-                out = rdd.groupByKey(numRed) //group all result blocks per key
-                         .join(compareRdd)   //join compare block and result blocks
-                         .mapToPair(cfun);   //merge result blocks w/ compare
-            }
-            //Step 2b: merge without compare
-            else
-            {
-                //direct merge in any order (disjointness guaranteed)
-                out = RDDAggregateUtils.mergeByKey(rdd);
-            }
-
-            //Step 3: create output rdd handle w/ lineage
-            ret = new RDDObject(out, varname);
-            for( MatrixObject input : inputs ) {
-                //child rdd handles guaranteed to exist
-                RDDObject child = input.getRDDHandle();
-                ret.addLineageChild(child);
-            }
-        }
-        catch( Exception ex )
-        {
-            throw new DMLRuntimeException(ex);
-        }
-
-        //maintain statistics
-        Statistics.incrementNoOfCompiledSPInst();
-        Statistics.incrementNoOfExecutedSPInst();
-        if( DMLScript.STATISTICS ){
-            Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
-        }
-
-        return ret;
-    }
-
-    /**
-     * Determines the number of reducers as the minimum of the requested
-     * number and the number of blocks of the result matrix (one reducer
-     * group per block).
-     *
-     * @param rlen number of rows
-     * @param clen number of columns
-     * @param brlen number of rows per block
-     * @param bclen number of columns per block
-     * @param numRed requested (maximum) number of reducers
-     * @return number of reducers to use
-     */
-    private int determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed)
-    {
-        //number of row and column blocks via ceiling division, such that partially
-        //filled boundary blocks are counted too; at least one block per dimension
-        long rowBlocks = Math.max((rlen + brlen - 1) / brlen, 1);
-        long colBlocks = Math.max((clen + bclen - 1) / bclen, 1);
-        long reducerGroups = rowBlocks * colBlocks;
-
-        return (int)Math.min( numRed, reducerGroups );
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
deleted file mode 100644
index 7df2cd1..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.Iterator;
-
-import org.apache.spark.api.java.function.PairFunction;
-
-import scala.Tuple2;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-
-/**
- * Spark pair function that merges all result blocks of one block index with
- * the compare block of the same index. It extends ResultMerge to reuse its
- * block merge primitive; the merge entry points themselves are unsupported.
- */
-public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,MatrixBlock>>, MatrixIndexes, MatrixBlock>
-{
-    private static final long serialVersionUID = -5970805069405942836L;
-
-    /**
-     * Merges the grouped result blocks into a copy of the compare block.
-     *
-     * @param arg block index with its result blocks (first) and compare block (second)
-     * @return copy of the block index with the merged block
-     */
-    @Override
-    public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, Tuple2<Iterable<MatrixBlock>, MatrixBlock>> arg)
-        throws Exception
-    {
-        MatrixIndexes blockIx = arg._1();
-        Tuple2<Iterable<MatrixBlock>, MatrixBlock> grouped = arg._2();
-        Iterator<MatrixBlock> results = grouped._1().iterator();
-        MatrixBlock compareBlock = grouped._2();
-
-        //create compare array
-        double[][] compare = DataConverter.convertToDoubleMatrix(compareBlock);
-
-        //merge all blocks into a copy of the compare block
-        MatrixBlock merged = new MatrixBlock(compareBlock);
-        while( results.hasNext() ) {
-            MatrixBlock result = results.next();
-            mergeWithComp(merged, result, compare);
-        }
-
-        //create output tuple
-        MatrixIndexes ixOut = new MatrixIndexes(blockIx);
-        return new Tuple2<MatrixIndexes,MatrixBlock>(ixOut, merged);
-    }
-
-    @Override
-    public MatrixObject executeSerialMerge()
-        throws DMLRuntimeException
-    {
-        throw new DMLRuntimeException("Unsupported operation.");
-    }
-
-    @Override
-    public MatrixObject executeParallelMerge(int par)
-        throws DMLRuntimeException
-    {
-        throw new DMLRuntimeException("Unsupported operation.");
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
deleted file mode 100644
index af87aa3..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-
-/**
- * Composite key of the remote result merge job (for any data format). It
- * allows sorting on both matrix indexes and tag while grouping all blocks
- * according to matrix indexes only. This saves us from doing a two-pass
- * out-of-core algorithm at the reducer since we can guarantee that the
- * compare block (tag 0) will be the first element in the iterator.
- */
-public class ResultMergeTaggedMatrixIndexes implements WritableComparable<ResultMergeTaggedMatrixIndexes>
-{
-    private MatrixIndexes _ix;
-    private byte _tag = -1;
-
-    public ResultMergeTaggedMatrixIndexes()
-    {
-        _ix = new MatrixIndexes();
-    }
-
-    public ResultMergeTaggedMatrixIndexes(long r, long c, byte tag)
-    {
-        _ix = new MatrixIndexes(r, c);
-        _tag = tag;
-    }
-
-    public MatrixIndexes getIndexes()
-    {
-        return _ix;
-    }
-
-    public byte getTag()
-    {
-        return _tag;
-    }
-
-    public void setTag(byte tag)
-    {
-        _tag = tag;
-    }
-
-    /**
-     * Reads the matrix indexes followed by the tag byte.
-     */
-    @Override
-    public void readFields(DataInput in)
-        throws IOException
-    {
-        if( _ix == null ) {
-            _ix = new MatrixIndexes();
-        }
-        _ix.readFields(in);
-        _tag = in.readByte();
-    }
-
-    /**
-     * Writes the matrix indexes followed by the tag byte.
-     */
-    @Override
-    public void write(DataOutput out)
-        throws IOException
-    {
-        _ix.write(out);
-        out.writeByte(_tag);
-    }
-
-    /**
-     * Orders by matrix indexes first and by tag second.
-     */
-    @Override
-    public int compareTo(ResultMergeTaggedMatrixIndexes that)
-    {
-        int byIndexes = _ix.compareTo(that._ix);
-        if( byIndexes != 0 )
-            return byIndexes;
-
-        //same indexes, decide on the tag
-        if( _tag == that._tag )
-            return 0;
-        return (_tag < that._tag) ? -1 : 1;
-    }
-
-    @Override
-    public boolean equals(Object other)
-    {
-        if( other instanceof ResultMergeTaggedMatrixIndexes )
-        {
-            ResultMergeTaggedMatrixIndexes that = (ResultMergeTaggedMatrixIndexes)other;
-            return _ix.equals(that._ix) && _tag == that._tag;
-        }
-        return false;
-    }
-
-    /**
-     * Not supported: always throws a RuntimeException.
-     */
-    @Override
-    public int hashCode()
-    {
-        throw new RuntimeException("hashCode() should never be called on instances of this class.");
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java
deleted file mode 100644
index c2ff9ac..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.StringTokenizer;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * A task is a logical group of one or multiple iterations (each iteration is assigned to exactly one task).
- * The iterations of a single task are executed sequentially. See TaskPartitioner for how tasks are created and
- * ParWorker for how those tasks are eventually executed.
- *
- * NOTE: (Extension possibility: group of statements)
- *
- */
-public class Task implements Serializable
-{
-
-    private static final long serialVersionUID = 2815832451487164284L;
-
-    public enum TaskType {
-        RANGE,
-        SET
-    }
-
-    public static final int MAX_VARNAME_SIZE = 256;
-    public static final int MAX_TASK_SIZE = Integer.MAX_VALUE-1;
-
-    private TaskType _type;
-    private LinkedList<IntObject> _iterations; //each iteration is specified as an ordered set of index values
-
-    /**
-     * Default constructor for serialization; leaves the type and the iteration list unset.
-     */
-    public Task() {
-        //default constructor for serialize
-    }
-
-    /**
-     * Creates an empty task of the given type.
-     *
-     * @param type task type
-     */
-    public Task( TaskType type )
-    {
-        _type = type;
-
-        _iterations = new LinkedList<IntObject>();
-    }
-
-    /**
-     * Appends one index value to this task.
-     *
-     * @param indexVal index value to append
-     * @throws RuntimeException if the variable name is longer than MAX_VARNAME_SIZE or
-     *         the task already holds MAX_TASK_SIZE entries
-     */
-    public void addIteration( IntObject indexVal )
-    {
-        if( indexVal.getName().length() > MAX_VARNAME_SIZE )
-            throw new RuntimeException("Cannot add iteration, MAX_VARNAME_SIZE exceeded.");
-
-        if( size() >= MAX_TASK_SIZE )
-            throw new RuntimeException("Cannot add iteration, MAX_TASK_SIZE reached.");
-
-        _iterations.addLast( indexVal );
-    }
-
-    /**
-     * @return the internal list of index values (not a copy)
-     */
-    public List<IntObject> getIterations()
-    {
-        return _iterations;
-    }
-
-    /**
-     * @return the task type
-     */
-    public TaskType getType()
-    {
-        return _type;
-    }
-
-    /**
-     * @return the number of index values stored in this task
-     */
-    public int size()
-    {
-        return _iterations.size();
-    }
-
-    /**
-     * Appends all index values of the given task to this task.
-     *
-     * @param task task whose index values are appended
-     * @throws RuntimeException if this task is of type RANGE or if the first entries
-     *         of both tasks carry different variable names
-     */
-    public void mergeTask( Task task )
-    {
-        //check for set iteration type
-        if( _type==TaskType.RANGE )
-            throw new RuntimeException("Task Merging not supported for tasks of type ITERATION_RANGE.");
-
-        //check for same iteration name
-        String var1 = _iterations.getFirst().getName();
-        String var2 = task._iterations.getFirst().getName();
-        if( !var1.equals(var2) )
-            throw new RuntimeException("Task Merging not supported for tasks with different variable names");
-
-        //merge tasks
-        for( IntObject o : task._iterations )
-            _iterations.addLast( o );
-    }
-
-
-    @Override
-    public String toString()
-    {
-        return toFormatedString();
-    }
-
-    /**
-     * Creates a verbose representation of the form
-     * "task (type=T, iterations={[name=value];[name=value]})".
-     *
-     * @return formatted string
-     */
-    public String toFormatedString()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("task (type=");
-        sb.append(_type);
-        sb.append(", iterations={");
-        boolean first = true;
-        for( IntObject dat : _iterations )
-        {
-            if( !first )
-                sb.append(";");
-            sb.append("[");
-            sb.append(dat.getName());
-            sb.append("=");
-            sb.append(dat.getLongValue());
-            sb.append("]");
-
-            first = false;
-        }
-        sb.append("})");
-        return sb.toString();
-    }
-
-    /**
-     * Creates the compact representation "TYPE.name.{v1,v2,...}" without zero padding;
-     * an empty task is rendered as the type only.
-     *
-     * @return compact string
-     */
-    public String toCompactString()
-    {
-        //maxDigits=0 never pads, hence same output as the unpadded variant
-        return toCompactString( 0 );
-    }
-
-    /**
-     * Creates the compact representation "TYPE.name.{v1,v2,...}" where each value is
-     * left-padded with '0' characters up to maxDigits characters; an empty task is
-     * rendered as the type only. The name is taken from the first entry.
-     *
-     * @param maxDigits minimum number of characters per value (shorter values are zero-padded)
-     * @return compact string
-     */
-    public String toCompactString( int maxDigits )
-    {
-        StringBuilder sb = new StringBuilder( );
-        sb.append(_type);
-
-        if( size() > 0 )
-        {
-            sb.append(".");
-            IntObject dat0 = _iterations.getFirst();
-            sb.append(dat0.getName());
-            sb.append(".{");
-
-            boolean first = true;
-            for( IntObject dat : _iterations )
-            {
-                if( !first )
-                    sb.append(",");
-
-                String tmp = String.valueOf(dat.getLongValue());
-                for( int k=tmp.length(); k<maxDigits; k++ )
-                    sb.append("0");
-                sb.append(tmp);
-                first = false;
-            }
-
-            sb.append("}");
-        }
-
-        return sb.toString();
-    }
-
-    /**
-     * Parses a task from its compact representation as produced by toCompactString.
-     *
-     * @param stask compact string, either "TYPE" (empty task) or "TYPE.name.{v1,v2,...}"
-     * @return the parsed task
-     */
-    public static Task parseCompactString( String stask )
-    {
-        StringTokenizer st = new StringTokenizer( stask.trim(), "." );
-
-        Task newTask = new Task( TaskType.valueOf(st.nextToken()) );
-
-        //an empty task is serialized as the type only (no name, no data)
-        if( !st.hasMoreTokens() )
-            return newTask;
-
-        String meta = st.nextToken();
-
-        //iteration data
-        String sdata = st.nextToken();
-        sdata = sdata.substring(1,sdata.length()-1); // remove brackets
-        StringTokenizer st2 = new StringTokenizer(sdata, ",");
-        while( st2.hasMoreTokens() )
-        {
-            //create new iteration; values are written from long values,
-            //so parse as long to round-trip values beyond the int range
-            String lsdata = st2.nextToken();
-            IntObject ldata = new IntObject(meta, Long.parseLong( lsdata ) );
-            newTask.addIteration(ldata);
-        }
-
-        return newTask;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java
deleted file mode 100644
index 0b7eb4e..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.List;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This is the base class for all task partitioner. For this purpose it stores relevant information such as
- * the loop specification (FROM, TO, INCR), the index variable and the task size. Furthermore, it declares two
- * prototypes: (1) full task creation, (2) streaming task creation.
- *
- * Known implementation classes: TaskPartitionerFixedsize, TaskPartitionerFactoring
- *
- */
-public abstract class TaskPartitioner
-{
-
- protected long _taskSize = -1;
-
- protected String _iterVarName = null;
- protected IntObject _fromVal = null;
- protected IntObject _toVal = null;
- protected IntObject _incrVal = null;
-
- protected long _numIter = -1;
-
-
- protected TaskPartitioner( long taskSize, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
- {
- _taskSize = taskSize;
-
- _iterVarName = iterVarName;
- _fromVal = fromVal;
- _toVal = toVal;
- _incrVal = incrVal;
-
- _numIter = (long)Math.ceil(((double)(_toVal.getLongValue()-_fromVal.getLongValue()+1 )) / _incrVal.getLongValue());
- }
-
- /**
- * Creates and returns set of all tasks for given problem at once.
- *
- * @return
- */
- public abstract List<Task> createTasks()
- throws DMLRuntimeException;
-
- /**
- * Creates set of all tasks for given problem, but streams them directly
- * into task queue. This allows for more tasks than fitting in main memory.
- *
- * @return
- */
- public abstract long createTasks( LocalTaskQueue<Task> queue )
- throws DMLRuntimeException;
-
-
- /**
- *
- * @return
- */
- public long getNumIterations()
- {
- return _numIter;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
deleted file mode 100644
index cd03544..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.Task.TaskType;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This factoring task partitioner virtually iterates over the given FOR loop (from, to, incr),
- * creates iterations and groups them into tasks. The task size handed to the constructor is only
- * passed on to the base class; the sizes used here are derived from the remaining iterations.
- * The tasks are created with decreasing size for good load balance of heterogeneous tasks.
- *
- *
- * See the original paper for details:
- * [Susan Flynn Hummel, Edith Schonberg, Lawrence E. Flynn:
- * Factoring: a practical and robust method for scheduling parallel loops.
- * SC 1991: 610-632]
- *
- */
-public class TaskPartitionerFactoring extends TaskPartitioner
-{
-    //number of parallel workers; each batch consists of up to that many equally sized tasks
-    private int _numThreads = -1;
-
-    public TaskPartitionerFactoring( long taskSize, int numThreads, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
-    {
-        super(taskSize, iterVarName, fromVal, toVal, incrVal);
-
-        _numThreads = numThreads;
-    }
-
-    @Override
-    public List<Task> createTasks()
-        throws DMLRuntimeException
-    {
-        LinkedList<Task> tasks = new LinkedList<Task>();
-
-        long pos = _fromVal.getLongValue();         // next index value to assign
-        final long lTo = _toVal.getLongValue();
-        final long lIncr = _incrVal.getLongValue();
-
-        final int workers = _numThreads;            // number of parallel workers
-        long remaining = _numIter;                  // remaining number of iterations
-
-        while( pos <= lTo )
-        {
-            //size of the next 'workers' tasks
-            long batchSize = determineNextBatchSize(remaining, workers);
-            remaining -= (batchSize * workers);
-            TaskType type = selectTaskType(batchSize);
-
-            //one task per logical processor, as long as iterations are left
-            for( int j=0; j<workers && pos<=lTo; j++ )
-            {
-                Task lTask = new Task( type );
-                tasks.addLast(lTask);
-                pos = fillTask(lTask, pos, batchSize, lTo, lIncr);
-            }
-        }
-
-        return tasks;
-    }
-
-    @Override
-    public long createTasks(LocalTaskQueue<Task> queue)
-        throws DMLRuntimeException
-    {
-        long numCreatedTasks = 0;
-
-        long pos = _fromVal.getLongValue();         // next index value to assign
-        final long lTo = _toVal.getLongValue();
-        final long lIncr = _incrVal.getLongValue();
-
-        final int workers = _numThreads;            // number of parallel workers
-        long remaining = _numIter;                  // remaining number of iterations
-
-        try
-        {
-            while( pos <= lTo )
-            {
-                //size of the next 'workers' tasks
-                long batchSize = determineNextBatchSize(remaining, workers);
-                remaining -= (batchSize * workers);
-                TaskType type = selectTaskType(batchSize);
-
-                //one task per logical processor, as long as iterations are left
-                for( int j=0; j<workers && pos<=lTo; j++ )
-                {
-                    Task lTask = new Task( type );
-                    pos = fillTask(lTask, pos, batchSize, lTo, lIncr);
-
-                    //enqueue only after all iterations were added (prevents race conditions)
-                    queue.enqueueTask( lTask );
-                    numCreatedTasks++;
-                }
-            }
-
-            // mark end of task input stream
-            queue.closeInput();
-        }
-        catch(Exception ex)
-        {
-            throw new DMLRuntimeException(ex);
-        }
-
-        return numCreatedTasks;
-    }
-
-    /**
-     * Range tasks (similar to run-length encoding) only make sense for more than 3 iterations
-     * per task; otherwise set tasks are used.
-     *
-     * @param batchSize number of iterations per task of the current batch
-     * @return RANGE if range tasks are enabled and batchSize>3, SET otherwise
-     */
-    private TaskType selectTaskType( long batchSize )
-    {
-        if( ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && batchSize>3 )
-            return TaskType.RANGE;
-        return TaskType.SET;
-    }
-
-    /**
-     * Adds up to batchSize iterations, starting at index value start, to the given task.
-     * Set tasks list every index value; range tasks store (from, to, increment).
-     *
-     * @param task task to fill (its type decides the encoding)
-     * @param start first index value of the task
-     * @param batchSize maximum number of iterations of the task
-     * @param lTo loop upper bound
-     * @param lIncr loop increment
-     * @return the first index value not covered by the task
-     */
-    private long fillTask( Task task, long start, long batchSize, long lTo, long lIncr )
-    {
-        long pos = start;
-
-        if( task.getType() == TaskType.SET )
-        {
-            //value based tasks
-            for( long k=0; k<batchSize && pos<=lTo; k++, pos+=lIncr )
-                task.addIteration(new IntObject(_iterVarName, pos));
-        }
-        else
-        {
-            //range based tasks: the end of the task is capped at the loop upper bound
-            long end = Math.min( pos+(batchSize-1)*lIncr, lTo );
-
-            task.addIteration(new IntObject(_iterVarName, pos));   //from
-            task.addIteration(new IntObject(_iterVarName, end));   //to
-            task.addIteration(new IntObject(_iterVarName, lIncr)); //increment
-
-            pos = end + lIncr;
-        }
-
-        return pos;
-    }
-
-
-    /**
-     * Computes the task size (number of iterations per task) for the next numThreads tasks
-     * given the number of remaining iterations R, and the number of threads P.
-     *
-     * NOTE: x can be set to different values, but the original paper argues for x=2.
-     *
-     * @param R remaining number of iterations
-     * @param P number of threads
-     * @return ceil(R/(x*P)), but at least 1
-     */
-    protected long determineNextBatchSize(long R, int P)
-    {
-        final int x = 2;
-
-        //NOTE: round creates more tasks
-        long size = (long) Math.ceil((double)R / ( x * P ));
-
-        //never return less than one iteration per task
-        return Math.max(size, 1);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java
deleted file mode 100644
index 914f387..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * Factoring with maximum constraint (e.g., if LIX matrix out-of-core and we need
- * to bound the maximum number of iterations per map task -> memory bounds)
- */
-public class TaskPartitionerFactoringCmax extends TaskPartitionerFactoring
-{
-    //upper bound on the number of iterations per task
-    protected long _constraint = -1;
-
-    /**
-     * @param taskSize task size, passed through to the base class
-     * @param numThreads number of parallel workers
-     * @param constraint maximum number of iterations per task
-     * @param iterVarName name of the index variable
-     * @param fromVal loop lower bound
-     * @param toVal loop upper bound
-     * @param incrVal loop increment
-     */
-    public TaskPartitionerFactoringCmax( long taskSize, int numThreads, long constraint, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
-    {
-        super(taskSize, numThreads, iterVarName, fromVal, toVal, incrVal);
-
-        _constraint = constraint;
-    }
-
-    /**
-     * Computes the factoring batch size and caps it at the maximum constraint.
-     *
-     * @param R remaining number of iterations
-     * @param P number of threads
-     * @return min(factoring batch size, constraint), but at least 1
-     */
-    @Override
-    protected long determineNextBatchSize(long R, int P)
-    {
-        //plain factoring size as computed by the parent class
-        long K = super.determineNextBatchSize(R, P);
-
-        //enforce the maximum constraint
-        if( K > _constraint )
-            K = _constraint;
-
-        //a task needs at least one iteration: a non-positive size would never
-        //advance the loop index of the task creation loops
-        if( K < 1 )
-            K = 1;
-
-        return K;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java
deleted file mode 100644
index 0d1c022..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * Factoring with minimum constraint (e.g., if communication is expensive)
- */
-public class TaskPartitionerFactoringCmin extends TaskPartitionerFactoring
-{
-    //lower bound on the number of iterations per task
-    protected long _constraint = -1;
-
-    /**
-     * @param taskSize task size, passed through to the base class
-     * @param numThreads number of parallel workers
-     * @param constraint minimum number of iterations per task
-     * @param iterVarName name of the index variable
-     * @param fromVal loop lower bound
-     * @param toVal loop upper bound
-     * @param incrVal loop increment
-     */
-    public TaskPartitionerFactoringCmin( long taskSize, int numThreads, long constraint, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
-    {
-        super(taskSize, numThreads, iterVarName, fromVal, toVal, incrVal);
-
-        _constraint = constraint;
-    }
-
-    /**
-     * Computes ceil(R/(2*P)) and raises it to the minimum constraint if it is smaller.
-     *
-     * @param R remaining number of iterations
-     * @param P number of threads
-     * @return the larger of the factoring batch size and the constraint
-     */
-    @Override
-    protected long determineNextBatchSize(long R, int P)
-    {
-        final int x = 2;
-
-        //NOTE: round creates more tasks
-        long factoringSize = (long) Math.ceil((double)R / ( x * P ));
-
-        //enforce the minimum constraint
-        return Math.max(factoringSize, _constraint);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java
deleted file mode 100644
index 54fc72a..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.Task.TaskType;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This task partitioner virtually iterates over the given FOR loop (from, to, incr),
- * creates iterations and groups them into tasks according to the given task size. All
- * tasks are equally sized, except that the first _firstnPlus1 tasks get one additional
- * iteration and the last task might have fewer.
- *
- */
-public class TaskPartitionerFixedsize extends TaskPartitioner
-{
-
-    protected int _firstnPlus1 = 0; //add one to these firstn tasks
-
-    public TaskPartitionerFixedsize( long taskSize, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
-    {
-        super(taskSize, iterVarName, fromVal, toVal, incrVal);
-    }
-
-    @Override
-    public List<Task> createTasks()
-        throws DMLRuntimeException
-    {
-        LinkedList<Task> tasks = new LinkedList<Task>();
-
-        final TaskType type = selectTaskType();
-
-        long pos = _fromVal.getLongValue();     // next index value to assign
-        final long lTo = _toVal.getLongValue();
-        final long lIncr = _incrVal.getLongValue();
-        long plusOneLeft = _firstnPlus1;        // tasks that still get one extra iteration
-
-        while( pos <= lTo )
-        {
-            //create new task and add to list of tasks
-            Task lTask = new Task( type );
-            tasks.addLast(lTask);
-
-            //correction for static partitioner
-            long numIters = _taskSize;
-            if( plusOneLeft-- > 0 )
-                numIters++;
-
-            //add <numIters> iterations to task (last task might have less)
-            pos = fillTask(lTask, pos, numIters, lTo, lIncr);
-        }
-
-        return tasks;
-    }
-
-    @Override
-    public long createTasks(LocalTaskQueue<Task> queue)
-        throws DMLRuntimeException
-    {
-        long numCreatedTasks=0;
-
-        final TaskType type = selectTaskType();
-
-        long pos = _fromVal.getLongValue();     // next index value to assign
-        final long lTo = _toVal.getLongValue();
-        final long lIncr = _incrVal.getLongValue();
-        long plusOneLeft = _firstnPlus1;        // tasks that still get one extra iteration
-
-        try
-        {
-            while( pos <= lTo )
-            {
-                Task lTask = new Task( type );
-
-                //correction for static partitioner
-                long numIters = _taskSize;
-                if( plusOneLeft-- > 0 )
-                    numIters++;
-
-                //add <numIters> iterations to task (last task might have less)
-                pos = fillTask(lTask, pos, numIters, lTo, lIncr);
-
-                //enqueue only after all iterations were added (prevents race conditions)
-                queue.enqueueTask( lTask );
-                numCreatedTasks++;
-            }
-
-            // mark end of task input stream
-            queue.closeInput();
-        }
-        catch(Exception ex)
-        {
-            throw new DMLRuntimeException(ex);
-        }
-
-        return numCreatedTasks;
-    }
-
-    /**
-     * Range tasks (similar to run-length encoding) only make sense if taskSize>3;
-     * otherwise set tasks are used.
-     *
-     * @return RANGE if range tasks are enabled and the task size exceeds 3, SET otherwise
-     */
-    private TaskType selectTaskType()
-    {
-        if( ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && _taskSize>3 )
-            return TaskType.RANGE;
-        return TaskType.SET;
-    }
-
-    /**
-     * Adds up to numIters iterations, starting at index value start, to the given task.
-     * Set tasks list every index value; range tasks store (from, to, increment).
-     *
-     * @param task task to fill (its type decides the encoding)
-     * @param start first index value of the task
-     * @param numIters maximum number of iterations of the task
-     * @param lTo loop upper bound
-     * @param lIncr loop increment
-     * @return the first index value not covered by the task
-     */
-    private long fillTask( Task task, long start, long numIters, long lTo, long lIncr )
-    {
-        long pos = start;
-
-        if( task.getType() == TaskType.SET )
-        {
-            //value based tasks
-            for( long j=0; j<numIters && pos<=lTo; j++, pos+=lIncr )
-                task.addIteration(new IntObject(_iterVarName, pos));
-        }
-        else
-        {
-            //range based tasks: the end of the task is capped at the loop upper bound
-            long end = Math.min( pos+(numIters-1)*lIncr, lTo );
-
-            task.addIteration(new IntObject(_iterVarName, pos));   //from
-            task.addIteration(new IntObject(_iterVarName, end));   //to
-            task.addIteration(new IntObject(_iterVarName, lIncr)); //increment
-
-            pos = end + lIncr;
-        }
-
-        return pos;
-    }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java
deleted file mode 100644
index 2ad4806..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This naive task partitioner reuses the fixed-size partitioning over the given FOR loop
- * (from, to, incr) with a task size of one, i.e., the task size handed to the constructor
- * is replaced by 1.
- *
- */
-public class TaskPartitionerNaive extends TaskPartitionerFixedsize
-{
-
-    /**
-     * @param taskSize passed to the parent constructor, then overridden with 1
-     * @param iterVarName name of the index variable
-     * @param fromVal loop lower bound
-     * @param toVal loop upper bound
-     * @param incrVal loop increment
-     */
-    public TaskPartitionerNaive( long taskSize, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
-    {
-        super(taskSize, iterVarName, fromVal, toVal, incrVal);
-
-        //always use a task size of one, regardless of the given task size
-        this._taskSize = 1L;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java
deleted file mode 100644
index 20be635..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This static task partitioner virtually iterates over the given FOR loop (from, to, incr),
- * creates iterations and groups them into tasks according to a task size of numIterations/numThreads.
- * The remaining (numIterations % numThreads) iterations are distributed by giving each of the
- * first tasks one additional iteration.
- *
- */
-public class TaskPartitionerStatic extends TaskPartitionerFixedsize
-{
-
-    /**
-     * @param taskSize passed to the parent constructor, then overridden with numIterations/numThreads
-     * @param numThreads number of parallel workers
-     * @param iterVarName name of the index variable
-     * @param fromVal loop lower bound
-     * @param toVal loop upper bound
-     * @param incrVal loop increment
-     */
-    public TaskPartitionerStatic( long taskSize, int numThreads, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal )
-    {
-        super(taskSize, iterVarName, fromVal, toVal, incrVal);
-
-        _taskSize = _numIter / numThreads;
-
-        //compute the remainder in long arithmetic and narrow afterwards; a cast binds
-        //tighter than %, so "(int)_numIter % numThreads" would truncate _numIter first
-        _firstnPlus1 = (int)(_numIter % numThreads);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java
deleted file mode 100644
index 7b3423a..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.MetaData;
-
-/**
- * Merged MR Job instruction to hold the actually merged instruction as well as offsets of
- * result indexes in order to split result meta data after successful execution.
- *
- */
-public class MergedMRJobInstruction
-{
-    //the merged instruction
-    protected MRJobInstruction inst;
-    //IDs of the registered instructions, in registration order
-    protected LinkedList<Long> ids = new LinkedList<Long>();
-    //per instruction ID: offset of its outputs within the merged output meta data
-    protected HashMap<Long,Integer> outIxOffs = new HashMap<Long,Integer>();
-    //per instruction ID: number of its outputs
-    protected HashMap<Long,Integer> outIxLens = new HashMap<Long,Integer>();
-
-    public MergedMRJobInstruction()
-    {
-        //all containers are initialized at their declaration
-    }
-
-    /**
-     * Registers an instruction together with the position of its outputs.
-     *
-     * @param instID instruction ID
-     * @param outIxOffset offset of the instruction's outputs in the merged output meta data
-     * @param outIxLen number of outputs of the instruction
-     */
-    public void addInstructionMetaData(long instID, int outIxOffset, int outIxLen)
-    {
-        ids.add(instID);
-        outIxOffs.put(instID, outIxOffset);
-        outIxLens.put(instID, outIxLen);
-    }
-
-    /**
-     * Extracts the job return of a single registered instruction from the job return
-     * of the merged instruction.
-     *
-     * @param instID ID of a previously registered instruction
-     * @param retAll job return of the merged instruction
-     * @return job return with the success flag of retAll and, if successful, the slice
-     *         of the meta data that belongs to the given instruction
-     */
-    public JobReturn constructJobReturn( long instID, JobReturn retAll )
-    {
-        //position of this instruction's outputs within the merged outputs
-        final int offset = outIxOffs.get(instID);
-        final int length = outIxLens.get(instID);
-
-        JobReturn part = new JobReturn();
-        part.successful = retAll.successful;
-
-        //meta data is only copied for successful jobs
-        if( !part.successful )
-            return part;
-
-        part.metadata = new MetaData[length];
-        System.arraycopy(retAll.metadata, offset, part.metadata, 0, length);
-        return part;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java
deleted file mode 100644
index f76f515..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.Timing;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-
-/**
- *
- *
- */
-public abstract class PiggybackingWorker extends Thread
-{
-
-
- protected static final Log LOG = LogFactory.getLog(PiggybackingWorker.class.getName());
-
- protected HashMap<Long, JobReturn> _results = null;
- protected boolean _stop;
-
- protected PiggybackingWorker()
- {
- _results = new HashMap<Long, JobReturn>();
- _stop = false;
- }
-
- /**
- *
- */
- public void setStopped()
- {
- _stop = true;
- }
-
- /**
- *
- * @param instID
- * @return
- * @throws InterruptedException
- */
- public synchronized JobReturn getJobResult( long instID )
- throws InterruptedException
- {
- JobReturn ret = null;
-
- while( ret == null )
- {
- //wait for new results
- wait();
-
- //obtain job return (if available)
- ret = _results.remove( instID );
- }
-
- return ret;
- }
-
-
- /**
- *
- * @param ids
- * @param results
- */
- protected synchronized void putJobResults( LinkedList<Long> ids, LinkedList<JobReturn> results )
- {
- //make job returns available
- for( int i=0; i<ids.size(); i++ )
- _results.put(ids.get(i), results.get(i));
-
- //notify all waiting threads
- notifyAll();
- }
-
- /**
- *
- * @param workingSet
- * @return
- * @throws IllegalAccessException
- */
- protected LinkedList<MergedMRJobInstruction> mergeMRJobInstructions( LinkedList<Pair<Long,MRJobInstruction>> workingSet )
- throws IllegalAccessException
- {
- LinkedList<MergedMRJobInstruction> ret = new LinkedList<MergedMRJobInstruction>();
- Timing time = new Timing(true);
-
- //NOTE currently all merged into one (might be invalid due to memory constraints)
- MergedMRJobInstruction minst = new MergedMRJobInstruction();
- for( Pair<Long,MRJobInstruction> inst : workingSet )
- {
- long instID = inst.getKey();
- MRJobInstruction instVal = inst.getValue();
- int numOutputs = instVal.getOutputs().length;
-
- //append to current merged instruction
- if( minst.inst==null )
- {
- //deep copy first instruction
- minst.inst = new MRJobInstruction( instVal );
- minst.addInstructionMetaData( instID, 0, numOutputs );
- }
- else
- {
- //merge other instructions
- if( minst.inst.isMergableMRJobInstruction( instVal ) )
- {
- //add instruction to open merged instruction
- int offOutputs = minst.inst.getOutputs().length; //before merge
- minst.inst.mergeMRJobInstruction( instVal );
- minst.addInstructionMetaData(instID, offOutputs, numOutputs);
- }
- else
- {
- //close current merged instruction
- ret.add(minst);
- //open new merged instruction
- minst = new MergedMRJobInstruction();
- minst.inst = new MRJobInstruction( instVal );
- minst.addInstructionMetaData( instID, 0, numOutputs );
- }
- }
- }
- //close last open merged instruction
- ret.add(minst);
-
- //output log info for better understandability for users
- LOG.info("Merged MR-Job instructions: "+workingSet.size()+" --> "+ret.size()+" in "+time.stop()+"ms.");
-
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java
deleted file mode 100644
index d385fc7..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.LinkedList;
-
-import com.ibm.bi.dml.lops.runtime.RunMRJobs;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-import com.ibm.bi.dml.utils.Statistics;
-
-public class PiggybackingWorkerTimeSequential extends PiggybackingWorker
-{
-
- //internal configuration parameters
- private static long DEFAULT_MERGE_INTERVAL = 1000;
- private static boolean SUBSTRACT_EXEC_TIME = true;
-
- private long _time;
-
- public PiggybackingWorkerTimeSequential()
- {
- this(DEFAULT_MERGE_INTERVAL);
- }
-
- public PiggybackingWorkerTimeSequential( long timeInterval )
- {
- _time = timeInterval;
- }
-
- @Override
- public void run()
- {
- long lastTime = System.currentTimeMillis();
-
- while( !_stop )
- {
- try
- {
- // wait until next submission
- if( SUBSTRACT_EXEC_TIME ) {
- long currentTime = System.currentTimeMillis();
- if( currentTime-lastTime < _time )
- Thread.sleep( _time-(currentTime-lastTime) );
- lastTime = currentTime;
- }
- else
- Thread.sleep(_time);
-
-
- // pick job type with largest number of jobs
- LinkedList<Pair<Long,MRJobInstruction>> workingSet = RuntimePiggybacking.getMaxWorkingSet();
- if( workingSet == null )
- continue; //empty pool
-
- // merge jobs (if possible)
- LinkedList<MergedMRJobInstruction> mergedWorkingSet = mergeMRJobInstructions(workingSet);
-
- // submit all resulting jobs (currently sequential submission)
- for( MergedMRJobInstruction minst : mergedWorkingSet )
- {
- JobReturn mret = RunMRJobs.submitJob(minst.inst);
- Statistics.incrementNoOfExecutedMRJobs();
-
- // error handling
- if( !mret.successful )
- LOG.error("Failed to run merged mr-job instruction:\n"+minst.inst.toString());
-
- // split job return
- LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
- for( Long id : minst.ids ){
- ret.add( minst.constructJobReturn(id, mret) );
- Statistics.decrementNoOfExecutedMRJobs();
- }
- // make job returns available and notify waiting clients
- putJobResults(minst.ids, ret);
- }
- }
- catch(Exception ex)
- {
- throw new RuntimeException(ex);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java
deleted file mode 100644
index cad9967..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import com.ibm.bi.dml.lops.runtime.RunMRJobs;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- *
- * Extensions: (1) take number of running jobs into account,
- * (2) compute timeout threshold based on max and last job execution time.
- */
-public class PiggybackingWorkerUtilDecayParallel extends PiggybackingWorker
-{
-
- //internal configuration parameters
- private static long MIN_MERGE_INTERVAL = 1000;
- private static double UTILIZATION_DECAY = 0.5; //decay per minute
-
- //thread pool for parallel submit
- private ExecutorService _parSubmit = null;
-
- private long _minTime = -1;
- private double _utilDecay = -1;
- private int _par = -1;
-
- public PiggybackingWorkerUtilDecayParallel(int par)
- {
- this( MIN_MERGE_INTERVAL,
- UTILIZATION_DECAY,
- par );
- }
-
- public PiggybackingWorkerUtilDecayParallel( long minInterval, double utilDecay, int par )
- {
- _minTime = minInterval;
- _utilDecay = utilDecay;
- _par = par;
-
- //init thread pool
- _parSubmit = Executors.newFixedThreadPool(_par);
- }
-
- @Override
- public void setStopped()
- {
- //parent logic
- super.setStopped();
-
- //explicitly stop the thread pool
- _parSubmit.shutdown();
- }
-
- @Override
- public void run()
- {
- long lastTime = System.currentTimeMillis();
-
- while( !_stop )
- {
- try
- {
- long currentTime = System.currentTimeMillis()+1; //ensure > lastTime
-
- // wait until next submission
- Thread.sleep(_minTime); //wait at least minTime
-
- //continue if (prevent cluster status requests)
- if( RuntimePiggybacking.isEmptyJobPool() )
- continue;
-
- double util = RuntimePiggybackingUtils.getCurrentClusterUtilization();
- double utilThreshold = 1-Math.pow(_utilDecay, Math.ceil(((double)currentTime-lastTime)/60000));
-
- //continue to collect jobs if cluster util too high (decay to prevent starvation)
- if( util > utilThreshold ) { //cluster utilization condition
- continue; //1min - >50%, 2min - >75%, 3min - >87.5%, 4min - > 93.7%
- }
-
- // pick job type with largest number of jobs
- LinkedList<Pair<Long,MRJobInstruction>> workingSet = RuntimePiggybacking.getMaxWorkingSet();
- if( workingSet == null )
- continue; //empty pool
-
- // merge jobs (if possible)
- LinkedList<MergedMRJobInstruction> mergedWorkingSet = mergeMRJobInstructions(workingSet);
-
- // submit all resulting jobs (parallel submission)
- for( MergedMRJobInstruction minst : mergedWorkingSet )
- {
- //submit job and return results if finished
- _parSubmit.execute(new MRJobSubmitTask(minst));
- }
-
- lastTime = currentTime;
- }
- catch(Exception ex)
- {
- throw new RuntimeException(ex);
- }
- }
- }
-
-
- /**
- *
- *
- */
- public class MRJobSubmitTask implements Runnable
- {
- private MergedMRJobInstruction _minst = null;
-
- public MRJobSubmitTask( MergedMRJobInstruction minst )
- {
- _minst = minst;
- }
-
- @Override
- public void run()
- {
- try
- {
- // submit mr job
- JobReturn mret = RunMRJobs.submitJob(_minst.inst);
- Statistics.incrementNoOfExecutedMRJobs();
-
- // error handling
- if( !mret.successful )
- LOG.error("Failed to run merged mr-job instruction:\n"+_minst.inst.toString());
-
- // split job return
- LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
- for( Long id : _minst.ids ){
- ret.add( _minst.constructJobReturn(id, mret) );
- Statistics.decrementNoOfExecutedMRJobs();
- }
- putJobResults(_minst.ids, ret);
- }
- catch(Exception ex)
- {
- //log error and merged instruction
- LOG.error("Failed to run merged mr-job instruction:\n"+_minst.inst.toString(),ex);
-
- //handle unsuccessful job returns for failed job
- //(otherwise clients would literally wait forever for results)
- LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
- for( Long id : _minst.ids ){
- JobReturn fret = new JobReturn(new MatrixCharacteristics[_minst.outIxLens.get(id)], false);
- ret.add( _minst.constructJobReturn(id, fret) );
- Statistics.decrementNoOfExecutedMRJobs();
- }
- // make job returns available and notify waiting clients
- putJobResults(_minst.ids, ret);
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java
deleted file mode 100644
index 9c89532..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import com.ibm.bi.dml.lops.runtime.RunMRJobs;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- *
- * Extensions: (1) take number of running jobs into account,
- * (2) compute timeout threshold based on max and last job execution time.
- */
-public class PiggybackingWorkerUtilTimeParallel extends PiggybackingWorker
-{
-
- //internal configuration parameters
- private static long MIN_MERGE_INTERVAL = 1000;
- private static long MAX_MERGE_INTERVAL = 60000;
- private static double UTILIZATION_THRESHOLD = 0.4; //60% occupied map tasks
-
- //thread pool for parallel submit
- private ExecutorService _parSubmit = null;
-
- private long _minTime = -1;
- private long _maxTime = -1;
- private double _utilThreshold = -1;
- private int _par = -1;
-
- public PiggybackingWorkerUtilTimeParallel(int par)
- {
- this( MIN_MERGE_INTERVAL,
- MAX_MERGE_INTERVAL,
- UTILIZATION_THRESHOLD,
- par );
- }
-
- public PiggybackingWorkerUtilTimeParallel( long minInterval, long maxInterval, double utilThreshold, int par )
- {
- _minTime = minInterval;
- _maxTime = maxInterval;
- _utilThreshold = utilThreshold;
- _par = par;
-
- //init thread pool
- _parSubmit = Executors.newFixedThreadPool(_par);
- }
-
- @Override
- public void setStopped()
- {
- //parent logic
- super.setStopped();
-
- //explicitly stop the thread pool
- _parSubmit.shutdown();
- }
-
- @Override
- public void run()
- {
- long lastTime = System.currentTimeMillis();
-
- while( !_stop )
- {
- try
- {
- long currentTime = System.currentTimeMillis();
-
- // wait until next submission
- Thread.sleep(_minTime); //wait at least minTime
- if( RuntimePiggybacking.isEmptyJobPool() )
- continue;
- double util = RuntimePiggybackingUtils.getCurrentClusterUtilization();
- if( util > _utilThreshold //cluster utilization condition
- && currentTime-lastTime<_maxTime ) //timeout condition
- {
- continue;
- }
-
- // pick job type with largest number of jobs
- LinkedList<Pair<Long,MRJobInstruction>> workingSet = RuntimePiggybacking.getMaxWorkingSet();
- if( workingSet == null )
- continue; //empty pool
-
- // merge jobs (if possible)
- LinkedList<MergedMRJobInstruction> mergedWorkingSet = mergeMRJobInstructions(workingSet);
-
- // submit all resulting jobs (parallel submission)
- for( MergedMRJobInstruction minst : mergedWorkingSet )
- {
- //submit job and return results if finished
- _parSubmit.execute(new MRJobSubmitTask(minst));
- }
-
- lastTime = currentTime;
- }
- catch(Exception ex)
- {
- throw new RuntimeException(ex);
- }
- }
- }
-
-
- /**
- *
- *
- */
- public class MRJobSubmitTask implements Runnable
- {
- private MergedMRJobInstruction _minst = null;
-
- public MRJobSubmitTask( MergedMRJobInstruction minst )
- {
- _minst = minst;
- }
-
- @Override
- public void run()
- {
- try
- {
- // submit mr job
- JobReturn mret = RunMRJobs.submitJob(_minst.inst);
- Statistics.incrementNoOfExecutedMRJobs();
-
- // error handling
- if( !mret.successful )
- LOG.error("Failed to run merged mr-job instruction:\n"+_minst.inst.toString());
-
- // split job return
- LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
- for( Long id : _minst.ids ){
- ret.add( _minst.constructJobReturn(id, mret) );
- Statistics.decrementNoOfExecutedMRJobs();
- }
- // make job returns available and notify waiting clients
- putJobResults(_minst.ids, ret);
- }
- catch(Exception ex)
- {
- throw new RuntimeException(ex);
- }
- }
-
- }
-}