You are viewing a plain-text version of this content. The canonical version is the mailing-list archive page (its hyperlink is not preserved in this plain-text rendering).
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/17 07:26:33 UTC

[2/5] incubator-systemml git commit: [SYSTEMML-925] Performance binary-to-csv frame conversion (w/o sort)

[SYSTEMML-925] Performance binary-to-csv frame conversion (w/o sort) 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/69a78581
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/69a78581
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/69a78581

Branch: refs/heads/master
Commit: 69a78581e8cc4bcc6dec5ecc88d9dad6aec96297
Parents: 0e8f1e1
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Sep 17 01:57:32 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:19 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     | 57 ++++++++++++++++++--
 1 file changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/69a78581/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index faf8ba1..b541242 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -156,7 +156,7 @@ public class FrameRDDConverterUtils
 		JavaPairRDD<Long,FrameBlock> input = in;
 		
 		//sort if required (on blocks/rows)
-		if( strict ) {
+		if( strict && !isSorted(input) ) {
 			input = input.sortByKey(true);
 		}
 		
@@ -454,7 +454,7 @@ public class FrameRDDConverterUtils
 		JavaRDD<String> dataRdd = sc.textFile(fnameIn);
 		return dataRdd.map(new RowGenerator(schema, delim));
 	}
-	
+
 	/* 
 	 * Row Generator class based on individual line in CSV file.
 	 */
@@ -480,8 +480,50 @@ public class FrameRDDConverterUtils
 		      return RowFactory.create(objects);
 		}
 	}
-	
 
+	/**
+	 * Check if the rdd is already sorted in order to avoid unnecessary
+	 * sampling, shuffle, and sort per partition. Keys must be sorted both
+	 * within each partition and across consecutive partitions.
+	 * 
+	 * @param in paired rdd of (key, frame block)
+	 * @return true if the keys are sorted in ascending order across the
+	 *         whole rdd (in partition order), false otherwise
+	 */
+	private static boolean isSorted(JavaPairRDD<Long, FrameBlock> in) {
+		//collect (first key, max key) per non-empty partition, with max = -1 for
+		//unsorted partitions; the rdd is sorted iff this sequence is non-decreasing
+		List<Long> keys = in.keys().mapPartitions(
+				new SortingAnalysisFunction()).collect();
+		long max = 0;
+		for( Long val : keys ) {
+			if( val < max )
+				return false;
+			max = val;
+		}
+		return true;
+	}
+
+	/**
+	 * Per-partition sortedness analysis: emits nothing for an empty partition,
+	 * and otherwise the first key followed by the max key (or -1 if the
+	 * partition is not sorted in ascending order).
+	 */
+	private static class SortingAnalysisFunction implements FlatMapFunction<Iterator<Long>,Long> 
+	{
+		private static final long serialVersionUID = -5789003262381127469L;
+
+		@Override
+		public Iterable<Long> call(Iterator<Long> arg0) throws Exception 
+		{
+			ArrayList<Long> ret = new ArrayList<Long>();
+			
+			//empty partitions impose no ordering constraint (emitting a
+			//placeholder key would spuriously fail the global check)
+			if( !arg0.hasNext() )
+				return ret;
+			
+			long first = arg0.next();
+			long max = first;
+			while( max >= 0 && arg0.hasNext() ) {
+				long val = arg0.next();
+				max = (val < max) ? -1 : val;
+			}
+			
+			//the first key is required to detect overlapping key ranges of
+			//consecutive partitions, e.g., [1,2001] followed by [1001,3001]
+			ret.add(first);
+			ret.add(max);
+			return ret;
+		}
+	}
+	
+	
 	/////////////////////////////////
 	// CSV-SPECIFIC FUNCTIONS
 	
@@ -1087,7 +1129,14 @@ public class FrameRDDConverterUtils
 	//////////////////////////////////////
 	// Common functions
 	
-	// Flushes current state of filled column blocks to output list.
+	/**
+	 * Flushes current state of filled column blocks to output list.
+	 * 
+	 * @param ix
+	 * @param fb
+	 * @param ret
+	 * @throws DMLRuntimeException
+	 */
 	private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
 		throws DMLRuntimeException
 	{