Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/15 18:49:26 UTC

[3/4] incubator-systemml git commit: [SYSTEMML-1268] Replace accumulators with new accumulatorV2 framework

[SYSTEMML-1268] Replace accumulators with new accumulatorV2 framework

This patch globally replaces all uses of the deprecated Accumulator API with
the new AccumulatorV2 framework. For the custom accumulators, this entailed
a reimplementation. Furthermore, we now avoid expensive casts between double
and long, and we use named accumulators for easier debugging in the Spark web UI.
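Below is a minimal, illustrative sketch of the migration pattern applied
throughout this patch. It is not code from this commit; the app name,
accumulator name, and sample data are made up for the example:

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.util.LongAccumulator;

    public class AccumulatorMigrationSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local[*]", "acc-sketch");

            // Old, deprecated pattern: boxed accumulators created directly
            // on the Java context, e.g.
            //   Accumulator<Integer> acc = sc.accumulator(0);
            //   acc.add(1); int v = acc.value();

            // New pattern: a named LongAccumulator registered through the
            // underlying SparkContext; the name shows up in the web UI.
            LongAccumulator acc = sc.sc().longAccumulator("tasks");
            sc.parallelize(Arrays.asList(1, 2, 3))
              .foreach(x -> acc.add(1));
            long numTasks = acc.value(); // plain long, no double<->long casts

            System.out.println("tasks: " + numTasks);
            sc.stop();
        }
    }

Custom aggregation logic (such as the max aggregation used for recode IDs)
now extends AccumulatorV2 directly, as shown by MaxLongAccumulator in the
diff of MultiReturnParameterizedBuiltinSPInstruction.java below.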

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/732e6da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/732e6da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/732e6da4

Branch: refs/heads/master
Commit: 732e6da4f924a99ba5fbddf656436fc1bd62668f
Parents: 12d79c5
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Feb 14 21:56:14 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Feb 15 10:49:21 2017 -0800

----------------------------------------------------------------------
 .../parfor/RemoteDPParForSpark.java             | 10 +--
 .../parfor/RemoteDPParForSparkWorker.java       |  9 +--
 .../parfor/RemoteParForSpark.java               | 10 +--
 .../parfor/RemoteParForSparkWorker.java         |  8 +-
 ...ReturnParameterizedBuiltinSPInstruction.java | 79 +++++++++++++-------
 .../instructions/spark/WriteSPInstruction.java  | 10 +--
 .../ComputeBinaryBlockNnzFunction.java          |  9 +--
 .../spark/utils/RDDConverterUtils.java          | 18 ++---
 8 files changed, 89 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 7286eca..8663038 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -23,9 +23,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -73,8 +73,8 @@ public class RemoteDPParForSpark
 		InputInfo ii = InputInfo.BinaryBlockInputInfo;
 				
 		//initialize accumulators for tasks/iterations
-		Accumulator<Integer> aTasks = sc.accumulator(0);
-		Accumulator<Integer> aIters = sc.accumulator(0);
+		LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+		LongAccumulator aIters = sc.sc().longAccumulator("iterations");
 		
 		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
 		DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
@@ -88,8 +88,8 @@ public class RemoteDPParForSpark
 		
 		//de-serialize results
 		LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
-		int numTasks = aTasks.value(); //get accumulator value
-		int numIters = aIters.value(); //get accumulator value
+		int numTasks = aTasks.value().intValue(); //get accumulator value
+		int numIters = aIters.value().intValue(); //get accumulator value
 		
 		//create output symbol table entries
 		RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index c973115..e12d010 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -24,10 +24,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.spark.Accumulator;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
@@ -61,10 +60,10 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 	private boolean _tSparseCol = false;
 	private PDataPartitionFormat _dpf = null;
 	
-	private Accumulator<Integer> _aTasks = null;
-	private Accumulator<Integer> _aIters = null;
+	private LongAccumulator _aTasks = null;
+	private LongAccumulator _aIters = null;
 	
-	public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, Accumulator<Integer> atasks, Accumulator<Integer> aiters) 
+	public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters) 
 		throws DMLRuntimeException
 	{
 		//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 47ea2e6..9d3f0f3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -64,8 +64,8 @@ public class RemoteParForSpark
 		JavaSparkContext sc = sec.getSparkContext();
 		
 		//initialize accumulators for tasks/iterations
-		Accumulator<Integer> aTasks = sc.accumulator(0);
-		Accumulator<Integer> aIters = sc.accumulator(0);
+		LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+		LongAccumulator aIters = sc.sc().longAccumulator("iterations");
 		
 		//run remote_spark parfor job 
 		//(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
@@ -77,8 +77,8 @@ public class RemoteParForSpark
 		
 		//de-serialize results
 		LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
-		int numTasks = aTasks.value(); //get accumulator value
-		int numIters = aIters.value(); //get accumulator value
+		int numTasks = aTasks.value().intValue(); //get accumulator value
+		int numIters = aIters.value().intValue(); //get accumulator value
 		
 		//create output symbol table entries
 		RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 75e5137..e12376a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -23,9 +23,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
@@ -42,10 +42,10 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 	private String  _prog = null;
 	private boolean _caching = true;
 	
-	private Accumulator<Integer> _aTasks = null;
-	private Accumulator<Integer> _aIters = null;
+	private LongAccumulator _aTasks = null;
+	private LongAccumulator _aIters = null;
 	
-	public RemoteParForSparkWorker(String program, boolean cpCaching, Accumulator<Integer> atasks, Accumulator<Integer> aiters) 
+	public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
 		throws DMLRuntimeException
 	{
 		//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index daa1ce5..5890bf9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -20,20 +20,19 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.util.AccumulatorV2;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -124,7 +123,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 			Encoder encoderBuild = EncoderFactory.createEncoder(spec, colnames,
 					fo.getSchema(), (int)fo.getNumColumns(), null);
 			
-			Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc()); 
+			MaxLongAccumulator accMax = registerMaxLongAccumulator(sec.getSparkContext()); 
 			JavaRDD<String> rcMaps = in
 					.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
 					.distinct().groupByKey()
@@ -190,6 +189,54 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 		return null;	
 	}
 	
+	private static MaxLongAccumulator registerMaxLongAccumulator(JavaSparkContext sc) {
+		MaxLongAccumulator acc = new MaxLongAccumulator(Long.MIN_VALUE);
+		sc.sc().register(acc, "max");
+		return acc;
+	}
+	
+
+	private static class MaxLongAccumulator extends AccumulatorV2<Long,Long>
+	{
+		private static final long serialVersionUID = -3739727823287550826L;
+
+		private long _value = Long.MIN_VALUE;
+		
+		public MaxLongAccumulator(long value) {
+			_value = value;
+		}
+
+		@Override
+		public void add(Long arg0) {
+			_value = Math.max(_value, arg0);
+		}
+
+		@Override
+		public AccumulatorV2<Long, Long> copy() {
+			return new MaxLongAccumulator(_value);
+		}
+
+		@Override
+		public boolean isZero() {
+			return _value == Long.MIN_VALUE;
+		}
+
+		@Override
+		public void merge(AccumulatorV2<Long, Long> arg0) {
+			_value = Math.max(_value, arg0.value());
+		}
+
+		@Override
+		public void reset() {
+			_value = Long.MIN_VALUE;
+		}
+
+		@Override
+		public Long value() {
+			return _value;
+		}
+	}
+	
 	/**
 	 * This function pre-aggregates distinct values of recoded columns per partition
 	 * (part of distributed recode map construction, used for recoding, binning and 
@@ -242,9 +289,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 	{
 		private static final long serialVersionUID = -1034187226023517119L;
 
-		private Accumulator<Long> _accMax = null;
+		private MaxLongAccumulator _accMax = null;
 		
-		public TransformEncodeGroupFunction( Accumulator<Long> accMax ) {
+		public TransformEncodeGroupFunction( MaxLongAccumulator accMax ) {
 			_accMax = accMax;
 		}
 		
@@ -275,26 +322,6 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 		}
 	}
 
-	private static class MaxAcc implements AccumulatorParam<Long>, Serializable 
-	{
-		private static final long serialVersionUID = -3739727823287550826L;
-
-		@Override
-		public Long addInPlace(Long arg0, Long arg1) {
-			return Math.max(arg0, arg1);
-		}
-
-		@Override
-		public Long zero(Long arg0) {
-			return arg0;
-		}
-
-		@Override
-		public Long addAccumulator(Long arg0, Long arg1) {
-			return Math.max(arg0, arg1);	
-		}
-	}
-
 	public static class TransformEncodeBuild2Function implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata>
 	{
 		private static final long serialVersionUID = 6336375833412029279L;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 912dbe3..3387770 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -25,9 +25,9 @@ import java.util.Random;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -203,12 +203,12 @@ public class WriteSPInstruction extends SPInstruction
 		else if( oi == OutputInfo.CSVOutputInfo ) 
 		{
 			JavaRDD<String> out = null;
-			Accumulator<Double> aNnz = null;
+			LongAccumulator aNnz = null;
 			
 			if ( isInputMatrixBlock ) {
 				//piggyback nnz computation on actual write
 				if( !mc.nnzKnown() ) {
-					aNnz = sec.getSparkContext().accumulator(0L);
+					aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
 					in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
 				}	
 				
@@ -252,9 +252,9 @@ public class WriteSPInstruction extends SPInstruction
 		}
 		else if( oi == OutputInfo.BinaryBlockOutputInfo ) {
 			//piggyback nnz computation on actual write
-			Accumulator<Double> aNnz = null;
+			LongAccumulator aNnz = null;
 			if( !mc.nnzKnown() ) {
-				aNnz = sec.getSparkContext().accumulator(0L);
+				aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
 				in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
 			}
 			

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
index f76784e..7b6daad 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
@@ -19,18 +19,17 @@
 
 package org.apache.sysml.runtime.instructions.spark.functions;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.Function;
-
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,MatrixBlock> 
 {
 	private static final long serialVersionUID = -8396410450821999936L;
 	
-	private Accumulator<Double> _aNnz = null;
+	private LongAccumulator _aNnz = null;
 	
-	public ComputeBinaryBlockNnzFunction( Accumulator<Double> aNnz )
+	public ComputeBinaryBlockNnzFunction( LongAccumulator aNnz )
 	{
 		_aNnz = aNnz;
 	}
@@ -38,7 +37,7 @@ public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,Matri
 	@Override
 	public MatrixBlock call(MatrixBlock arg0) throws Exception 
 	{
-		_aNnz.add( (double)arg0.getNonZeros() );
+		_aNnz.add( arg0.getNonZeros() );
 		return arg0;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index b5a4b58..1310b80 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -27,7 +27,6 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -166,7 +166,7 @@ public class RDDConverterUtils
 	{
 		//determine unknown dimensions and sparsity if required
 		if( !mc.dimsKnown(true) ) {
-			Accumulator<Double> aNnz = sc.accumulator(0L);
+			LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
 			JavaRDD<String> tmp = input.values()
 					.map(new CSVAnalysisFunction(aNnz, delim));
 			long rlen = tmp.count() - (hasHeader ? 1 : 0);
@@ -230,7 +230,7 @@ public class RDDConverterUtils
 	{
 		//determine unknown dimensions and sparsity if required
 		if( !mc.dimsKnown(true) ) {
-			Accumulator<Double> aNnz = sc.accumulator(0L);
+			LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
 			JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, containsID, isVector));
 			long rlen = tmp.count();
 			long clen = !isVector ? df.columns().length - (containsID?1:0) : 
@@ -531,10 +531,10 @@ public class RDDConverterUtils
 	{
 		private static final long serialVersionUID = 2310303223289674477L;
 
-		private Accumulator<Double> _aNnz = null;
+		private LongAccumulator _aNnz = null;
 		private String _delim = null;
 		
-		public CSVAnalysisFunction( Accumulator<Double> aNnz, String delim )
+		public CSVAnalysisFunction( LongAccumulator aNnz, String delim )
 		{
 			_aNnz = aNnz;
 			_delim = delim;
@@ -552,7 +552,7 @@ public class RDDConverterUtils
 			int lnnz = IOUtilFunctions.countNnz(cols);
 			
 			//update counters
-			_aNnz.add( (double)lnnz );
+			_aNnz.add( lnnz );
 			
 			return line;
 		}
@@ -922,11 +922,11 @@ public class RDDConverterUtils
 	{	
 		private static final long serialVersionUID = 5705371332119770215L;
 		
-		private Accumulator<Double> _aNnz = null;
+		private LongAccumulator _aNnz = null;
 		private boolean _containsID;
 		private boolean _isVector;
 		
-		public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean containsID, boolean isVector) {
+		public DataFrameAnalysisFunction( LongAccumulator aNnz, boolean containsID, boolean isVector) {
 			_aNnz = aNnz;
 			_containsID = containsID;
 			_isVector = isVector;
@@ -940,7 +940,7 @@ public class RDDConverterUtils
 			int lnnz = countNnz(vect, _isVector, off);
 			
 			//update counters
-			_aNnz.add( (double)lnnz );
+			_aNnz.add( lnnz );
 			return arg0;
 		}
 	}