You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/09 23:49:38 UTC

[2/2] incubator-systemml git commit: [SYSTEMML-376] Fix robustness spark rand/seq ops (oom on many blocks)

[SYSTEMML-376] Fix robustness spark rand/seq ops (oom on many blocks)

Similar to our MapReduce datagen job, we now materialize the seed/offset
inputs on HDFS if the number of blocks reaches a threshold of 1M (1024*1024)
blocks. This significantly reduces the memory requirements because otherwise
the driver has to hold the entire seed/offset collection in memory.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/91a1863b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/91a1863b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/91a1863b

Branch: refs/heads/master
Commit: 91a1863b9a9e5b6c6c58e739e178572313e21b17
Parents: 3658a80
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Jan 8 17:19:51 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Jan 8 17:19:51 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/lops/DataGen.java     |   2 +-
 .../instructions/spark/RandSPInstruction.java   | 358 +++++++++++++------
 .../apache/sysml/runtime/matrix/DataGenMR.java  |   5 +-
 .../runtime/matrix/data/LibMatrixDatagen.java   |  20 +-
 4 files changed, 256 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/lops/DataGen.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/DataGen.java b/src/main/java/org/apache/sysml/lops/DataGen.java
index c1a70b4..ec8fb9c 100644
--- a/src/main/java/org/apache/sysml/lops/DataGen.java
+++ b/src/main/java/org/apache/sysml/lops/DataGen.java
@@ -208,7 +208,7 @@ public class DataGen extends Lop
 		sb.append(iLop.prepScalarLabel());
 		sb.append(OPERAND_DELIMITOR);
 
