You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/22 05:24:33 UTC

incubator-systemml git commit: [SYSTEMML-925] Performance shuffle-based frame converters/operations

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 61a6dcb08 -> 4bc6601d6


[SYSTEMML-925] Performance shuffle-based frame converters/operations

This patch includes various performance improvements for wide-matrix to
frame casting, some of which generalize to all shuffle-based
converters/operations. In detail, this includes:

(1) Improved mergeByKey primitive: Similar to matrices, we now use
combineByKey for in-place block merges. This also includes a clean
separation of frame aggregation utils.

(2) Schema handling in matrix-frame conversion: The multi-block
matrix-frame conversion previously created frame blocks with a string
schema, which was very memory-inefficient and led to huge shuffles and
spilling. We now create proper double frame blocks.

(3) Multi-block matrix-frame conversion: The existing matrix-frame
conversion created unnecessarily large temporary objects. We have now
reduced the size of these intermediates (however, the number of copied
cells is unchanged and leaves further potential for improvement).

(4) Frame block merge: The previous frame block merge always took a slow
path through string conversions. If the column schemas match, we now use
a new copy-non-zeros primitive directly over the native arrays.

Overall, these improvements brought the runtime for a 500k x 10k
scenario from 20+min to 200s. 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4bc6601d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4bc6601d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4bc6601d

Branch: refs/heads/master
Commit: 4bc6601d69d08d01a895d06cbb7fd8cd4e6c6cbf
Parents: 61a6dcb
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Sep 21 22:08:34 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Sep 21 22:24:12 2016 -0700

----------------------------------------------------------------------
 .../spark/FrameAppendRSPInstruction.java        |   6 +-
 .../spark/FrameIndexingSPInstruction.java       |   6 +-
 .../spark/utils/FrameRDDAggregateUtils.java     |  91 ++++++++++++++++
 .../spark/utils/FrameRDDConverterUtils.java     | 107 ++++++-------------
 .../spark/utils/RDDAggregateUtils.java          |  40 -------
 .../sysml/runtime/matrix/data/FrameBlock.java   |  43 +++++++-
 6 files changed, 170 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index ad6ef58..58167b4 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -30,7 +30,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 
@@ -53,9 +53,9 @@ public class FrameAppendRSPInstruction extends AppendRSPInstruction
 		
 		if(_cbind) {
 			JavaPairRDD<Long,FrameBlock> in1Aligned = in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));			
-			in1Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in1Aligned);			
+			in1Aligned = FrameRDDAggregateUtils.mergeByKey(in1Aligned);			
 			JavaPairRDD<Long,FrameBlock> in2Aligned = in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
-			in2Aligned = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in2Aligned);			
+			in2Aligned = FrameRDDAggregateUtils.mergeByKey(in2Aligned);			
 			
 			out = in1Aligned.join(in2Aligned).mapValues(new ReduceSideColumnsFunction(_cbind));
 		} else {	//rbind

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index b4556cf..98b8f70 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -36,7 +36,7 @@ import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.instructions.spark.functions.IsFrameBlockInRange;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
@@ -111,7 +111,7 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 				
 				//aggregation if required 
 				if( _aggType != SparkAggType.NONE )
-					out = RDDAggregateUtils.mergeByFrameKey(out);
+					out = FrameRDDAggregateUtils.mergeByKey(out);
 			}
 			
 			//put output RDD handle into symbol table
@@ -164,7 +164,7 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 				in2 = sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() )
 					    .flatMapToPair(new SliceRHSForLeftIndexing(ixrange, mcLeft));
 
-				out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey(in1.union(in2));
+				out = FrameRDDAggregateUtils.mergeByKey(in1.union(in2));
 			}
 			
 			sec.setRDDHandleForVariable(output.getName(), out);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java
