You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/02/14 23:15:57 UTC
[2/3] incubator-systemml git commit: Fix mr iqm/quantile record readers (bounded partition read, progress)
Fix mr iqm/quantile record readers (bounded partition read, progress)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/240143bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/240143bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/240143bb
Branch: refs/heads/master
Commit: 240143bbad57489872b86114cee4ac206fa9c2a4
Parents: 2fa69cd
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Feb 13 22:16:30 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Feb 13 22:16:30 2016 -0800
----------------------------------------------------------------------
.../matrix/sort/PickFromCompactInputFormat.java | 53 +++++++-------------
.../runtime/matrix/sort/ReadWithZeros.java | 13 +++--
2 files changed, 25 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
index d03afc8..7dd3c13 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
@@ -222,12 +222,10 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
private Path path;
private FSDataInputStream currentStream;
- private int startPos=0;
- private int numToRead=0;
+ protected long totLength;
private DoubleWritable readKey=new DoubleWritable();
- private Writable readValue = new IntWritable(0);
+ private IntWritable readValue = new IntWritable(0);
private boolean noRecordsNeeded=false;
- private int rawKeyValuesRead=0;
private int index=0;
private int beginPart=-1, endPart=-1, currPart=-1;
private double sumwt = 0.0, readWt; // total weight (set in JobConf)
@@ -236,7 +234,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
private boolean isFirstRecord = true;
private ReadWithZeros reader=null;
- @SuppressWarnings("unchecked")
public RangePickRecordReader(JobConf job, FileSplit split)
throws IOException
{
@@ -244,6 +241,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
// check if the current part file needs to be processed
path = split.getPath();
+ totLength = split.getLength();
currentStream = FileSystem.get(job).open(path);
currPart = getIndexInTheArray(path.getName());
@@ -252,13 +250,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
return;
}
- Class<? extends Writable> valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class);
- try {
- readValue=valueClass.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
int part0=job.getInt(PARTITION_OF_ZERO, -1);
boolean contain0s=false;
long numZeros =0;
@@ -317,20 +308,20 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
double tmpWt = 0;
if ( currPart == beginPart || currPart == endPart ) {
- reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
- tmpWt = ((IntWritable)readValue).get();
+ boolean ret = reader.readNextKeyValuePairs(readKey, readValue);
+ tmpWt = readValue.get();
while(readWt+tmpWt < qLowerD) {
readWt += tmpWt;
- reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
- tmpWt = ((IntWritable)readValue).get();
+ ret &= reader.readNextKeyValuePairs(readKey, readValue);
+ tmpWt = readValue.get();
}
if((readWt<qLowerD && readWt+tmpWt >= qLowerD) || (readWt+tmpWt <= qUpperD) || (readWt<qUpperD && readWt+tmpWt>=qUpperD)) {
key.setIndexes(++index,1);
value.setValue(readKey.get()*tmpWt);
readWt += tmpWt;
- return true;
+ return ret;
}
else {
return false;
@@ -338,20 +329,19 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
}
else {
// read full part
- reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
- tmpWt = ((IntWritable)readValue).get();
+ boolean ret = reader.readNextKeyValuePairs(readKey, readValue);
+ tmpWt = readValue.get();
key.setIndexes(++index,1);
value.setValue(readKey.get()*tmpWt);
readWt += tmpWt;
- return true;
+ return ret;
}
}
@Override
public void close() throws IOException {
- //DO Nothing
currentStream.close();
}
@@ -373,10 +363,8 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
@Override
public float getProgress() throws IOException {
- if(numToRead>0)
- return (float)(rawKeyValuesRead-startPos)/(float)numToRead;
- else
- return 100.0f;
+ float progress = (float) getPos() / totLength;
+ return (progress>=0 && progress<=1) ? progress : 1.0f;
}
}
@@ -394,8 +382,8 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
private int[] pos=null; //starting from 0
private int[] indexes=null;
- private DoubleWritable readKey=new DoubleWritable();
- private Writable readValue;
+ private DoubleWritable readKey = new DoubleWritable();
+ private IntWritable readValue = new IntWritable();
private int numRead=0;
private boolean noRecordsNeeded=false;
ReadWithZeros reader=null;
@@ -406,7 +394,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
return Integer.parseInt(name.substring(i+5));
}
- @SuppressWarnings("unchecked")
public PickRecordReader(JobConf job, FileSplit split)
throws IOException
{
@@ -429,12 +416,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
indexes[i]=Integer.parseInt(temp[1]);
}
- Class<? extends Writable> valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class);
- try {
- readValue=valueClass.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
valueIsWeight=job.getBoolean(VALUE_IS_WEIGHT, true);
int part0=job.getInt(PARTITION_OF_ZERO, -1);
@@ -456,9 +437,9 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
while(numRead<=pos[posIndex])
{
- reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
+ reader.readNextKeyValuePairs(readKey, readValue);
if(valueIsWeight)
- numRead+=((IntWritable)readValue).get();
+ numRead+=readValue.get();
else
numRead++;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
index dc578a8..8793be9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
@@ -28,9 +28,7 @@ import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
public class ReadWithZeros
-{
-
-
+{
private boolean contain0s=false;
private long numZeros=0;
private FSDataInputStream currentStream;
@@ -46,8 +44,11 @@ public class ReadWithZeros
numZeros=num0;
}
- public void readNextKeyValuePairs(DoubleWritable readKey, IntWritable readValue)throws IOException
+ public boolean readNextKeyValuePairs(DoubleWritable readKey, IntWritable readValue)
+ throws IOException
{
+ boolean ret = true;
+
try {
if(contain0s && justFound0)
{
@@ -68,7 +69,7 @@ public class ReadWithZeros
readValue.set((int)numZeros);
}
else {
- throw e;
+ ret = false;
}
}
@@ -80,5 +81,7 @@ public class ReadWithZeros
readKey.set(0);
readValue.set((int)numZeros);
}
+
+ return ret;
}
}