-		if ( getExecType() == ExecType.MR ) {
+		if ( getExecType() == ExecType.MR || getExecType() == ExecType.SPARK ) {
 			sb.append(baseDir);
 			sb.append(OPERAND_DELIMITOR);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index b1eca1e..dfb53a1 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -19,12 +19,17 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Random;
 
 import org.apache.commons.math3.distribution.PoissonDistribution;
 import org.apache.commons.math3.random.Well1024a;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -36,6 +41,7 @@ import scala.Tuple2;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.DataGenOp;
 import org.apache.sysml.hops.Hop.DataGenMethod;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -48,6 +54,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.LibMatrixDatagen;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -60,9 +67,8 @@ import org.apache.sysml.utils.Statistics;
 
 public class RandSPInstruction extends UnarySPInstruction
 {
-	
 	//internal configuration
-	private static final long SEED_PARTITION_SIZE = 1024 * 1024;
+	private static final long INMEMORY_NUMBLOCKS_THRESHOLD = 1024 * 1024;
 	
 	private DataGenMethod method = DataGenMethod.INVALID;
 	
@@ -76,6 +82,7 @@ public class RandSPInstruction extends UnarySPInstruction
 	private String pdf;
 	private String pdfParams;
 	private long seed=0;
+	private String dir;
 	private double seq_from;
 	private double seq_to; 
 	private double seq_incr;
@@ -84,7 +91,7 @@ public class RandSPInstruction extends UnarySPInstruction
 	private boolean replace;
 
 	public RandSPInstruction (Operator op, DataGenMethod mthd, CPOperand in, CPOperand out, long rows, long cols, 
-			int rpb, int cpb, double minValue, double maxValue, double sparsity, long seed,
+			int rpb, int cpb, double minValue, double maxValue, double sparsity, long seed, String dir,
 			String probabilityDensityFunction, String pdfParams, String opcode, String istr) 
 	{
 		super(op, in, out, opcode, istr);
@@ -98,6 +105,7 @@ public class RandSPInstruction extends UnarySPInstruction
 		this.maxValue = maxValue;
 		this.sparsity = sparsity;
 		this.seed = seed;
+		this.dir = dir;
 		this.pdf = probabilityDensityFunction;
 		this.pdfParams = pdfParams;
 
@@ -205,7 +213,7 @@ public class RandSPInstruction extends UnarySPInstruction
 		DataGenMethod method = DataGenMethod.INVALID;
 		if ( opcode.equalsIgnoreCase(DataGen.RAND_OPCODE) ) {
 			method = DataGenMethod.RAND;
-			InstructionUtils.checkNumFields ( str, 11 );
+			InstructionUtils.checkNumFields ( str, 12 );
 		}
 		else if ( opcode.equalsIgnoreCase(DataGen.SEQ_OPCODE) ) {
 			method = DataGenMethod.SEQ;
@@ -249,10 +257,11 @@ public class RandSPInstruction extends UnarySPInstruction
 				seed = Long.parseLong(s[8]);
 			}
 				
-	        String pdf = s[9];
-			String pdfParams = s[10];
+			String dir = s[9];
+	        String pdf = s[10];
+			String pdfParams = s[11];
 			
-			return new RandSPInstruction(op, method, null, out, rows, cols, rpb, cpb, minValue, maxValue, sparsity, seed, pdf, pdfParams, opcode, str);
+			return new RandSPInstruction(op, method, null, out, rows, cols, rpb, cpb, minValue, maxValue, sparsity, seed, dir, pdf, pdfParams, opcode, str);
 		}
 		else if ( method == DataGenMethod.SEQ) {
 			// Example Instruction: CP:seq:11:1:1000:1000:1:0:-0.1:scratch_space/_p7932_192.168.1.120//_t0/:mVar1
@@ -309,136 +318,220 @@ public class RandSPInstruction extends UnarySPInstruction
 		SparkExecutionContext sec = (SparkExecutionContext)ec;
 		
 		//process specific datagen operator
-		if ( this.method == DataGenMethod.RAND ) 
+		switch( method ) {
+			case RAND: generateRandData(sec); break;
+			case SEQ: generateSequence(sec); break;
+			case SAMPLE: generateSample(sec); break;				
+			default: 
+				throw new DMLRuntimeException("Invalid datagen method: "+method); 
+		}
+	}
+	
+	/**
+	 * 
+	 * @param sec
+	 * @throws DMLRuntimeException
+	 */
+	private void generateRandData(SparkExecutionContext sec) 
+		throws DMLRuntimeException
+	{
+		//step 1: generate pseudo-random seed (because not specified) 
+		long lSeed = seed; //seed per invocation
+		if( lSeed == DataGenOp.UNSPECIFIED_SEED ) 
+			lSeed = DataGenOp.generateRandomSeed();
+		
+		if( LOG.isTraceEnabled() )
+			LOG.trace("Process RandSPInstruction rand with seed = "+lSeed+".");
+
+		//step 2: potential in-memory rand operations if applicable
+		if( isMemAvail(rows, cols, sparsity, minValue, maxValue) 
+			&&  DMLScript.rtplatform != RUNTIME_PLATFORM.SPARK )
 		{
-			// The implementation is in same spirit as MapReduce
-			// We generate seeds similar to org.apache.sysml.runtime.matrix.DataGenMR
-			// and then generate blocks similar to org.apache.sysml.runtime.matrix.mapred.DataGenMapper
-			
-			//generate pseudo-random seed (because not specified) 
-			long lSeed = seed; //seed per invocation
-			if( lSeed == DataGenOp.UNSPECIFIED_SEED ) 
-				lSeed = DataGenOp.generateRandomSeed();
+			RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator(
+					pdf, (int)rows, (int)cols, rowsInBlock, colsInBlock, 
+					sparsity, minValue, maxValue, pdfParams);
+			MatrixBlock mb = MatrixBlock.randOperations(rgen, lSeed);
 			
-			if( LOG.isTraceEnabled() )
-				LOG.trace("Process RandSPInstruction rand with seed = "+lSeed+".");
-
-			//Check if there is sufficient memory for matrix to be created and execution platform is not forced Spark
-			if( isMemAvail(rows, cols, sparsity, minValue, maxValue) &&  DMLScript.rtplatform != RUNTIME_PLATFORM.SPARK)
-			{
-				RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator(
-						pdf, 
-						(int)rows, (int)cols, 
-						rowsInBlock, colsInBlock, 
-						sparsity, minValue, maxValue, 
-						pdfParams);
-				MatrixBlock mb = MatrixBlock.randOperations(rgen, lSeed);
-				
-				sec.setMatrixOutput(output.getName(), mb);
-				Statistics.decrementNoOfExecutedSPInst();
-				return;
+			sec.setMatrixOutput(output.getName(), mb);
+			Statistics.decrementNoOfExecutedSPInst();
+			return;
+		}
+		
+		//step 3: seed generation 
+		JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null;
+		Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
+		long[] nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, rowsInBlock, colsInBlock, sparsity);
+		double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
+		long numBlocks = nnz.length;
+		long numColBlocks = (long)Math.ceil((double)cols/(double)colsInBlock);
+					
+		//a) in-memory seed rdd construction 
+		if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
+		{
+			ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>> seeds = 
+					new ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>>();
+			double partSize = 0;
+			for( long i=0; i<numBlocks; i++ ) {
+				long r = 1 + i/numColBlocks;
+				long c = 1 + i%numColBlocks;
+				MatrixIndexes indx = new MatrixIndexes(r, c);
+				Long seedForBlock = bigrand.nextLong();
+				seeds.add(new Tuple2<MatrixIndexes, Tuple2<Long, Long>>(indx, 
+						new Tuple2<Long, Long>(seedForBlock, nnz[(int)i])));
+				partSize += nnz[(int)i] * 8 + 16;
 			}
 			
-			// seed generation (partitioned to bound memory requirements)
-			JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null;
-			Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
-			long[] nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, rowsInBlock, colsInBlock, sparsity);
-			double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
-			long numBlocks = nnz.length;
-			long numColBlocks = (long)Math.ceil((double)cols/(double)colsInBlock);
+			//for load balancing: degree of parallelism such that ~128MB per partition
+			int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
+				
+			//create seeds rdd 
+			seedsRDD = JavaPairRDD.fromJavaRDD(sec.getSparkContext().parallelize(seeds, numPartitions));				
+		}
+		//b) file-based seed rdd construction (for robustness wrt large number of blocks)
+		else
+		{
+			String path = LibMatrixDatagen.generateUniqueSeedPath(dir);
+			double partSize = 0;
 			
-			for( long p = 0; p < numBlocks; p+=SEED_PARTITION_SIZE )
+			try
 			{
-				ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>> seeds = new ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>>();
-				double partitionSize = 0;
-				for( long i=p; i<Math.min(p+SEED_PARTITION_SIZE, numBlocks); i++ ) {
-					long r = 1 + i/numColBlocks;
-					long c = 1 + i%numColBlocks;
-					MatrixIndexes indx = new MatrixIndexes(r, c);
-					Long seedForBlock = bigrand.nextLong();
-					seeds.add(new Tuple2<MatrixIndexes, Tuple2<Long, Long>>(indx, new Tuple2<Long, Long>(seedForBlock, nnz[(int)i])));
-					partitionSize += nnz[(int)i] * 8 + 16;
+				FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
+				FSDataOutputStream fsOut = fs.create(new Path(path));
+				PrintWriter pw = new PrintWriter(fsOut);
+				StringBuilder sb = new StringBuilder();
+				for( long i=0; i<numBlocks; i++ ) {
+					sb.append(1 + i/numColBlocks);
+					sb.append(',');
+					sb.append(1 + i%numColBlocks);
+					sb.append(',');
+					sb.append(bigrand.nextLong());
+					sb.append(',');
+					sb.append(nnz[(int)i]);
+					pw.println(sb.toString());
+					sb.setLength(0);
+					partSize += nnz[(int)i] * 8 + 16;
 				}
-			
-				//for load balancing: degree of parallelism such that ~128MB per partition
-				int numPartitions = (int) Math.max(Math.min(partitionSize / hdfsBlockSize, seeds.size()), 1);
-				
-				//combine seeds partitions to seed rdd
-				JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD2 = 
-						JavaPairRDD.fromJavaRDD(sec.getSparkContext().parallelize(seeds, numPartitions));
-				seedsRDD = ( seedsRDD != null )? seedsRDD.union(seedsRDD2) : seedsRDD2;	
+				pw.close();
+				fsOut.close();
+			}
+			catch( IOException ex ) {
+				throw new DMLRuntimeException(ex);
 			}
 			
-			//execute rand instruction over seed input
-			JavaPairRDD<MatrixIndexes, MatrixBlock> out = seedsRDD.mapToPair(new GenerateRandomBlock(rows, cols, rowsInBlock, colsInBlock, sparsity, minValue, maxValue, pdf, pdfParams)); 
+			//for load balancing: degree of parallelism such that ~128MB per partition
+			int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
 			
-			//output handling
-			MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
-			if(!mcOut.dimsKnown(true)) {
-				//note: we cannot compute the nnz from sparsity because this would not reflect the 
-				//actual number of non-zeros, except for extreme values of sparsity equals 0 or 1.
-				long lnnz = (sparsity==0 || sparsity==1) ? (long) (sparsity*rows*cols) : -1;
-				mcOut.set(rows, cols, rowsInBlock, colsInBlock, lnnz);
-			}
-			sec.setRDDHandleForVariable(output.getName(), out);
+			//create seeds rdd 
+			seedsRDD = sec.getSparkContext()
+					.textFile(path, numPartitions)
+					.mapToPair(new ExtractSeedTuple());
+		}
+		
+		//step 4: execute rand instruction over seed input
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = seedsRDD
+				.mapToPair(new GenerateRandomBlock(rows, cols, rowsInBlock, colsInBlock, 
+						sparsity, minValue, maxValue, pdf, pdfParams)); 
+		
+		//step 5: output handling
+		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+		if(!mcOut.dimsKnown(true)) {
+			//note: we cannot compute the nnz from sparsity because this would not reflect the 
+			//actual number of non-zeros, except for extreme values of sparsity equals 0 or 1.
+			long lnnz = (sparsity==0 || sparsity==1) ? (long) (sparsity*rows*cols) : -1;
+			mcOut.set(rows, cols, rowsInBlock, colsInBlock, lnnz);
+		}
+		sec.setRDDHandleForVariable(output.getName(), out);
+	}
+	
+	/**
+	 * 
+	 * @param sec
+	 * @throws DMLRuntimeException
+	 */
+	private void generateSequence(SparkExecutionContext sec) 
+		throws DMLRuntimeException
+	{
+		//sanity check valid increment
+		if(seq_incr == 0) {
+			throw new DMLRuntimeException("ERROR: While performing seq(" + seq_from + "," + seq_to + "," + seq_incr + ")");
 		}
-		else if ( this.method == DataGenMethod.SEQ ) 
+		
+		//handle default 1 to -1 for special case of from>to
+		seq_incr = LibMatrixDatagen.updateSeqIncr(seq_from, seq_to, seq_incr);
+		
+		if( LOG.isTraceEnabled() )
+			LOG.trace("Process RandSPInstruction seq with seqFrom="+seq_from+", seqTo="+seq_to+", seqIncr"+seq_incr);
+		
+		//step 1: offset generation 
+		JavaRDD<Double> offsetsRDD = null;
+		double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
+		long nnz = (long) Math.abs(Math.round((seq_to - seq_from)/seq_incr)) + 1;
+		long numBlocks = (long)Math.ceil(((double)nnz)/rowsInBlock);
+	
+		//a) in-memory offset rdd construction 
+		if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
 		{
-			//sanity check valid increment
-			if(seq_incr == 0) {
-				throw new DMLRuntimeException("ERROR: While performing seq(" + seq_from + "," + seq_to + "," + seq_incr + ")");
+			ArrayList<Double> offsets = new ArrayList<Double>();
+			double partSize = 0;
+			for( long i=0; i<numBlocks; i++ ) {
+				double off = seq_from + seq_incr*i*rowsInBlock;
+				offsets.add(off);
+				partSize += rowsInBlock * 8 +16;
 			}
+				
+			//for load balancing: degree of parallelism such that ~128MB per partition
+			int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
+				
+			//create offset rdd
+			offsetsRDD = sec.getSparkContext().parallelize(offsets, numPartitions);
+		}
+		//b) file-based offset rdd construction (for robustness wrt large number of blocks)
+		else
+		{
+			String path = LibMatrixDatagen.generateUniqueSeedPath(dir);
+			double partSize = 0;
 			
-			//handle default 1 to -1 for special case of from>to
-			seq_incr = LibMatrixDatagen.updateSeqIncr(seq_from, seq_to, seq_incr);
-			
-			if( LOG.isTraceEnabled() )
-				LOG.trace("Process RandSPInstruction seq with seqFrom="+seq_from+", seqTo="+seq_to+", seqIncr"+seq_incr);
-			
-			// offset generation (partitioned to bound memory requirements)
-			JavaRDD<Double> offsetsRDD = null;
-			double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
-			long nnz = (long) Math.abs(Math.round((seq_to - seq_from)/seq_incr)) + 1;
-			long numBlocks = (long)Math.ceil(((double)nnz)/rowsInBlock);
-		
-			for( long p = 0; p < numBlocks; p+=SEED_PARTITION_SIZE )
+			try
 			{
-				ArrayList<Double> offsets = new ArrayList<Double>();
-				double partitionSize = 0;
-				for( long i=p; i<Math.min(p+SEED_PARTITION_SIZE, numBlocks); i++ ) {
+				FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
+				FSDataOutputStream fsOut = fs.create(new Path(path));
+				PrintWriter pw = new PrintWriter(fsOut);
+				for( long i=0; i<numBlocks; i++ ) {
 					double off = seq_from + seq_incr*i*rowsInBlock;
-					offsets.add(off);
-					partitionSize += rowsInBlock * 8 +16;
+					pw.println(off);
+					partSize += rowsInBlock * 8 +16;
 				}
-				
-				//for load balancing: degree of parallelism such that ~128MB per partition
-				int numPartitions = (int) Math.max(Math.min(partitionSize / hdfsBlockSize, offsets.size()), 1);
-				
-				//combine seeds partitions to seed rdd
-				JavaRDD<Double> offsetsRDD2 = sec.getSparkContext().parallelize(offsets, numPartitions);
-				
-				offsetsRDD = ( offsetsRDD != null )? offsetsRDD.union(offsetsRDD2) : offsetsRDD2;		
+				pw.close();
+				fsOut.close();
 			}
-			
-			//sanity check number of non-zeros
-			if(nnz != rows && rows != -1) {
-				throw new DMLRuntimeException("Incorrect number of non-zeros: " + nnz + " != " + rows);
+			catch( IOException ex ) {
+				throw new DMLRuntimeException(ex);
 			}
 			
-			//execute seq instruction over offset input
-			JavaPairRDD<MatrixIndexes, MatrixBlock> out = offsetsRDD.mapToPair(new GenerateSequenceBlock(rowsInBlock, seq_from, seq_to, seq_incr));
-
-			//output handling
-			MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
-			if(!mcOut.dimsKnown()) {
-				mcOut.set(nnz, 1, rowsInBlock, colsInBlock, nnz);
-			}
-			sec.setRDDHandleForVariable(output.getName(), out);
+			//for load balancing: degree of parallelism such that ~128MB per partition
+			int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
+			
+			//create offset rdd
+			offsetsRDD = sec.getSparkContext()
+					.textFile(path, numPartitions)
+					.map(new ExtractOffsetTuple());
 		}
-		else if (this.method == DataGenMethod.SAMPLE)
-		{
-			generateSample(sec);
+		
+		//sanity check number of non-zeros
+		if(nnz != rows && rows != -1) {
+			throw new DMLRuntimeException("Incorrect number of non-zeros: " + nnz + " != " + rows);
 		}
+		
+		//step 2: execute seq instruction over offset input
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = offsetsRDD
+				.mapToPair(new GenerateSequenceBlock(rowsInBlock, seq_from, seq_to, seq_incr));
+
+		//step 3: output handling
+		MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+		if(!mcOut.dimsKnown()) {
+			mcOut.set(nnz, 1, rowsInBlock, colsInBlock, nnz);
+		}
+		sec.setRDDHandleForVariable(output.getName(), out);
 	}
 	
 	/**
@@ -447,7 +540,8 @@ public class RandSPInstruction extends UnarySPInstruction
 	 * @param sec
 	 * @throws DMLRuntimeException
 	 */
-	private void generateSample(SparkExecutionContext sec) throws DMLRuntimeException 
+	private void generateSample(SparkExecutionContext sec) 
+		throws DMLRuntimeException 
 	{
 		if ( maxValue < rows && !replace )
 			throw new DMLRuntimeException("Sample (size=" + rows + ") larger than population (size=" + maxValue + ") can only be generated with replacement.");
@@ -648,6 +742,38 @@ public class RandSPInstruction extends UnarySPInstruction
 	/**
 	 * 
 	 */
+	private static class ExtractSeedTuple implements PairFunction<String, MatrixIndexes, Tuple2<Long,Long>> {
+		private static final long serialVersionUID = 3973794676854157101L;
+
+		@Override
+		public Tuple2<MatrixIndexes, Tuple2<Long, Long>> call(String arg)
+				throws Exception 
+		{
+			String[] parts = IOUtilFunctions.split(arg, ",");
+			MatrixIndexes ix = new MatrixIndexes(
+					Long.parseLong(parts[0]), Long.parseLong(parts[1]));
+			Tuple2<Long,Long> seed = new Tuple2<Long,Long>(
+					Long.parseLong(parts[2]), Long.parseLong(parts[3]));
+			
+			return new Tuple2<MatrixIndexes, Tuple2<Long, Long>>(ix,seed);
+		}
+	}
+	
+	/**
+	 * 
+	 */
+	private static class ExtractOffsetTuple implements Function<String, Double> {
+		private static final long serialVersionUID = -3980257526545002552L;
+
+		@Override
+		public Double call(String arg) throws Exception {
+			return Double.parseDouble(arg);
+		}
+	}
+	
+	/**
+	 * 
+	 */
 	private static class GenerateRandomBlock implements PairFunction<Tuple2<MatrixIndexes, Tuple2<Long, Long> >, MatrixIndexes, MatrixBlock> 
 	{
 		private static final long serialVersionUID = 1616346120426470173L;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
index ba933a4..0d34d0b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
@@ -40,7 +40,6 @@ import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.instructions.MRInstructionParser;
 import org.apache.sysml.runtime.instructions.MRJobInstruction;
 import org.apache.sysml.runtime.instructions.mr.DataGenMRInstruction;
@@ -72,8 +71,6 @@ public class DataGenMR
 {
 	private static final Log LOG = LogFactory.getLog(DataGenMR.class.getName());
 	
-	private static IDSequence _seqRandInput = new IDSequence(); 
-	
 	private DataGenMR() {
 		//prevent instantiation via private constructor
 	}
@@ -148,7 +145,7 @@ public class DataGenMR
 			if ( mrtype == MRINSTRUCTION_TYPE.Rand ) 
 			{
 				RandInstruction randInst = (RandInstruction) mrins;
-				inputs[i]=genInst.getBaseDir() + "tmp"+_seqRandInput.getNextID()+".randinput";
+				inputs[i]=LibMatrixDatagen.generateUniqueSeedPath(genInst.getBaseDir());
 				maxsparsity = Math.max(maxsparsity, randInst.getSparsity());
 				
 				FSDataOutputStream fsOut = fs.create(new Path(inputs[i]));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
index 5f2664c..7844ac3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
@@ -30,9 +30,9 @@ import java.util.concurrent.Executors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.math3.random.Well1024a;
-
 import org.apache.sysml.hops.DataGenOp;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.util.NormalPRNGenerator;
 import org.apache.sysml.runtime.util.PRNGenerator;
 import org.apache.sysml.runtime.util.PoissonPRNGenerator;
@@ -43,13 +43,13 @@ import org.apache.sysml.runtime.util.UniformPRNGenerator;
  */
 public class LibMatrixDatagen 
 {
-	
 	protected static final Log LOG = LogFactory.getLog(LibMatrixDatagen.class.getName());
-	
 	public static final String RAND_PDF_UNIFORM = "uniform";
 	public static final String RAND_PDF_NORMAL = "normal";
 	public static final String RAND_PDF_POISSON = "poisson";
 	
+	private static IDSequence _seqRandInput = new IDSequence(); 
+	
 	private LibMatrixDatagen() {
 		//prevent instantiation via private constructor
 	}
@@ -82,6 +82,15 @@ public class LibMatrixDatagen
 	}
 	
 	/**
+	 * 
+	 * @param basedir
+	 * @return
+	 */
+	public static String generateUniqueSeedPath( String basedir ) {
+		return basedir + "tmp" + _seqRandInput.getNextID() + ".randinput";
+	}
+	
+	/**
 	 * A matrix of random numbers is generated by using multiple seeds, one for each 
 	 * block. Such block-level seeds are produced via Well equidistributed long-period linear 
 	 * generator (Well1024a). For a given seed, this function sets up the block-level seeds.
@@ -120,7 +129,6 @@ public class LibMatrixDatagen
 	 */
 	public static long[] computeNNZperBlock(long nrow, long ncol, int brlen, int bclen, double sparsity) throws DMLRuntimeException {
 		int numBlocks = (int) (Math.ceil((double)nrow/brlen) * Math.ceil((double)ncol/bclen));
-		//System.out.println("nrow=" + nrow + ", brlen=" + brlen + ", ncol="+ncol+", bclen=" + bclen + "::: " + Math.ceil(nrow/brlen));
 		
 		// CURRENT: 
 		// 		Total #of NNZ is set to the expected value (nrow*ncol*sparsity).
@@ -128,7 +136,6 @@ public class LibMatrixDatagen
 		//		Instead of using the expected value, one should actually 
 		// 		treat NNZ as a random variable and accordingly generate a random value.
 		long nnz = (long) Math.ceil (nrow * (ncol*sparsity));
-		//System.out.println("Number of blocks = " + numBlocks + "; NNZ = " + nnz);
 
 		if ( numBlocks > Integer.MAX_VALUE ) {
 			throw new DMLRuntimeException("A random matrix of size [" + nrow + "," + ncol + "] can not be created. Number of blocks (" +  numBlocks + ") exceeds the maximum integer size. Try to increase the block size.");
@@ -136,7 +143,6 @@ public class LibMatrixDatagen
 		
 		// Compute block-level NNZ
 		long[] ret  = new long[numBlocks];
-		Arrays.fill(ret, 0);
 		
 		if ( nnz < numBlocks ) {
 			// Ultra-sparse matrix
@@ -187,14 +193,12 @@ public class LibMatrixDatagen
 		else {
 			int bid = 0;
 			
-			//long actualnnz = 0;
 			for(long r = 0; r < nrow; r += brlen) {
 				long curBlockRowSize = Math.min(brlen, (nrow - r));
 				for(long c = 0; c < ncol; c += bclen)
 				{
 					long curBlockColSize = Math.min(bclen, (ncol - c));
 					ret[bid] = (long) (curBlockRowSize * curBlockColSize * sparsity);
-					//actualnnz += ret[bid];
 					bid++;
 				}
 			}