new file mode 100644
index 0000000..e57e599
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDAggregateUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark.utils;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+
+
+public class FrameRDDAggregateUtils 
+{
+	/** Merges all frame blocks that share a key into a single frame block per key.
+	 * @param in pair RDD of keyed frame blocks; blocks with equal keys are merged
+	 * @return pair RDD produced by combineByKey, holding the merged blocks
+	 */
+	public static JavaPairRDD<Long, FrameBlock> mergeByKey( JavaPairRDD<Long, FrameBlock> in )
+	{
+		//use combine by key to avoid unnecessary deep block copies, i.e.
+		//create combiner block once and merge remaining blocks in-place.
+ 		return in.combineByKey( 
+ 				new CreateBlockCombinerFunction(), 
+			    new MergeBlocksFunction(false), 
+			    new MergeBlocksFunction(false) );
+	}
+	
+	/**
+	 * Combiner-creation function for combineByKey: returns a copy of the given block.
+	 */
+	private static class CreateBlockCombinerFunction implements Function<FrameBlock, FrameBlock> 
+	{
+		private static final long serialVersionUID = -4445167244905540494L;
+
+		@Override
+		public FrameBlock call(FrameBlock arg0) 
+			throws Exception 
+		{
+			//create deep copy of given block
+			return new FrameBlock(arg0);
+		}	
+	}
+	
+	/**
+	 * Merges b2 into b1 (or into a copy of b1 if deep is true) after a dimension check.
+	 */
+	private static class MergeBlocksFunction implements Function2<FrameBlock, FrameBlock, FrameBlock> 
+	{		
+		private static final long serialVersionUID = 7807210434431147007L;
+		
+		private boolean _deep = false;
+		
+		public MergeBlocksFunction(boolean deep) {
+			_deep = deep;
+		}
+
+		@Override
+		public FrameBlock call(FrameBlock b1, FrameBlock b2) 
+			throws Exception 
+		{
+			// sanity check input dimensions
+			if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
+				throw new DMLRuntimeException("Mismatched frame block sizes for: "
+						+ b1.getNumRows() + " " + b1.getNumColumns() + " "
+						+ b2.getNumRows() + " " + b2.getNumColumns());
+			}
+
+			// execute merge: in-place on b1 unless a deep copy was requested
+			FrameBlock ret = _deep ? new FrameBlock(b1) : b1;
+			ret.merge(b2);
+			return ret;
+		}
+	}	
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 351d559..2bcfbc4 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -171,10 +171,8 @@ public class FrameRDDConverterUtils
 		}
 		
 		//convert binary block to csv (from blocks/rows)
-		JavaRDD<String> out = input
-				.flatMap(new BinaryBlockToCSVFunction(props));
-	
-		return out;
+		return input.flatMap(
+				new BinaryBlockToCSVFunction(props));
 	}
 	
 	
@@ -226,10 +224,7 @@ public class FrameRDDConverterUtils
 				.mapPartitionsToPair(new TextToBinaryBlockFunction( mc, schema ));
 		
 		//aggregate partial matrix blocks
