You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/09 23:49:38 UTC
[2/2] incubator-systemml git commit: [SYSTEMML-376] Fix robustness
spark rand/seq ops (oom on many blocks)
[SYSTEMML-376] Fix robustness spark rand/seq ops (oom on many blocks)
Similar to our MapReduce datagen job, we now materialize the seed/offset
inputs on HDFS if the number of blocks reaches a threshold of 1M
(1024*1024) blocks. This significantly reduces the memory requirements
because otherwise the driver has to hold the entire seed/offset
collection in memory.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/91a1863b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/91a1863b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/91a1863b
Branch: refs/heads/master
Commit: 91a1863b9a9e5b6c6c58e739e178572313e21b17
Parents: 3658a80
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Jan 8 17:19:51 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Jan 8 17:19:51 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/lops/DataGen.java | 2 +-
.../instructions/spark/RandSPInstruction.java | 358 +++++++++++++------
.../apache/sysml/runtime/matrix/DataGenMR.java | 5 +-
.../runtime/matrix/data/LibMatrixDatagen.java | 20 +-
4 files changed, 256 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/lops/DataGen.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/DataGen.java b/src/main/java/org/apache/sysml/lops/DataGen.java
index c1a70b4..ec8fb9c 100644
--- a/src/main/java/org/apache/sysml/lops/DataGen.java
+++ b/src/main/java/org/apache/sysml/lops/DataGen.java
@@ -208,7 +208,7 @@ public class DataGen extends Lop
sb.append(iLop.prepScalarLabel());
sb.append(OPERAND_DELIMITOR);
- if ( getExecType() == ExecType.MR ) {
+ if ( getExecType() == ExecType.MR || getExecType() == ExecType.SPARK ) {
sb.append(baseDir);
sb.append(OPERAND_DELIMITOR);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index b1eca1e..dfb53a1 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -19,12 +19,17 @@
package org.apache.sysml.runtime.instructions.spark;
+import java.io.IOException;
+import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Random;
import org.apache.commons.math3.distribution.PoissonDistribution;
import org.apache.commons.math3.random.Well1024a;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -36,6 +41,7 @@ import scala.Tuple2;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.hops.DataGenOp;
import org.apache.sysml.hops.Hop.DataGenMethod;
import org.apache.sysml.hops.OptimizerUtils;
@@ -48,6 +54,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.LibMatrixDatagen;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -60,9 +67,8 @@ import org.apache.sysml.utils.Statistics;
public class RandSPInstruction extends UnarySPInstruction
{
-
//internal configuration
- private static final long SEED_PARTITION_SIZE = 1024 * 1024;
+ private static final long INMEMORY_NUMBLOCKS_THRESHOLD = 1024 * 1024;
private DataGenMethod method = DataGenMethod.INVALID;
@@ -76,6 +82,7 @@ public class RandSPInstruction extends UnarySPInstruction
private String pdf;
private String pdfParams;
private long seed=0;
+ private String dir;
private double seq_from;
private double seq_to;
private double seq_incr;
@@ -84,7 +91,7 @@ public class RandSPInstruction extends UnarySPInstruction
private boolean replace;
public RandSPInstruction (Operator op, DataGenMethod mthd, CPOperand in, CPOperand out, long rows, long cols,
- int rpb, int cpb, double minValue, double maxValue, double sparsity, long seed,
+ int rpb, int cpb, double minValue, double maxValue, double sparsity, long seed, String dir,
String probabilityDensityFunction, String pdfParams, String opcode, String istr)
{
super(op, in, out, opcode, istr);
@@ -98,6 +105,7 @@ public class RandSPInstruction extends UnarySPInstruction
this.maxValue = maxValue;
this.sparsity = sparsity;
this.seed = seed;
+ this.dir = dir;
this.pdf = probabilityDensityFunction;
this.pdfParams = pdfParams;
@@ -205,7 +213,7 @@ public class RandSPInstruction extends UnarySPInstruction
DataGenMethod method = DataGenMethod.INVALID;
if ( opcode.equalsIgnoreCase(DataGen.RAND_OPCODE) ) {
method = DataGenMethod.RAND;
- InstructionUtils.checkNumFields ( str, 11 );
+ InstructionUtils.checkNumFields ( str, 12 );
}
else if ( opcode.equalsIgnoreCase(DataGen.SEQ_OPCODE) ) {
method = DataGenMethod.SEQ;
@@ -249,10 +257,11 @@ public class RandSPInstruction extends UnarySPInstruction
seed = Long.parseLong(s[8]);
}
- String pdf = s[9];
- String pdfParams = s[10];
+ String dir = s[9];
+ String pdf = s[10];
+ String pdfParams = s[11];
- return new RandSPInstruction(op, method, null, out, rows, cols, rpb, cpb, minValue, maxValue, sparsity, seed, pdf, pdfParams, opcode, str);
+ return new RandSPInstruction(op, method, null, out, rows, cols, rpb, cpb, minValue, maxValue, sparsity, seed, dir, pdf, pdfParams, opcode, str);
}
else if ( method == DataGenMethod.SEQ) {
// Example Instruction: CP:seq:11:1:1000:1000:1:0:-0.1:scratch_space/_p7932_192.168.1.120//_t0/:mVar1
@@ -309,136 +318,220 @@ public class RandSPInstruction extends UnarySPInstruction
SparkExecutionContext sec = (SparkExecutionContext)ec;
//process specific datagen operator
- if ( this.method == DataGenMethod.RAND )
+ switch( method ) {
+ case RAND: generateRandData(sec); break;
+ case SEQ: generateSequence(sec); break;
+ case SAMPLE: generateSample(sec); break;
+ default:
+ throw new DMLRuntimeException("Invalid datagen method: "+method);
+ }
+ }
+
+ /**
+ *
+ * @param sec
+ * @throws DMLRuntimeException
+ */
+ private void generateRandData(SparkExecutionContext sec)
+ throws DMLRuntimeException
+ {
+ //step 1: generate pseudo-random seed (because not specified)
+ long lSeed = seed; //seed per invocation
+ if( lSeed == DataGenOp.UNSPECIFIED_SEED )
+ lSeed = DataGenOp.generateRandomSeed();
+
+ if( LOG.isTraceEnabled() )
+ LOG.trace("Process RandSPInstruction rand with seed = "+lSeed+".");
+
+ //step 2: potential in-memory rand operations if applicable
+ if( isMemAvail(rows, cols, sparsity, minValue, maxValue)
+ && DMLScript.rtplatform != RUNTIME_PLATFORM.SPARK )
{
- // The implementation is in same spirit as MapReduce
- // We generate seeds similar to org.apache.sysml.runtime.matrix.DataGenMR
- // and then generate blocks similar to org.apache.sysml.runtime.matrix.mapred.DataGenMapper
-
- //generate pseudo-random seed (because not specified)
- long lSeed = seed; //seed per invocation
- if( lSeed == DataGenOp.UNSPECIFIED_SEED )
- lSeed = DataGenOp.generateRandomSeed();
+ RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator(
+ pdf, (int)rows, (int)cols, rowsInBlock, colsInBlock,
+ sparsity, minValue, maxValue, pdfParams);
+ MatrixBlock mb = MatrixBlock.randOperations(rgen, lSeed);
- if( LOG.isTraceEnabled() )
- LOG.trace("Process RandSPInstruction rand with seed = "+lSeed+".");
-
- //Check if there is sufficient memory for matrix to be created and execution platform is not forced Spark
- if( isMemAvail(rows, cols, sparsity, minValue, maxValue) && DMLScript.rtplatform != RUNTIME_PLATFORM.SPARK)
- {
- RandomMatrixGenerator rgen = LibMatrixDatagen.createRandomMatrixGenerator(
- pdf,
- (int)rows, (int)cols,
- rowsInBlock, colsInBlock,
- sparsity, minValue, maxValue,
- pdfParams);
- MatrixBlock mb = MatrixBlock.randOperations(rgen, lSeed);
-
- sec.setMatrixOutput(output.getName(), mb);
- Statistics.decrementNoOfExecutedSPInst();
- return;
+ sec.setMatrixOutput(output.getName(), mb);
+ Statistics.decrementNoOfExecutedSPInst();
+ return;
+ }
+
+ //step 3: seed generation
+ JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null;
+ Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
+ long[] nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, rowsInBlock, colsInBlock, sparsity);
+ double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
+ long numBlocks = nnz.length;
+ long numColBlocks = (long)Math.ceil((double)cols/(double)colsInBlock);
+
+ //a) in-memory seed rdd construction
+ if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
+ {
+ ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>> seeds =
+ new ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>>();
+ double partSize = 0;
+ for( long i=0; i<numBlocks; i++ ) {
+ long r = 1 + i/numColBlocks;
+ long c = 1 + i%numColBlocks;
+ MatrixIndexes indx = new MatrixIndexes(r, c);
+ Long seedForBlock = bigrand.nextLong();
+ seeds.add(new Tuple2<MatrixIndexes, Tuple2<Long, Long>>(indx,
+ new Tuple2<Long, Long>(seedForBlock, nnz[(int)i])));
+ partSize += nnz[(int)i] * 8 + 16;
}
- // seed generation (partitioned to bound memory requirements)
- JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD = null;
- Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(lSeed);
- long[] nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, rowsInBlock, colsInBlock, sparsity);
- double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
- long numBlocks = nnz.length;
- long numColBlocks = (long)Math.ceil((double)cols/(double)colsInBlock);
+ //for load balancing: degree of parallelism such that ~128MB per partition
+ int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
+
+ //create seeds rdd
+ seedsRDD = JavaPairRDD.fromJavaRDD(sec.getSparkContext().parallelize(seeds, numPartitions));
+ }
+ //b) file-based seed rdd construction (for robustness wrt large number of blocks)
+ else
+ {
+ String path = LibMatrixDatagen.generateUniqueSeedPath(dir);
+ double partSize = 0;
- for( long p = 0; p < numBlocks; p+=SEED_PARTITION_SIZE )
+ try
{
- ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>> seeds = new ArrayList<Tuple2<MatrixIndexes, Tuple2<Long, Long>>>();
- double partitionSize = 0;
- for( long i=p; i<Math.min(p+SEED_PARTITION_SIZE, numBlocks); i++ ) {
- long r = 1 + i/numColBlocks;
- long c = 1 + i%numColBlocks;
- MatrixIndexes indx = new MatrixIndexes(r, c);
- Long seedForBlock = bigrand.nextLong();
- seeds.add(new Tuple2<MatrixIndexes, Tuple2<Long, Long>>(indx, new Tuple2<Long, Long>(seedForBlock, nnz[(int)i])));
- partitionSize += nnz[(int)i] * 8 + 16;
+ FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
+ FSDataOutputStream fsOut = fs.create(new Path(path));
+ PrintWriter pw = new PrintWriter(fsOut);
+ StringBuilder sb = new StringBuilder();
+ for( long i=0; i<numBlocks; i++ ) {
+ sb.append(1 + i/numColBlocks);
+ sb.append(',');
+ sb.append(1 + i%numColBlocks);
+ sb.append(',');
+ sb.append(bigrand.nextLong());
+ sb.append(',');
+ sb.append(nnz[(int)i]);
+ pw.println(sb.toString());
+ sb.setLength(0);
+ partSize += nnz[(int)i] * 8 + 16;
}
-
- //for load balancing: degree of parallelism such that ~128MB per partition
- int numPartitions = (int) Math.max(Math.min(partitionSize / hdfsBlockSize, seeds.size()), 1);
-
- //combine seeds partitions to seed rdd
- JavaPairRDD<MatrixIndexes, Tuple2<Long, Long>> seedsRDD2 =
- JavaPairRDD.fromJavaRDD(sec.getSparkContext().parallelize(seeds, numPartitions));
- seedsRDD = ( seedsRDD != null )? seedsRDD.union(seedsRDD2) : seedsRDD2;
+ pw.close();
+ fsOut.close();
+ }
+ catch( IOException ex ) {
+ throw new DMLRuntimeException(ex);
}
- //execute rand instruction over seed input
- JavaPairRDD<MatrixIndexes, MatrixBlock> out = seedsRDD.mapToPair(new GenerateRandomBlock(rows, cols, rowsInBlock, colsInBlock, sparsity, minValue, maxValue, pdf, pdfParams));
+ //for load balancing: degree of parallelism such that ~128MB per partition
+ int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
- //output handling
- MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
- if(!mcOut.dimsKnown(true)) {
- //note: we cannot compute the nnz from sparsity because this would not reflect the
- //actual number of non-zeros, except for extreme values of sparsity equals 0 or 1.
- long lnnz = (sparsity==0 || sparsity==1) ? (long) (sparsity*rows*cols) : -1;
- mcOut.set(rows, cols, rowsInBlock, colsInBlock, lnnz);
- }
- sec.setRDDHandleForVariable(output.getName(), out);
+ //create seeds rdd
+ seedsRDD = sec.getSparkContext()
+ .textFile(path, numPartitions)
+ .mapToPair(new ExtractSeedTuple());
+ }
+
+ //step 4: execute rand instruction over seed input
+ JavaPairRDD<MatrixIndexes, MatrixBlock> out = seedsRDD
+ .mapToPair(new GenerateRandomBlock(rows, cols, rowsInBlock, colsInBlock,
+ sparsity, minValue, maxValue, pdf, pdfParams));
+
+ //step 5: output handling
+ MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+ if(!mcOut.dimsKnown(true)) {
+ //note: we cannot compute the nnz from sparsity because this would not reflect the
+ //actual number of non-zeros, except for extreme values of sparsity equals 0 or 1.
+ long lnnz = (sparsity==0 || sparsity==1) ? (long) (sparsity*rows*cols) : -1;
+ mcOut.set(rows, cols, rowsInBlock, colsInBlock, lnnz);
+ }
+ sec.setRDDHandleForVariable(output.getName(), out);
+ }
+
+ /**
+ *
+ * @param sec
+ * @throws DMLRuntimeException
+ */
+ private void generateSequence(SparkExecutionContext sec)
+ throws DMLRuntimeException
+ {
+ //sanity check valid increment
+ if(seq_incr == 0) {
+ throw new DMLRuntimeException("ERROR: While performing seq(" + seq_from + "," + seq_to + "," + seq_incr + ")");
}
- else if ( this.method == DataGenMethod.SEQ )
+
+ //handle default 1 to -1 for special case of from>to
+ seq_incr = LibMatrixDatagen.updateSeqIncr(seq_from, seq_to, seq_incr);
+
+ if( LOG.isTraceEnabled() )
+ LOG.trace("Process RandSPInstruction seq with seqFrom="+seq_from+", seqTo="+seq_to+", seqIncr"+seq_incr);
+
+ //step 1: offset generation
+ JavaRDD<Double> offsetsRDD = null;
+ double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
+ long nnz = (long) Math.abs(Math.round((seq_to - seq_from)/seq_incr)) + 1;
+ long numBlocks = (long)Math.ceil(((double)nnz)/rowsInBlock);
+
+ //a) in-memory offset rdd construction
+ if( numBlocks < INMEMORY_NUMBLOCKS_THRESHOLD )
{
- //sanity check valid increment
- if(seq_incr == 0) {
- throw new DMLRuntimeException("ERROR: While performing seq(" + seq_from + "," + seq_to + "," + seq_incr + ")");
+ ArrayList<Double> offsets = new ArrayList<Double>();
+ double partSize = 0;
+ for( long i=0; i<numBlocks; i++ ) {
+ double off = seq_from + seq_incr*i*rowsInBlock;
+ offsets.add(off);
+ partSize += rowsInBlock * 8 +16;
}
+
+ //for load balancing: degree of parallelism such that ~128MB per partition
+ int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
+
+ //create offset rdd
+ offsetsRDD = sec.getSparkContext().parallelize(offsets, numPartitions);
+ }
+ //b) file-based offset rdd construction (for robustness wrt large number of blocks)
+ else
+ {
+ String path = LibMatrixDatagen.generateUniqueSeedPath(dir);
+ double partSize = 0;
- //handle default 1 to -1 for special case of from>to
- seq_incr = LibMatrixDatagen.updateSeqIncr(seq_from, seq_to, seq_incr);
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Process RandSPInstruction seq with seqFrom="+seq_from+", seqTo="+seq_to+", seqIncr"+seq_incr);
-
- // offset generation (partitioned to bound memory requirements)
- JavaRDD<Double> offsetsRDD = null;
- double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
- long nnz = (long) Math.abs(Math.round((seq_to - seq_from)/seq_incr)) + 1;
- long numBlocks = (long)Math.ceil(((double)nnz)/rowsInBlock);
-
- for( long p = 0; p < numBlocks; p+=SEED_PARTITION_SIZE )
+ try
{
- ArrayList<Double> offsets = new ArrayList<Double>();
- double partitionSize = 0;
- for( long i=p; i<Math.min(p+SEED_PARTITION_SIZE, numBlocks); i++ ) {
+ FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
+ FSDataOutputStream fsOut = fs.create(new Path(path));
+ PrintWriter pw = new PrintWriter(fsOut);
+ for( long i=0; i<numBlocks; i++ ) {
double off = seq_from + seq_incr*i*rowsInBlock;
- offsets.add(off);
- partitionSize += rowsInBlock * 8 +16;
+ pw.println(off);
+ partSize += rowsInBlock * 8 +16;
}
-
- //for load balancing: degree of parallelism such that ~128MB per partition
- int numPartitions = (int) Math.max(Math.min(partitionSize / hdfsBlockSize, offsets.size()), 1);
-
- //combine seeds partitions to seed rdd
- JavaRDD<Double> offsetsRDD2 = sec.getSparkContext().parallelize(offsets, numPartitions);
-
- offsetsRDD = ( offsetsRDD != null )? offsetsRDD.union(offsetsRDD2) : offsetsRDD2;
+ pw.close();
+ fsOut.close();
}
-
- //sanity check number of non-zeros
- if(nnz != rows && rows != -1) {
- throw new DMLRuntimeException("Incorrect number of non-zeros: " + nnz + " != " + rows);
+ catch( IOException ex ) {
+ throw new DMLRuntimeException(ex);
}
- //execute seq instruction over offset input
- JavaPairRDD<MatrixIndexes, MatrixBlock> out = offsetsRDD.mapToPair(new GenerateSequenceBlock(rowsInBlock, seq_from, seq_to, seq_incr));
-
- //output handling
- MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
- if(!mcOut.dimsKnown()) {
- mcOut.set(nnz, 1, rowsInBlock, colsInBlock, nnz);
- }
- sec.setRDDHandleForVariable(output.getName(), out);
+ //for load balancing: degree of parallelism such that ~128MB per partition
+ int numPartitions = (int) Math.max(Math.min(partSize/hdfsBlkSize, numBlocks), 1);
+
+ //create seeds rdd
+ offsetsRDD = sec.getSparkContext()
+ .textFile(path, numPartitions)
+ .map(new ExtractOffsetTuple());
}
- else if (this.method == DataGenMethod.SAMPLE)
- {
- generateSample(sec);
+
+ //sanity check number of non-zeros
+ if(nnz != rows && rows != -1) {
+ throw new DMLRuntimeException("Incorrect number of non-zeros: " + nnz + " != " + rows);
}
+
+ //step 2: execute seq instruction over offset input
+ JavaPairRDD<MatrixIndexes, MatrixBlock> out = offsetsRDD
+ .mapToPair(new GenerateSequenceBlock(rowsInBlock, seq_from, seq_to, seq_incr));
+
+ //step 3: output handling
+ MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+ if(!mcOut.dimsKnown()) {
+ mcOut.set(nnz, 1, rowsInBlock, colsInBlock, nnz);
+ }
+ sec.setRDDHandleForVariable(output.getName(), out);
}
/**
@@ -447,7 +540,8 @@ public class RandSPInstruction extends UnarySPInstruction
* @param sec
* @throws DMLRuntimeException
*/
- private void generateSample(SparkExecutionContext sec) throws DMLRuntimeException
+ private void generateSample(SparkExecutionContext sec)
+ throws DMLRuntimeException
{
if ( maxValue < rows && !replace )
throw new DMLRuntimeException("Sample (size=" + rows + ") larger than population (size=" + maxValue + ") can only be generated with replacement.");
@@ -648,6 +742,38 @@ public class RandSPInstruction extends UnarySPInstruction
/**
*
*/
+ private static class ExtractSeedTuple implements PairFunction<String, MatrixIndexes, Tuple2<Long,Long>> {
+ private static final long serialVersionUID = 3973794676854157101L;
+
+ @Override
+ public Tuple2<MatrixIndexes, Tuple2<Long, Long>> call(String arg)
+ throws Exception
+ {
+ String[] parts = IOUtilFunctions.split(arg, ",");
+ MatrixIndexes ix = new MatrixIndexes(
+ Long.parseLong(parts[0]), Long.parseLong(parts[1]));
+ Tuple2<Long,Long> seed = new Tuple2<Long,Long>(
+ Long.parseLong(parts[2]), Long.parseLong(parts[3]));
+
+ return new Tuple2<MatrixIndexes, Tuple2<Long, Long>>(ix,seed);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class ExtractOffsetTuple implements Function<String, Double> {
+ private static final long serialVersionUID = -3980257526545002552L;
+
+ @Override
+ public Double call(String arg) throws Exception {
+ return Double.parseDouble(arg);
+ }
+ }
+
+ /**
+ *
+ */
private static class GenerateRandomBlock implements PairFunction<Tuple2<MatrixIndexes, Tuple2<Long, Long> >, MatrixIndexes, MatrixBlock>
{
private static final long serialVersionUID = 1616346120426470173L;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
index ba933a4..0d34d0b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java
@@ -40,7 +40,6 @@ import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.runtime.instructions.MRInstructionParser;
import org.apache.sysml.runtime.instructions.MRJobInstruction;
import org.apache.sysml.runtime.instructions.mr.DataGenMRInstruction;
@@ -72,8 +71,6 @@ public class DataGenMR
{
private static final Log LOG = LogFactory.getLog(DataGenMR.class.getName());
- private static IDSequence _seqRandInput = new IDSequence();
-
private DataGenMR() {
//prevent instantiation via private constructor
}
@@ -148,7 +145,7 @@ public class DataGenMR
if ( mrtype == MRINSTRUCTION_TYPE.Rand )
{
RandInstruction randInst = (RandInstruction) mrins;
- inputs[i]=genInst.getBaseDir() + "tmp"+_seqRandInput.getNextID()+".randinput";
+ inputs[i]=LibMatrixDatagen.generateUniqueSeedPath(genInst.getBaseDir());
maxsparsity = Math.max(maxsparsity, randInst.getSparsity());
FSDataOutputStream fsOut = fs.create(new Path(inputs[i]));
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/91a1863b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
index 5f2664c..7844ac3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixDatagen.java
@@ -30,9 +30,9 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.random.Well1024a;
-
import org.apache.sysml.hops.DataGenOp;
import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysml.runtime.util.NormalPRNGenerator;
import org.apache.sysml.runtime.util.PRNGenerator;
import org.apache.sysml.runtime.util.PoissonPRNGenerator;
@@ -43,13 +43,13 @@ import org.apache.sysml.runtime.util.UniformPRNGenerator;
*/
public class LibMatrixDatagen
{
-
protected static final Log LOG = LogFactory.getLog(LibMatrixDatagen.class.getName());
-
public static final String RAND_PDF_UNIFORM = "uniform";
public static final String RAND_PDF_NORMAL = "normal";
public static final String RAND_PDF_POISSON = "poisson";
+ private static IDSequence _seqRandInput = new IDSequence();
+
private LibMatrixDatagen() {
//prevent instantiation via private constructor
}
@@ -82,6 +82,15 @@ public class LibMatrixDatagen
}
/**
+ *
+ * @param basedir
+ * @return
+ */
+ public static String generateUniqueSeedPath( String basedir ) {
+ return basedir + "tmp" + _seqRandInput.getNextID() + ".randinput";
+ }
+
+ /**
* A matrix of random numbers is generated by using multiple seeds, one for each
* block. Such block-level seeds are produced via Well equidistributed long-period linear
* generator (Well1024a). For a given seed, this function sets up the block-level seeds.
@@ -120,7 +129,6 @@ public class LibMatrixDatagen
*/
public static long[] computeNNZperBlock(long nrow, long ncol, int brlen, int bclen, double sparsity) throws DMLRuntimeException {
int numBlocks = (int) (Math.ceil((double)nrow/brlen) * Math.ceil((double)ncol/bclen));
- //System.out.println("nrow=" + nrow + ", brlen=" + brlen + ", ncol="+ncol+", bclen=" + bclen + "::: " + Math.ceil(nrow/brlen));
// CURRENT:
// Total #of NNZ is set to the expected value (nrow*ncol*sparsity).
@@ -128,7 +136,6 @@ public class LibMatrixDatagen
// Instead of using the expected value, one should actually
// treat NNZ as a random variable and accordingly generate a random value.
long nnz = (long) Math.ceil (nrow * (ncol*sparsity));
- //System.out.println("Number of blocks = " + numBlocks + "; NNZ = " + nnz);
if ( numBlocks > Integer.MAX_VALUE ) {
throw new DMLRuntimeException("A random matrix of size [" + nrow + "," + ncol + "] can not be created. Number of blocks (" + numBlocks + ") exceeds the maximum integer size. Try to increase the block size.");
@@ -136,7 +143,6 @@ public class LibMatrixDatagen
// Compute block-level NNZ
long[] ret = new long[numBlocks];
- Arrays.fill(ret, 0);
if ( nnz < numBlocks ) {
// Ultra-sparse matrix
@@ -187,14 +193,12 @@ public class LibMatrixDatagen
else {
int bid = 0;
- //long actualnnz = 0;
for(long r = 0; r < nrow; r += brlen) {
long curBlockRowSize = Math.min(brlen, (nrow - r));
for(long c = 0; c < ncol; c += bclen)
{
long curBlockColSize = Math.min(bclen, (ncol - c));
ret[bid] = (long) (curBlockRowSize * curBlockColSize * sparsity);
- //actualnnz += ret[bid];
bid++;
}
}