You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/20 20:36:55 UTC
incubator-systemml git commit: [SYSTEMML-927] Fix schema handling spark frame append/right indexing
Repository: incubator-systemml
Updated Branches:
refs/heads/master 1729d13ae -> 6550c04b9
[SYSTEMML-927] Fix schema handling spark frame append/right indexing
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/6550c04b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/6550c04b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/6550c04b
Branch: refs/heads/master
Commit: 6550c04b9cf9b446a56bd4846a205a4745b20ec6
Parents: 1729d13
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Tue Sep 20 13:36:02 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 20 13:36:02 2016 -0700
----------------------------------------------------------------------
.../sysml/api/mlcontext/MatrixFormat.java | 18 +++----------
.../controlprogram/caching/FrameObject.java | 27 ++++++++++++++++++++
.../spark/FrameAppendMSPInstruction.java | 5 ++++
.../spark/FrameAppendRSPInstruction.java | 5 ++++
.../spark/FrameIndexingSPInstruction.java | 4 +++
5 files changed, 45 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
index 4f4c7f9..0c07dd2 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MatrixFormat.java
@@ -63,13 +63,8 @@ public enum MatrixFormat {
* otherwise.
*/
public boolean isVectorBased() {
- if (this == DF_VECTOR_WITH_INDEX) {
- return true;
- } else if (this == DF_VECTOR) {
- return true;
- } else {
- return false;
- }
+ return (this == DF_VECTOR_WITH_INDEX
+ || this == DF_VECTOR);
}
/**
@@ -79,13 +74,8 @@ public enum MatrixFormat {
* otherwise.
*/
public boolean hasIDColumn() {
- if (this == DF_DOUBLES_WITH_INDEX) {
- return true;
- } else if (this == DF_VECTOR_WITH_INDEX) {
- return true;
- } else {
- return false;
- }
+ return (this == DF_DOUBLES_WITH_INDEX
+ || this == DF_VECTOR_WITH_INDEX);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index e3d2332..bfccdf1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -91,6 +91,33 @@ public class FrameObject extends CacheableData<FrameBlock>
return _schema;
}
+ /**
+ * Returns the schema of columns cl through cu; if no schema is set or it has at most cu entries, every column in the range is reported as STRING.
+ * @param cl column lower bound, inclusive (used directly as a position into the schema list)
+ * @param cu column upper bound, inclusive (used directly as a position into the schema list)
+ * @return list of cu-cl+1 value types: a subList of the stored schema when it covers cu, otherwise an nCopies list of STRING
+ */
+ public List<ValueType> getSchema(int cl, int cu) {
+ return (_schema!=null && _schema.size()>cu) ? _schema.subList(cl, cu+1) :
+ Collections.nCopies(cu-cl+1, ValueType.STRING);
+ }
+
+ /**
+ * Creates a new collection which contains the schema of the current
+ * frame object concatenated with the schema of the passed frame object.
+ * A side whose schema is null contributes STRING for each of its columns.
+ * @param fo frame object whose schema is appended after this object's schema
+ * @return new list holding this object's value types followed by those of fo
+ */
+ public List<ValueType> mergeSchemas(FrameObject fo) {
+ ArrayList<ValueType> ret = new ArrayList<ValueType>();
+ ret.addAll((_schema!=null) ? _schema :
+ Collections.nCopies((int)getNumColumns(), ValueType.STRING));
+ ret.addAll((fo.getSchema()!=null) ? fo.getSchema() :
+ Collections.nCopies((int)fo.getNumColumns(), ValueType.STRING));
+ return ret;
+ }
+
public void setSchema(String schema) {
if( schema.equals("*") ) {
//populate default schema
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
index 7aad0bf..236dfb0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -68,6 +68,11 @@ public class FrameAppendMSPInstruction extends AppendMSPInstruction
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
sec.addLineageBroadcast(output.getName(), input2.getName());
+
+ //update schema of output with merged input schemas
+ sec.getFrameObject(output.getName()).setSchema(
+ sec.getFrameObject(input1.getName()).mergeSchemas(
+ sec.getFrameObject(input2.getName())));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index 067769d..ad6ef58 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -68,6 +68,11 @@ public class FrameAppendRSPInstruction extends AppendRSPInstruction
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
sec.addLineageRDD(output.getName(), input2.getName());
+
+ //update schema of output with merged input schemas
+ sec.getFrameObject(output.getName()).setSchema(
+ sec.getFrameObject(input1.getName()).mergeSchemas(
+ sec.getFrameObject(input2.getName())));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6550c04b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index ac45ec0..b4556cf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -117,6 +117,10 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction
//put output RDD handle into symbol table
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
+
+ //update schema of output with subset of input schema
+ sec.getFrameObject(output.getName()).setSchema(
+ sec.getFrameObject(input1.getName()).getSchema((int)cl, (int)cu));
}
//left indexing
else if ( opcode.equalsIgnoreCase("leftIndex") || opcode.equalsIgnoreCase("mapLeftIndex"))