-		JavaPairRDD<Long,FrameBlock> out = 
-				(JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( output ); 
-
-		return out;
+		return FrameRDDAggregateUtils.mergeByKey( output ); 
 	}
 
 	/**
@@ -287,7 +282,7 @@ public class FrameRDDConverterUtils
 			out = input.flatMapToPair(new MatrixToBinaryBlockFunction(mcIn));
 			
 			//aggregate partial frame blocks
-			out = (JavaPairRDD<Long, FrameBlock>) RDDAggregateUtils.mergeByFrameKey( out );
+			out = FrameRDDAggregateUtils.mergeByKey( out );
 		}
 		else {
 			//convert single matrix binary block to frame binary block (w/o shuffle)
@@ -315,9 +310,7 @@ public class FrameRDDConverterUtils
 				.flatMapToPair(new BinaryBlockToMatrixBlockFunction(mcIn, mcOut));
 	
 		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out ); 	
-		
-		return out;
+		return RDDAggregateUtils.mergeByKey( out ); 	
 	}
 	
 	//=====================================
@@ -357,10 +350,8 @@ public class FrameRDDConverterUtils
 		convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
 				
 		//convert rdd to binary block rdd
-		JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair(
+		return prepinput.mapPartitionsToPair(
 				new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID));
-		
-		return out;
 	}
 
 	/**
@@ -1011,85 +1002,57 @@ public class FrameRDDConverterUtils
 			throws Exception 
 		{
 			ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
-
-			MatrixIndexes matrixIndexes = arg0._1();
-			MatrixBlock matrixBlock = arg0._2();
-			
-			//Frame Index (Row id, with base 1)
-			Long rowix = new Long((matrixIndexes.getRowIndex()-1)*_brlen+1);
-
-			//Global index within frame blocks
-			long colixLow = (int)((matrixIndexes.getColumnIndex()-1)*_bclen+1);
-			long colixHigh = Math.min(colixLow+matrixBlock.getNumColumns()-1, _clen);
+			MatrixIndexes ix = arg0._1();
+			MatrixBlock mb = arg0._2();
+			MatrixBlock mbreuse = new MatrixBlock();
 			
-			//Index within a local matrix block
-			int iColLowMat = UtilFunctions.computeCellInBlock(colixLow, _bclen);
-			int iColHighMat = UtilFunctions.computeCellInBlock(colixHigh, _bclen);
+			//frame index (row id, 1-based)
+			Long rowix = new Long((ix.getRowIndex()-1)*_brlen+1);
 
-			FrameBlock tmpBlock = DataConverter.convertToFrameBlock(matrixBlock);
+			//global index within frame block (0-based)
+			long cl = (int)((ix.getColumnIndex()-1)*_bclen);
+			long cu = Math.min(cl+mb.getNumColumns()-1, _clen);
 
-			int iRowLow = 0;	//Index within a local frame block
-			while(iRowLow < matrixBlock.getNumRows()) {
-				int iRowHigh = Math.min(iRowLow+_maxRowsPerBlock-1,  matrixBlock.getNumRows()-1);
-				
-				FrameBlock tmpBlock2 = null;
-				//All rows from matrix block can fit into single frame block, no need for slicing 
-				if(iRowLow == 0 && iRowHigh == matrixBlock.getNumRows()-1)
-					tmpBlock2 = tmpBlock;
-				else
-					tmpBlock2 = tmpBlock.sliceOperations(iRowLow, iRowHigh, iColLowMat, iColHighMat, tmpBlock2);
-				
-				//If Matrix has only one column block, then simply assigns converted block to frame block
-				if(colixLow == 0 && colixHigh == matrixBlock.getNumColumns()-1)
-					ret.add(new Tuple2<Long, FrameBlock>(rowix+iRowLow, tmpBlock2));
-				else
-				{
-					FrameBlock frameBlock = new FrameBlock((int)_clen, ValueType.STRING);
-					frameBlock.ensureAllocatedColumns(iRowHigh-iRowLow+1);
-					
-					frameBlock.copy(0, iRowHigh-iRowLow, (int)colixLow-1, (int)colixHigh-1, tmpBlock2);
-					ret.add(new Tuple2<Long, FrameBlock>(rowix+iRowLow, frameBlock));
-				}
-				iRowLow = iRowHigh+1;
+			//prepare output frame blocks 
+			for( int i=0; i<mb.getNumRows(); i+=_maxRowsPerBlock ) {
+				int ru = Math.min(i+_maxRowsPerBlock, mb.getNumRows())-1;
+				FrameBlock fb = new FrameBlock((int)_clen, ValueType.DOUBLE);
+				fb.ensureAllocatedColumns(ru-i+1);
+				fb.copy(0, fb.getNumRows()-1, (int)cl, (int)cu, DataConverter.convertToFrameBlock(
+					mb.sliceOperations(i, ru, 0, mb.getNumColumns()-1, mbreuse)));
+				ret.add(new Tuple2<Long, FrameBlock>(rowix+i,fb));				
 			}
+
 			return ret;
 		}
 	}
 
-	/*
-	 * This function supports if matrix has only one column block.
+	/**
+	 * 
 	 */
 	private static class MatrixToBinaryBlockOneColumnBlockFunction implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock>
 	{
 		private static final long serialVersionUID = 3716019666116660815L;
 
 		private int _brlen = -1;
-		private int _bclen = -1;
-		private long _clen = -1;
-	
-		
-		public MatrixToBinaryBlockOneColumnBlockFunction(MatrixCharacteristics mc)
+			
+		public MatrixToBinaryBlockOneColumnBlockFunction(MatrixCharacteristics mc) 
+			throws DMLRuntimeException
 		{
+			//sanity check function constraints
+			if(mc.getCols() > mc.getColsPerBlock())
+				throw new DMLRuntimeException("This function supports only matrices with a single column block.");
+
 			_brlen = mc.getRowsPerBlock();
-			_bclen = mc.getColsPerBlock();
-			_clen = mc.getCols();
 		}
 
 		@Override
 		public Tuple2<Long, FrameBlock> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) 
 			throws Exception 
 		{
-			if(_clen > _bclen)
-				throw new DMLRuntimeException("The input matrix has more than one column block, this function supports only one column block.");
-
-			MatrixIndexes matrixIndexes = arg0._1();
-			MatrixBlock matrixBlock = arg0._2();
-			
-			//Frame Index (Row id, with base 1)
-			Long rowix = new Long((matrixIndexes.getRowIndex()-1)*_brlen+1);
-
-			FrameBlock frameBlock = DataConverter.convertToFrameBlock(matrixBlock);
-			return new Tuple2<Long, FrameBlock>(rowix, frameBlock);
+			return new Tuple2<Long, FrameBlock>(
+				(arg0._1().getRowIndex()-1)*_brlen+1, 
+				DataConverter.convertToFrameBlock(arg0._2()) );
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 93bb1d0..dcdbd36 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -28,7 +28,6 @@ import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.spark.data.CorrMatrixBlock;
 import org.apache.sysml.runtime.instructions.spark.data.RowMatrixBlock;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
@@ -730,43 +729,4 @@ public class RDDAggregateUtils
 			return v1 + v2;
 		}	
 	}
