You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/17 07:26:33 UTC
[2/5] incubator-systemml git commit: [SYSTEMML-925] Performance
binary-to-csv frame conversion (w/o sort)
[SYSTEMML-925] Performance binary-to-csv frame conversion (w/o sort)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/69a78581
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/69a78581
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/69a78581
Branch: refs/heads/master
Commit: 69a78581e8cc4bcc6dec5ecc88d9dad6aec96297
Parents: 0e8f1e1
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Sep 17 01:57:32 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:19 2016 -0700
----------------------------------------------------------------------
.../spark/utils/FrameRDDConverterUtils.java | 57 ++++++++++++++++++--
1 file changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/69a78581/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index faf8ba1..b541242 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -156,7 +156,7 @@ public class FrameRDDConverterUtils
JavaPairRDD<Long,FrameBlock> input = in;
//sort if required (on blocks/rows)
- if( strict ) {
+ if( strict && !isSorted(input) ) {
input = input.sortByKey(true);
}
@@ -454,7 +454,7 @@ public class FrameRDDConverterUtils
JavaRDD<String> dataRdd = sc.textFile(fnameIn);
return dataRdd.map(new RowGenerator(schema, delim));
}
-
+
/*
* Row Generator class based on individual line in CSV file.
*/
@@ -480,8 +480,50 @@ public class FrameRDDConverterUtils
return RowFactory.create(objects);
}
}
-
+ /**
+ * Check if the rdd is already sorted by key in order to avoid unnecessary
+ * sampling, shuffle, and sort per partition. NOTE(review): only the per-partition
+ * results are compared; a partition's first key is never checked against the previous partition's result.
+ * @param in pair rdd whose Long keys are analyzed; the FrameBlock values are not inspected
+ * @return false if any collected per-partition result is smaller than its predecessor (or is -1); true otherwise
+ */
+ private static boolean isSorted(JavaPairRDD<Long, FrameBlock> in) {
+ //one value per partition: last key if keys never decrease, -1 otherwise, 0 if the partition is empty
+ List<Long> keys = in.keys().mapPartitions(
+ new SortingAnalysisFunction()).collect();
+ long max = 0; //largest per-partition result accepted so far; never drops below 0
+ for( Long val : keys ) {
+ if( val < max ) //also catches the -1 marker and a 0 from an empty partition after a positive result
+ return false;
+ max = val;
+ }
+ return true;
+ }
+
+ /**
+ * Per-partition analysis: emits a single value, the last key if no key is smaller than its predecessor (starting from 0), else -1.
+ */
+ private static class SortingAnalysisFunction implements FlatMapFunction<Iterator<Long>,Long>
+ {
+ private static final long serialVersionUID = -5789003262381127469L;
+
+ @Override
+ public Iterable<Long> call(Iterator<Long> arg0) throws Exception
+ {
+ long max = 0; //last key seen so far; stays 0 for an empty partition, -1 once a decreasing key is found
+ while( max >= 0 && arg0.hasNext() ) { //stop scanning at the first out-of-order key
+ long val = arg0.next();
+ max = (val < max) ? -1 : val;
+ }
+
+ ArrayList<Long> ret = new ArrayList<Long>(); //single-element result for this partition
+ ret.add(max);
+ return ret;
+ }
+ }
+
+
/////////////////////////////////
// CSV-SPECIFIC FUNCTIONS
@@ -1087,7 +1129,14 @@ public class FrameRDDConverterUtils
//////////////////////////////////////
// Common functions
- // Flushes current state of filled column blocks to output list.
+ /**
+ * Flushes current state of filled column blocks to output list.
+ *
+ * @param ix
+ * @param fb
+ * @param ret
+ * @throws DMLRuntimeException
+ */
private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret )
throws DMLRuntimeException
{