You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/20 20:36:55 UTC

incubator-systemml git commit: [SYSTEMML-927] Fix schema handling spark frame append/right indexing

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 1729d13ae -> 6550c04b9


[SYSTEMML-927] Fix schema handling spark frame append/right indexing

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/6550c04b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/6550c04b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/6550c04b

Branch: refs/heads/master
Commit: 6550c04b9cf9b446a56bd4846a205a4745b20ec6
Parents: 1729d13
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Tue Sep 20 13:36:02 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 20 13:36:02 2016 -0700

----------------------------------------------------------------------
 .../sysml/api/mlcontext/MatrixFormat.java       | 18 +++----------
 .../controlprogram/caching/FrameObject.java     | 27 ++++++++++++++++++++
 .../spark/FrameAppendMSPInstruction.java        |  5 ++++
 .../spark/FrameAppendRSPInstruction.java        |  5 ++++
 .../spark/FrameIndexingSPInstruction.java       |  4 +++
 5 files changed, 45 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
index 4f4c7f9..0c07dd2 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
@@ -63,13 +63,8 @@ public enum MatrixFormat {
 	 *         otherwise.
 	 */
 	public boolean isVectorBased() {
-		if (this == DF_VECTOR_WITH_INDEX) {
-			return true;
-		} else if (this == DF_VECTOR) {
-			return true;
-		} else {
-			return false;
-		}
+		return (this == DF_VECTOR_WITH_INDEX
+			|| this == DF_VECTOR);
 	}
 
 	/**
@@ -79,13 +74,8 @@ public enum MatrixFormat {
 	 *         otherwise.
 	 */
 	public boolean hasIDColumn() {
-		if (this == DF_DOUBLES_WITH_INDEX) {
-			return true;
-		} else if (this == DF_VECTOR_WITH_INDEX) {
-			return true;
-		} else {
-			return false;
-		}
+		return (this == DF_DOUBLES_WITH_INDEX 
+			|| this == DF_VECTOR_WITH_INDEX);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index e3d2332..bfccdf1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -91,6 +91,33 @@ public class FrameObject extends CacheableData<FrameBlock>
 		return _schema;
 	}
 
+	/**
+	 * Returns the value types of columns cl..cu, or cu-cl+1 STRING types if the schema is null or does not reach cu.
+	 * @param cl column lower bound, inclusive; used directly as a 0-based List position (NOTE(review): confirm callers do not pass 1-based indices)
+	 * @param cu column upper bound, inclusive; same indexing as cl
+	 * @return a List.subList view backed by this object's schema (not a copy), or an immutable list of STRING defaults
+	 */
+	public List<ValueType> getSchema(int cl, int cu) {
+		return (_schema!=null && _schema.size()>cu) ? _schema.subList(cl, cu+1) :
+			Collections.nCopies(cu-cl+1, ValueType.STRING);
+	}
+
+	/**
+	 * Creates a new collection which contains the schema of the current
+	 * frame object concatenated with the schema of the passed frame object.
+	 * 
+	 * @param fo frame object whose schema is appended after this object's schema
+	 * @return new mutable list; a null schema on either side is replaced by getNumColumns() STRING types
+	 */
+	public List<ValueType> mergeSchemas(FrameObject fo) {
+		ArrayList<ValueType> ret = new ArrayList<ValueType>();
+		ret.addAll((_schema!=null) ? _schema : 
+			Collections.nCopies((int)getNumColumns(), ValueType.STRING));
+		ret.addAll((fo.getSchema()!=null) ? fo.getSchema() : 
+			Collections.nCopies((int)fo.getNumColumns(), ValueType.STRING));
+		return ret;
+	} 
+	
 	public void setSchema(String schema) {
 		if( schema.equals("*") ) {
 			//populate default schema

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
index 7aad0bf..236dfb0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -68,6 +68,11 @@ public class FrameAppendMSPInstruction extends AppendMSPInstruction
 		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
 		sec.addLineageBroadcast(output.getName(), input2.getName());
+		
+		//update schema of output with merged input schemas
+		sec.getFrameObject(output.getName()).setSchema(
+			sec.getFrameObject(input1.getName()).mergeSchemas(
+			sec.getFrameObject(input2.getName())));
 	}
 	
 	/** 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index 067769d..ad6ef58 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -68,6 +68,11 @@ public class FrameAppendRSPInstruction extends AppendRSPInstruction
 		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
 		sec.addLineageRDD(output.getName(), input2.getName());		
+		
+		//update schema of output with merged input schemas
+		sec.getFrameObject(output.getName()).setSchema(
+			sec.getFrameObject(input1.getName()).mergeSchemas(
+			sec.getFrameObject(input2.getName())));
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index ac45ec0..b4556cf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -117,6 +117,10 @@ public class FrameIndexingSPInstruction  extends IndexingSPInstruction
 			//put output RDD handle into symbol table
 			sec.setRDDHandleForVariable(output.getName(), out);
 			sec.addLineageRDD(output.getName(), input1.getName());
+			
+			//update schema of output with subset of input schema
+			sec.getFrameObject(output.getName()).setSchema(
+				sec.getFrameObject(input1.getName()).getSchema((int)cl, (int)cu));
 		}
 		//left indexing
 		else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex"))