You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/02/14 23:15:57 UTC

[2/3] incubator-systemml git commit: Fix mr iqm/quantile record readers (bounded partition read, progress)

Fix mr iqm/quantile record readers (bounded partition read, progress) 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/240143bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/240143bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/240143bb

Branch: refs/heads/master
Commit: 240143bbad57489872b86114cee4ac206fa9c2a4
Parents: 2fa69cd
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Feb 13 22:16:30 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Feb 13 22:16:30 2016 -0800

----------------------------------------------------------------------
 .../matrix/sort/PickFromCompactInputFormat.java | 53 +++++++-------------
 .../runtime/matrix/sort/ReadWithZeros.java      | 13 +++--
 2 files changed, 25 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
index d03afc8..7dd3c13 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
@@ -222,12 +222,10 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 		private Path path;
 	    
 		private FSDataInputStream currentStream;
-		private int startPos=0;
-		private int numToRead=0;
+		protected long totLength;
 		private DoubleWritable readKey=new DoubleWritable();
-		private Writable readValue = new IntWritable(0);
+		private IntWritable readValue = new IntWritable(0);
 		private boolean noRecordsNeeded=false;
-		private int rawKeyValuesRead=0;
 		private int index=0;
 		private int beginPart=-1, endPart=-1, currPart=-1;
 		private double sumwt = 0.0, readWt;  // total weight (set in JobConf)
@@ -236,7 +234,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 		private boolean isFirstRecord = true;
 		private ReadWithZeros reader=null;
 		
-		@SuppressWarnings("unchecked")
 		public RangePickRecordReader(JobConf job, FileSplit split) 
 			throws IOException 
 		{
@@ -244,6 +241,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 			
 			// check if the current part file needs to be processed
 	    	path = split.getPath();
+	    	totLength = split.getLength();
 	    	currentStream = FileSystem.get(job).open(path);
 	    	currPart = getIndexInTheArray(path.getName());
 	    	
@@ -252,13 +250,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 	    		return;
 	    	}
 	    	
-	    	Class<? extends Writable> valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class);
-	    	try {
-				readValue=valueClass.newInstance();
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-			
 			int part0=job.getInt(PARTITION_OF_ZERO, -1);
 			boolean contain0s=false;
 			long numZeros =0;
@@ -317,20 +308,20 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 			double tmpWt = 0;
 			
 			if ( currPart == beginPart || currPart == endPart ) {
-				reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
-				tmpWt = ((IntWritable)readValue).get();
+				boolean ret = reader.readNextKeyValuePairs(readKey, readValue);
+				tmpWt = readValue.get();
 
 				while(readWt+tmpWt < qLowerD) {
 					readWt += tmpWt;
-					reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
-					tmpWt = ((IntWritable)readValue).get();
+					ret &= reader.readNextKeyValuePairs(readKey, readValue);
+					tmpWt = readValue.get();
 				}
 				
 				if((readWt<qLowerD && readWt+tmpWt >= qLowerD) || (readWt+tmpWt <= qUpperD) || (readWt<qUpperD && readWt+tmpWt>=qUpperD)) {
 					key.setIndexes(++index,1);
 					value.setValue(readKey.get()*tmpWt);
 					readWt += tmpWt;
-					return true;
+					return ret;
 				}
 				else {
 					return false;
@@ -338,20 +329,19 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 			}
 			else { 
 				// read full part
-				reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
-				tmpWt = ((IntWritable)readValue).get();
+				boolean ret = reader.readNextKeyValuePairs(readKey, readValue);
+				tmpWt = readValue.get();
 				
 				key.setIndexes(++index,1);
 				value.setValue(readKey.get()*tmpWt);
 				
 				readWt += tmpWt;
-				return true;
+				return ret;
 			}
 		}
 		
 		@Override
 		public void close() throws IOException {
-			//DO Nothing
 			currentStream.close();
 		}
 
@@ -373,10 +363,8 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 
 		@Override
 		public float getProgress() throws IOException {
-			if(numToRead>0)
-				return (float)(rawKeyValuesRead-startPos)/(float)numToRead;
-			else
-				return 100.0f;
+			float progress = (float) getPos() / totLength;
+			return (progress>=0 && progress<=1) ? progress : 1.0f;
 		}
 	}
 	
@@ -394,8 +382,8 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 		
 		private int[] pos=null; //starting from 0
 		private int[] indexes=null;
-		private DoubleWritable readKey=new DoubleWritable();
-		private Writable readValue;
+		private DoubleWritable readKey = new DoubleWritable();
+		private IntWritable readValue = new IntWritable();
 		private int numRead=0;
 		private boolean noRecordsNeeded=false;
 		ReadWithZeros reader=null;
@@ -406,7 +394,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 			return Integer.parseInt(name.substring(i+5));
 		}
 		
-		@SuppressWarnings("unchecked")
 		public PickRecordReader(JobConf job, FileSplit split)
 			throws IOException
 		{
@@ -429,12 +416,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 	    		indexes[i]=Integer.parseInt(temp[1]);
 	    	}
 	    	
-	    	Class<? extends Writable> valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class);
-	    	try {
-	    		readValue=valueClass.newInstance();
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
 			valueIsWeight=job.getBoolean(VALUE_IS_WEIGHT, true);
 			
 			int part0=job.getInt(PARTITION_OF_ZERO, -1);
@@ -456,9 +437,9 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
 		
 			while(numRead<=pos[posIndex])
 			{
-				reader.readNextKeyValuePairs(readKey, (IntWritable)readValue);
+				reader.readNextKeyValuePairs(readKey, readValue);
 				if(valueIsWeight)
-					numRead+=((IntWritable)readValue).get();
+					numRead+=readValue.get();
 				else
 					numRead++;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
index dc578a8..8793be9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
@@ -28,9 +28,7 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 
 public class ReadWithZeros 
-{
-
-	
+{	
 	private boolean contain0s=false;
 	private long numZeros=0;
 	private FSDataInputStream currentStream;
@@ -46,8 +44,11 @@ public class ReadWithZeros
 		numZeros=num0;
 	}
 	
-	public void readNextKeyValuePairs(DoubleWritable readKey, IntWritable readValue)throws IOException 
+	public boolean readNextKeyValuePairs(DoubleWritable readKey, IntWritable readValue)
+		throws IOException 
 	{
+		boolean ret = true;
+		
 		try {
 			if(contain0s && justFound0)
 			{
@@ -68,7 +69,7 @@ public class ReadWithZeros
 				readValue.set((int)numZeros);
 			}
 			else {
-				throw e;
+				ret = false;
 			}
 		}
 		
@@ -80,5 +81,7 @@ public class ReadWithZeros
 			readKey.set(0);
 			readValue.set((int)numZeros);
 		}
+		
+		return ret;
 	}
 }