You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2015/12/09 21:43:45 UTC
incubator-systemml git commit: Adding ID support for converter utils
vectorDataFrameToBinaryBlock.
Repository: incubator-systemml
Updated Branches:
refs/heads/master abd19df94 -> 35319ba70
Adding ID support for converter utils vectorDataFrameToBinaryBlock.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/35319ba7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/35319ba7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/35319ba7
Branch: refs/heads/master
Commit: 35319ba708e7ef3b03d7c916f51f975d53161663
Parents: abd19df
Author: Niketan Pansare <np...@us.ibm.com>
Authored: Wed Dec 9 12:43:44 2015 -0800
Committer: Niketan Pansare <np...@us.ibm.com>
Committed: Wed Dec 9 12:43:44 2015 -0800
----------------------------------------------------------------------
.../spark/utils/RDDConverterUtilsExt.java | 63 +++++++++++---------
1 file changed, 36 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/35319ba7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index e64fb89..3a227a9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -147,9 +147,7 @@ public class RDDConverterUtilsExt
throws DMLRuntimeException {
if(containsID) {
- // Uncomment this when we move to Spark 1.4.0 or higher
- // df = df.sort("ID").drop("ID");
- throw new DMLRuntimeException("containsID is not supported yet");
+ inputDF = dropColumn(inputDF.sort("ID"), "ID");
}
DataFrame df = inputDF.select(vectorColumnName);
@@ -178,34 +176,45 @@ public class RDDConverterUtilsExt
return out;
}
+ /**
+ * Utility to support dropping columns for older Spark versions.
+ * @param df
+ * @param column
+ * @return
+ * @throws DMLRuntimeException
+ */
+ public static DataFrame dropColumn(DataFrame df, String column) throws DMLRuntimeException {
+ ArrayList<String> columnToSelect = new ArrayList<String>();
+ String firstCol = null;
+ boolean colPresent = false;
+ for(String col : df.columns()) {
+ if(col.compareTo(column) == 0) {
+ colPresent = true;
+ }
+ else if(firstCol == null) {
+ firstCol = col;
+ }
+ else {
+ columnToSelect.add(col);
+ }
+ }
+
+ if(!colPresent) {
+ throw new DMLRuntimeException("The column \"" + column + "\" is not present in the dataframe.");
+ }
+ else if(firstCol == null) {
+ throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe.");
+ }
+
+ // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID");
+ return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
+ }
+
public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
DataFrame df, MatrixCharacteristics mcOut, boolean containsID)
throws DMLRuntimeException {
if(containsID) {
- ArrayList<String> columnToSelect = new ArrayList<String>();
- String firstCol = null;
- boolean colIDPresent = false;
- for(String col : df.columns()) {
- if(col.compareTo("ID") == 0) {
- colIDPresent = true;
- }
- else if(firstCol == null) {
- firstCol = col;
- }
- else {
- columnToSelect.add(col);
- }
- }
-
- if(!colIDPresent) {
- throw new DMLRuntimeException("The column \"ID\" is not present in the dataframe.");
- }
- else if(firstCol == null) {
- throw new DMLRuntimeException("No column other than \"ID\" present in the dataframe.");
- }
-
- // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID");
- df = df.sort("ID").select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
+ df = dropColumn(df.sort("ID"), "ID");
}
//determine unknown dimensions and sparsity if required