You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/17 07:26:34 UTC
[3/5] incubator-systemml git commit: [SYSTEMML-927] Fix frame schema
handling in spark cast/write instruction
[SYSTEMML-927] Fix frame schema handling in spark cast/write instruction
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/81d2b641
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/81d2b641
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/81d2b641
Branch: refs/heads/master
Commit: 81d2b641d99743ab54528a214659a5166e65aabe
Parents: 69a7858
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Sep 17 05:39:56 2016 +0200
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Sep 17 00:25:22 2016 -0700
----------------------------------------------------------------------
.../sysml/runtime/instructions/spark/CastSPInstruction.java | 9 +++++++++
.../runtime/instructions/spark/WriteSPInstruction.java | 9 ++++++---
2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81d2b641/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
index d869f11..4487b20 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
@@ -19,9 +19,12 @@
package org.apache.sysml.runtime.instructions.spark;
+import java.util.Collections;
+
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.lops.UnaryCP;
+import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -88,5 +91,11 @@ public class CastSPInstruction extends UnarySPInstruction
sec.setRDDHandleForVariable(output.getName(), out);
updateUnaryOutputMatrixCharacteristics(sec, input1.getName(), output.getName());
sec.addLineageRDD(output.getName(), input1.getName());
+
+ //update schema information for output frame
+ if( opcode.equals(UnaryCP.CAST_AS_FRAME_OPCODE) ) {
+ sec.getFrameObject(output.getName()).setSchema(
+ Collections.nCopies((int)mcIn.getCols(), ValueType.DOUBLE));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81d2b641/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index e4e2606..1b974f9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import org.apache.hadoop.io.LongWritable;
@@ -136,6 +137,8 @@ public class WriteSPInstruction extends SPInstruction
//get filename (literal or variable expression)
String fname = ec.getScalarInput(input2.getName(), ValueType.STRING, input2.isLiteral()).getStringValue();
+ List<ValueType> schema = (input1.getDataType()==DataType.FRAME) ?
+ sec.getFrameObject(input1.getName()).getSchema() : null;
try
{
@@ -150,7 +153,7 @@ public class WriteSPInstruction extends SPInstruction
if( input1.getDataType()==DataType.MATRIX )
processMatrixWriteInstruction(sec, fname, oi);
else
- processFrameWriteInstruction(sec, fname, oi);
+ processFrameWriteInstruction(sec, fname, oi, schema);
}
catch(IOException ex)
{
@@ -279,7 +282,7 @@ public class WriteSPInstruction extends SPInstruction
* @throws IOException
*/
@SuppressWarnings("unchecked")
- protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, OutputInfo oi)
+ protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, OutputInfo oi, List<ValueType> schema)
throws DMLRuntimeException, IOException
{
//get input rdd
@@ -310,7 +313,7 @@ public class WriteSPInstruction extends SPInstruction
}
// write meta data file
- MapReduceTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), null, DataType.FRAME, mc, oi, formatProperties);
+ MapReduceTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), schema, DataType.FRAME, mc, oi, formatProperties);
}
/**