-	
-	/**
-	 * @param: in
-	 * @return: 
-	 */
-	public static JavaPairRDD<Long, FrameBlock> mergeByFrameKey( JavaPairRDD<Long, FrameBlock> in )
-	{
-		return in.reduceByKey(
-				new MergeFrameBlocksFunction());
-	}
-	
-	/**
-	 * 
-	 */
-	private static class MergeFrameBlocksFunction implements Function2<FrameBlock, FrameBlock, FrameBlock> 
-	{		
-		private static final long serialVersionUID = -8881019027250258850L;
-
-		@Override
-		public FrameBlock call(FrameBlock b1, FrameBlock b2) 
-			throws Exception 
-		{
-			// sanity check input dimensions
-			if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
-				throw new DMLRuntimeException("Mismatched frame block sizes for: "
-						+ b1.getNumRows() + " " + b1.getNumColumns() + " "
-						+ b2.getNumRows() + " " + b2.getNumColumns());
-			}
-
-			// execute merge (never pass by reference)
-			FrameBlock ret = new FrameBlock(b1);
-			ret.merge(b2);
-			
-			return ret;
-		}
-
-	}
-	
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4bc6601d/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index cbd231e..005b254 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -960,11 +960,19 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			}
 		
 		//core frame block merge through cell copy
