You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/15 18:49:26 UTC
[3/4] incubator-systemml git commit: [SYSTEMML-1268] Replace
accumulators with new accumulatorV2 framework
[SYSTEMML-1268] Replace accumulators with new accumulatorV2 framework
This patch globally replaces all uses of the deprecated accumulators with
the new AccumulatorV2 framework. For custom accumulators, this required
a reimplementation. Furthermore, we now avoid expensive casts between
double and long and use named accumulators for easier debugging in the
Spark web UI.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/732e6da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/732e6da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/732e6da4
Branch: refs/heads/master
Commit: 732e6da4f924a99ba5fbddf656436fc1bd62668f
Parents: 12d79c5
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Feb 14 21:56:14 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Feb 15 10:49:21 2017 -0800
----------------------------------------------------------------------
.../parfor/RemoteDPParForSpark.java | 10 +--
.../parfor/RemoteDPParForSparkWorker.java | 9 +--
.../parfor/RemoteParForSpark.java | 10 +--
.../parfor/RemoteParForSparkWorker.java | 8 +-
...ReturnParameterizedBuiltinSPInstruction.java | 79 +++++++++++++-------
.../instructions/spark/WriteSPInstruction.java | 10 +--
.../ComputeBinaryBlockNnzFunction.java | 9 +--
.../spark/utils/RDDConverterUtils.java | 18 ++---
8 files changed, 89 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 7286eca..8663038 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -23,9 +23,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;
@@ -73,8 +73,8 @@ public class RemoteDPParForSpark
InputInfo ii = InputInfo.BinaryBlockInputInfo;
//initialize accumulators for tasks/iterations
- Accumulator<Integer> aTasks = sc.accumulator(0);
- Accumulator<Integer> aIters = sc.accumulator(0);
+ LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+ LongAccumulator aIters = sc.sc().longAccumulator("iterations");
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
@@ -88,8 +88,8 @@ public class RemoteDPParForSpark
//de-serialize results
LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
- int numTasks = aTasks.value(); //get accumulator value
- int numIters = aIters.value(); //get accumulator value
+ int numTasks = aTasks.value().intValue(); //get accumulator value
+ int numIters = aIters.value().intValue(); //get accumulator value
//create output symbol table entries
RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index c973115..e12d010 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -24,10 +24,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.io.Writable;
-import org.apache.spark.Accumulator;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.apache.spark.util.LongAccumulator;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
@@ -61,10 +60,10 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
private boolean _tSparseCol = false;
private PDataPartitionFormat _dpf = null;
- private Accumulator<Integer> _aTasks = null;
- private Accumulator<Integer> _aIters = null;
+ private LongAccumulator _aTasks = null;
+ private LongAccumulator _aIters = null;
- public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, Accumulator<Integer> atasks, Accumulator<Integer> aiters)
+ public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters)
throws DMLRuntimeException
{
//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 47ea2e6..9d3f0f3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -23,8 +23,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;
@@ -64,8 +64,8 @@ public class RemoteParForSpark
JavaSparkContext sc = sec.getSparkContext();
//initialize accumulators for tasks/iterations
- Accumulator<Integer> aTasks = sc.accumulator(0);
- Accumulator<Integer> aIters = sc.accumulator(0);
+ LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+ LongAccumulator aIters = sc.sc().longAccumulator("iterations");
//run remote_spark parfor job
//(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
@@ -77,8 +77,8 @@ public class RemoteParForSpark
//de-serialize results
LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
- int numTasks = aTasks.value(); //get accumulator value
- int numIters = aIters.value(); //get accumulator value
+ int numTasks = aTasks.value().intValue(); //get accumulator value
+ int numIters = aIters.value().intValue(); //get accumulator value
//create output symbol table entries
RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 75e5137..e12376a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -23,9 +23,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
-import org.apache.spark.Accumulator;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
@@ -42,10 +42,10 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
private String _prog = null;
private boolean _caching = true;
- private Accumulator<Integer> _aTasks = null;
- private Accumulator<Integer> _aIters = null;
+ private LongAccumulator _aTasks = null;
+ private LongAccumulator _aIters = null;
- public RemoteParForSparkWorker(String program, boolean cpCaching, Accumulator<Integer> atasks, Accumulator<Integer> aiters)
+ public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters)
throws DMLRuntimeException
{
//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index daa1ce5..5890bf9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -20,20 +20,19 @@
package org.apache.sysml.runtime.instructions.spark;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.util.AccumulatorV2;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -124,7 +123,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
Encoder encoderBuild = EncoderFactory.createEncoder(spec, colnames,
fo.getSchema(), (int)fo.getNumColumns(), null);
- Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc());
+ MaxLongAccumulator accMax = registerMaxLongAccumulator(sec.getSparkContext());
JavaRDD<String> rcMaps = in
.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
.distinct().groupByKey()
@@ -190,6 +189,54 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
return null;
}
+ private static MaxLongAccumulator registerMaxLongAccumulator(JavaSparkContext sc) { // create a max-accumulator in its zero state and register it with the underlying SparkContext
+ MaxLongAccumulator acc = new MaxLongAccumulator(Long.MIN_VALUE); // Long.MIN_VALUE is the identity for max (the same value isZero/reset use)
+ sc.sc().register(acc, "max"); // named for easier debugging; presumably Spark requires registration before the accumulator is used in task closures -- confirm against AccumulatorV2 docs
+ return acc;
+ }
+
+
+ private static class MaxLongAccumulator extends AccumulatorV2<Long,Long> // accumulator that keeps the maximum of all added Long values
+ {
+ private static final long serialVersionUID = -3739727823287550826L;
+
+ private long _value = Long.MIN_VALUE; // running maximum; Long.MIN_VALUE acts as the zero/identity value
+
+ public MaxLongAccumulator(long value) { // start from the given value (Long.MIN_VALUE yields a zero accumulator)
+ _value = value;
+ }
+
+ @Override
+ public void add(Long arg0) { // fold one value into the running maximum; arg0 is unboxed, so a null would throw NullPointerException
+ _value = Math.max(_value, arg0);
+ }
+
+ @Override
+ public AccumulatorV2<Long, Long> copy() { // new accumulator carrying the current maximum
+ return new MaxLongAccumulator(_value);
+ }
+
+ @Override
+ public boolean isZero() { // "zero" means still at the identity value Long.MIN_VALUE
+ return _value == Long.MIN_VALUE;
+ }
+
+ @Override
+ public void merge(AccumulatorV2<Long, Long> arg0) { // combine another accumulator's maximum into this one
+ _value = Math.max(_value, arg0.value());
+ }
+
+ @Override
+ public void reset() { // return to the zero/identity value
+ _value = Long.MIN_VALUE;
+ }
+
+ @Override
+ public Long value() { // current maximum, boxed
+ return _value;
+ }
+ }
+
/**
* This function pre-aggregates distinct values of recoded columns per partition
* (part of distributed recode map construction, used for recoding, binning and
@@ -242,9 +289,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
{
private static final long serialVersionUID = -1034187226023517119L;
- private Accumulator<Long> _accMax = null;
+ private MaxLongAccumulator _accMax = null;
- public TransformEncodeGroupFunction( Accumulator<Long> accMax ) {
+ public TransformEncodeGroupFunction( MaxLongAccumulator accMax ) {
_accMax = accMax;
}
@@ -275,26 +322,6 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
}
}
- private static class MaxAcc implements AccumulatorParam<Long>, Serializable
- {
- private static final long serialVersionUID = -3739727823287550826L;
-
- @Override
- public Long addInPlace(Long arg0, Long arg1) {
- return Math.max(arg0, arg1);
- }
-
- @Override
- public Long zero(Long arg0) {
- return arg0;
- }
-
- @Override
- public Long addAccumulator(Long arg0, Long arg1) {
- return Math.max(arg0, arg1);
- }
- }
-
public static class TransformEncodeBuild2Function implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata>
{
private static final long serialVersionUID = 6336375833412029279L;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 912dbe3..3387770 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -25,9 +25,9 @@ import java.util.Random;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.util.LongAccumulator;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
@@ -203,12 +203,12 @@ public class WriteSPInstruction extends SPInstruction
else if( oi == OutputInfo.CSVOutputInfo )
{
JavaRDD<String> out = null;
- Accumulator<Double> aNnz = null;
+ LongAccumulator aNnz = null;
if ( isInputMatrixBlock ) {
//piggyback nnz computation on actual write
if( !mc.nnzKnown() ) {
- aNnz = sec.getSparkContext().accumulator(0L);
+ aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
}
@@ -252,9 +252,9 @@ public class WriteSPInstruction extends SPInstruction
}
else if( oi == OutputInfo.BinaryBlockOutputInfo ) {
//piggyback nnz computation on actual write
- Accumulator<Double> aNnz = null;
+ LongAccumulator aNnz = null;
if( !mc.nnzKnown() ) {
- aNnz = sec.getSparkContext().accumulator(0L);
+ aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
index f76784e..7b6daad 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
@@ -19,18 +19,17 @@
package org.apache.sysml.runtime.instructions.spark.functions;
-import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.Function;
-
+import org.apache.spark.util.LongAccumulator;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,MatrixBlock>
{
private static final long serialVersionUID = -8396410450821999936L;
- private Accumulator<Double> _aNnz = null;
+ private LongAccumulator _aNnz = null;
- public ComputeBinaryBlockNnzFunction( Accumulator<Double> aNnz )
+ public ComputeBinaryBlockNnzFunction( LongAccumulator aNnz )
{
_aNnz = aNnz;
}
@@ -38,7 +37,7 @@ public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,Matri
@Override
public MatrixBlock call(MatrixBlock arg0) throws Exception
{
- _aNnz.add( (double)arg0.getNonZeros() );
+ _aNnz.add( arg0.getNonZeros() );
return arg0;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index b5a4b58..1310b80 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -27,7 +27,6 @@ import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
+import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;
@@ -166,7 +166,7 @@ public class RDDConverterUtils
{
//determine unknown dimensions and sparsity if required
if( !mc.dimsKnown(true) ) {
- Accumulator<Double> aNnz = sc.accumulator(0L);
+ LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
JavaRDD<String> tmp = input.values()
.map(new CSVAnalysisFunction(aNnz, delim));
long rlen = tmp.count() - (hasHeader ? 1 : 0);
@@ -230,7 +230,7 @@ public class RDDConverterUtils
{
//determine unknown dimensions and sparsity if required
if( !mc.dimsKnown(true) ) {
- Accumulator<Double> aNnz = sc.accumulator(0L);
+ LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, containsID, isVector));
long rlen = tmp.count();
long clen = !isVector ? df.columns().length - (containsID?1:0) :
@@ -531,10 +531,10 @@ public class RDDConverterUtils
{
private static final long serialVersionUID = 2310303223289674477L;
- private Accumulator<Double> _aNnz = null;
+ private LongAccumulator _aNnz = null;
private String _delim = null;
- public CSVAnalysisFunction( Accumulator<Double> aNnz, String delim )
+ public CSVAnalysisFunction( LongAccumulator aNnz, String delim )
{
_aNnz = aNnz;
_delim = delim;
@@ -552,7 +552,7 @@ public class RDDConverterUtils
int lnnz = IOUtilFunctions.countNnz(cols);
//update counters
- _aNnz.add( (double)lnnz );
+ _aNnz.add( lnnz );
return line;
}
@@ -922,11 +922,11 @@ public class RDDConverterUtils
{
private static final long serialVersionUID = 5705371332119770215L;
- private Accumulator<Double> _aNnz = null;
+ private LongAccumulator _aNnz = null;
private boolean _containsID;
private boolean _isVector;
- public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean containsID, boolean isVector) {
+ public DataFrameAnalysisFunction( LongAccumulator aNnz, boolean containsID, boolean isVector) {
_aNnz = aNnz;
_containsID = containsID;
_isVector = isVector;
@@ -940,7 +940,7 @@ public class RDDConverterUtils
int lnnz = countNnz(vect, _isVector, off);
//update counters
- _aNnz.add( (double)lnnz );
+ _aNnz.add( lnnz );
return arg0;
}
}