You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2015/12/09 21:43:45 UTC

incubator-systemml git commit: Adding ID support for converter utils vectorDataFrameToBinaryBlock.

Repository: incubator-systemml
Updated Branches:
  refs/heads/master abd19df94 -> 35319ba70


Adding ID support for converter utils vectorDataFrameToBinaryBlock.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/35319ba7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/35319ba7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/35319ba7

Branch: refs/heads/master
Commit: 35319ba708e7ef3b03d7c916f51f975d53161663
Parents: abd19df
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Wed Dec 9 12:43:44 2015 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Wed Dec 9 12:43:44 2015 -0800

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtilsExt.java       | 63 +++++++++++---------
 1 file changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/35319ba7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index e64fb89..3a227a9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -147,9 +147,7 @@ public class RDDConverterUtilsExt
 			throws DMLRuntimeException {
 		
 		if(containsID) {
-			// Uncomment this when we move to Spark 1.4.0 or higher 
-			// df = df.sort("ID").drop("ID");
-			throw new DMLRuntimeException("containsID is not supported yet");
+			inputDF = dropColumn(inputDF.sort("ID"), "ID");
 		}
 		
 		DataFrame df = inputDF.select(vectorColumnName);
@@ -178,34 +176,45 @@ public class RDDConverterUtilsExt
 		return out;
 	}
 	
+	/**
+	 * Utility for dropping a column on older Spark versions that do not expose DataFrame.drop; column order is kept.
+	 * @param df the input DataFrame
+	 * @param column name of the column to drop (matched exactly, case-sensitive)
+	 * @return a DataFrame that selects every column of df except the given one
+	 * @throws DMLRuntimeException if the column is not present in df, or if df has no other column
+	 */
+	public static DataFrame dropColumn(DataFrame df, String column) throws DMLRuntimeException {
+		ArrayList<String> columnToSelect = new ArrayList<String>();
+		String firstCol = null;
+		boolean colPresent = false;
+		for(String col : df.columns()) {
+			if(col.compareTo(column) == 0) {
+				colPresent = true;
+			}
+			else if(firstCol == null) {
+				firstCol = col;
+			}
+			else {
+				columnToSelect.add(col);
+			}
+		}
+		
+		if(!colPresent) {
+			throw new DMLRuntimeException("The column \"" + column + "\" is not present in the dataframe.");
+		}
+		else if(firstCol == null) {
+			throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe.");
+		}
+		
+		// Roundabout way to drop a column in Java (drop is not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID");
+		return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
+	}
+	
 	public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
 			DataFrame df, MatrixCharacteristics mcOut, boolean containsID) 
 			throws DMLRuntimeException {
 		if(containsID) {
-			ArrayList<String> columnToSelect = new ArrayList<String>();
-			String firstCol = null;
-			boolean colIDPresent = false;
-			for(String col : df.columns()) {
-				if(col.compareTo("ID") == 0) {
-					colIDPresent = true;
-				}
-				else if(firstCol == null) {
-					firstCol = col;
-				}
-				else {
-					columnToSelect.add(col);
-				}
-			}
-			
-			if(!colIDPresent) {
-				throw new DMLRuntimeException("The column \"ID\" is not present in the dataframe.");
-			}
-			else if(firstCol == null) {
-				throw new DMLRuntimeException("No column other than \"ID\" present in the dataframe.");
-			}
-			
-			// Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID");
-			df = df.sort("ID").select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
+			df = dropColumn(df.sort("ID"), "ID");
 		}
 			
 		//determine unknown dimensions and sparsity if required