-		for( int i=0; i<that.getNumRows(); i++ ) {
-			for( int j=0; j<getNumColumns(); j++ ) {
-				Object obj = UtilFunctions.objectToObject(getSchema().get(j), that.get(i,j), true);
-				if (obj != null) 			// Do not update with "null" data
-					set(i, j,obj);
+		//with column-wide access pattern
+		for( int j=0; j<getNumColumns(); j++ ) {
+			//special case: copy non-zeros of column 
+			if( _schema.get(j).equals(that._schema.get(j)) )
+				_coldata.get(j).setNz(0, _numRows-1, that._coldata.get(j));
+			//general case w/ schema transformation
+			else {
+				for( int i=0; i<_numRows; i++ ) {
+					Object obj = UtilFunctions.objectToObject(
+							getSchema().get(j), that.get(i,j), true);
+					if (obj != null) //merge non-zeros
+						set(i, j,obj);
+				}
 			}
 		}
 	}
@@ -1122,6 +1130,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public abstract void set(int index, T value);
 		public abstract void set(int rl, int ru, Array value);
 		public abstract void set(int rl, int ru, Array value, int rlSrc);
+		public abstract void setNz(int rl, int ru, Array value);
 		public abstract void append(String value);
 		public abstract void append(T value);
 		public abstract Array clone();
@@ -1150,6 +1159,12 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value, int rlSrc) {
 			System.arraycopy(((StringArray)value)._data, rlSrc, _data, rl, ru-rl+1);
 		}
+		public void setNz(int rl, int ru, Array value) { //copy non-null cells of value into rows rl..ru (inclusive)
+			String[] data2 = ((StringArray)value)._data; //cast fails if value is not a StringArray
+			for( int i=rl; i<ru+1; i++ ) //source and target use the same row index
+				if( data2[i]!=null ) //null source cells leave the target cell unchanged
+					_data[i] = data2[i];
+		}
 		public void append(String value) {
 			if( _data.length <= _size )
 				_data = Arrays.copyOf(_data, newSize());
@@ -1196,6 +1211,12 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value, int rlSrc) {
 			System.arraycopy(((BooleanArray)value)._data, rlSrc, _data, rl, ru-rl+1);
 		}
+		public void setNz(int rl, int ru, Array value) { //copy true cells of value into rows rl..ru (inclusive)
+			boolean[] data2 = ((BooleanArray)value)._data; //cast fails if value is not a BooleanArray
+			for( int i=rl; i<ru+1; i++ ) //source and target use the same row index
+				if( data2[i] ) //false source cells leave the target cell unchanged
+					_data[i] = data2[i];
+		}
 		public void append(String value) {
 			append(Boolean.parseBoolean(value));
 		}
@@ -1243,6 +1264,12 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value, int rlSrc) {
 			System.arraycopy(((LongArray)value)._data, rlSrc, _data, rl, ru-rl+1);
 		}
+		public void setNz(int rl, int ru, Array value) { //copy non-zero cells of value into rows rl..ru (inclusive)
+			long[] data2 = ((LongArray)value)._data; //cast fails if value is not a LongArray
+			for( int i=rl; i<ru+1; i++ ) //source and target use the same row index
+				if( data2[i]!=0 ) //zero source cells leave the target cell unchanged
+					_data[i] = data2[i];
+		}
 		public void append(String value) {
 			append((value!=null)?Long.parseLong(value):null);
 		}
@@ -1290,6 +1317,12 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		public void set(int rl, int ru, Array value, int rlSrc) {
 			System.arraycopy(((DoubleArray)value)._data, rlSrc, _data, rl, ru-rl+1);
 		}
+		public void setNz(int rl, int ru, Array value) { //copy non-zero cells of value into rows rl..ru (inclusive)
+			double[] data2 = ((DoubleArray)value)._data; //cast fails if value is not a DoubleArray
+			for( int i=rl; i<ru+1; i++ ) //source and target use the same row index
+				if( data2[i]!=0 ) //zero cells are skipped; NaN is copied because NaN != 0 is true
+					_data[i] = data2[i];
+		}
 		public void append(String value) {
 			append((value!=null)?Double.parseDouble(value):null);
 		}