You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/03/13 09:32:52 UTC

[2/2] incubator-systemml git commit: [SYSTEMML-555] New transformapply builtin function over cp frames

[SYSTEMML-555] New transformapply builtin function over cp frames

This patch introduces a new transformapply builtin function over
in-memory frames, which takes both the input data and the transform
metadata as frames. To allow for a smooth transition from the existing
transform, it duplicates the apply functionality of transform for now.
Once we have full bufferpool integration and distributed operations for
transformapply, we will remove the old apply functionality from
transform. Note that transformapply is currently limited to recoding.
Finally, this change also includes a fix for how frames maintain their
number of rows, as well as a JMLC test for transformapply.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e69a1c26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e69a1c26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e69a1c26

Branch: refs/heads/master
Commit: e69a1c2613892fea614435c26e75b38d8b0d67f4
Parents: 94dbca0
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Mar 12 15:37:00 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Mar 12 15:37:00 2016 -0800

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java    |   4 +-
 .../sysml/hops/ParameterizedBuiltinOp.java      |  22 +++-
 .../apache/sysml/lops/ParameterizedBuiltin.java |  65 +++++-----
 .../org/apache/sysml/parser/DMLTranslator.java  |   8 ++
 .../org/apache/sysml/parser/Expression.java     |   2 +-
 .../ParameterizedBuiltinFunctionExpression.java |  41 ++++++-
 .../instructions/CPInstructionParser.java       |   1 +
 .../cp/ParameterizedBuiltinCPInstruction.java   |  29 +++--
 .../sysml/runtime/matrix/data/FrameBlock.java   |  17 ++-
 .../sysml/runtime/transform/DataTransform.java  |  44 +++++++
 .../sysml/runtime/transform/RecodeAgent.java    |  31 ++++-
 .../apache/sysml/runtime/transform/TfUtils.java | 119 ++++++++++---------
 .../functions/jmlc/FrameTransformTest.java      |  56 +++++++--
 src/test/scripts/functions/jmlc/transform.dml   |   5 +-
 14 files changed, 322 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java
index a468ed7..1385995 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -1089,7 +1089,8 @@ public abstract class Hop
 	};
 
 	public enum ParamBuiltinOp {
-		INVALID, CDF, INVCDF, GROUPEDAGG, RMEMPTY, REPLACE, REXPAND, TRANSFORM
+		INVALID, CDF, INVCDF, GROUPEDAGG, RMEMPTY, REPLACE, REXPAND, 
+		TRANSFORM, TRANSFORMAPPLY
 	};
 
 	/**
@@ -1330,6 +1331,7 @@ public abstract class Hop
 		HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REPLACE, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REPLACE);
 		HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REXPAND, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REXPAND);
 		HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORM, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM);
+		HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMAPPLY, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMAPPLY);		
 	}
 
 	protected static final HashMap<Hop.OpOp2, String> HopsOpOp2String;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 3a8445f..cf5206d 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -217,6 +217,15 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
 			pbilop.getOutputParameters().setFormat(Format.CSV);
 			setLops(pbilop);
 		}
+		else if ( _op == ParamBuiltinOp.TRANSFORMAPPLY ) 
+		{
+			ExecType et = optFindExecType();			
+			ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops,
+					HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
+			setOutputDimensions(pbilop);
+			setLineNumbers(pbilop);
+			setLops(pbilop);
+		}
 
 		//add reblock/checkpoint lops if necessary
 		constructAndSetLopsDataFlowProperties();
@@ -979,12 +988,13 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
 		}
 		else 
 		{
-			if( _op == ParamBuiltinOp.TRANSFORM )
-			{
-				// force remote execution type here.
-				// At runtime, cp-side transform is triggered for small files.
+			if( _op == ParamBuiltinOp.TRANSFORM ) {
+				// force remote, at runtime cp transform triggered for small files.
 				return REMOTE;
 			}
+			else if( _op == ParamBuiltinOp.TRANSFORMAPPLY ) {
+				return ExecType.CP;
+			}
 			
 			if ( OptimizerUtils.isMemoryBasedOptLevel() ) {
 				_etype = findExecTypeByMemEstimate();
@@ -1081,6 +1091,10 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
 				
 				break;	
 			}
+			case TRANSFORMAPPLY: {
+				Hop target = getInput().get(_paramIndexMap.get("target"));
+				setDim1( target.getDim1() ); //rows remain unchanged
+			}
 			default:
 				//do nothing
 				break;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
index 17ec274..4a8ae72 100644
--- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
+++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.lops;
 
 import java.util.HashMap;
+import java.util.Map.Entry;
 
 import org.apache.sysml.hops.HopsException;
 import org.apache.sysml.lops.LopProperties.ExecLocation;
@@ -40,7 +41,7 @@ public class ParameterizedBuiltin extends Lop
 	public enum OperationTypes { 
 		INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, 
 		PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, PEXP, QEXP,
-		TRANSFORM
+		TRANSFORM, TRANSFORMAPPLY,
 	};
 	
 	private OperationTypes _operation;
@@ -211,24 +212,12 @@ public class ParameterizedBuiltin extends Lop
 				
 				break;
 			
-			case REPLACE:
+			case REPLACE: {
 				sb.append( "replace" );
 				sb.append( OPERAND_DELIMITOR );
-				
-				for ( String s : _inputParams.keySet() ) 
-				{	
-					sb.append( s );
-					sb.append( NAME_VALUE_SEPARATOR );
-					
-					// get the value/label of the scalar input associated with name "s"
-					Lop iLop = _inputParams.get(s);
-					if( s.equals("target") )
-						sb.append(iLop.getOutputParameters().getLabel());
-					else
-						sb.append( iLop.prepScalarLabel() );
-					sb.append( OPERAND_DELIMITOR );
-				}
+				sb.append(compileGenericParamMap(_inputParams));
 				break;
+			}
 			
 			case REXPAND:
 				sb.append("rexpand");
@@ -252,23 +241,16 @@ public class ParameterizedBuiltin extends Lop
 				
 				break;
 				
-			case TRANSFORM:
-			{
+			case TRANSFORM: {
 				sb.append("transform");
 				sb.append(OPERAND_DELIMITOR);
-				
-				for ( String s : _inputParams.keySet() ) {
-					sb.append(s);
-					sb.append(NAME_VALUE_SEPARATOR);
-					
-					Lop iLop = _inputParams.get(s);
-					if( iLop.getDataType() != DataType.SCALAR )
-						sb.append( iLop.getOutputParameters().getLabel());
-					else
-						sb.append( iLop.prepScalarLabel() );
-					
-					sb.append(OPERAND_DELIMITOR);
-				}
+				sb.append(compileGenericParamMap(_inputParams));
+				break;
+			}			
+			case TRANSFORMAPPLY: {
+				sb.append("transformapply");
+				sb.append(OPERAND_DELIMITOR);
+				sb.append(compileGenericParamMap(_inputParams));
 				break;
 			}
 				
@@ -512,5 +494,24 @@ public class ParameterizedBuiltin extends Lop
 		sb.append(" ; blocked=" + this.getOutputParameters().isBlocked());
 		return sb.toString();
 	}
-
+	
+	/**
+	 * 
+	 * @param params
+	 * @return
+	 */
+	private static String compileGenericParamMap(HashMap<String, Lop> params) {
+		StringBuilder sb = new StringBuilder();		
+		for ( Entry<String, Lop> e : params.entrySet() ) {
+			sb.append(e.getKey());
+			sb.append(NAME_VALUE_SEPARATOR);
+			if( e.getValue().getDataType() != DataType.SCALAR )
+				sb.append( e.getValue().getOutputParameters().getLabel());
+			else
+				sb.append( e.getValue().prepScalarLabel() );
+			sb.append(OPERAND_DELIMITOR);
+		}
+		
+		return sb.toString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index 543acf0..f295fe0 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -1983,6 +1983,14 @@ public class DMLTranslator
 									target.getValueType(), ParamBuiltinOp.TRANSFORM, 
 									paramHops);
 			break;	
+		
+		case TRANSFORMAPPLY:
+			currBuiltinOp = new ParameterizedBuiltinOp(
+									target.getName(), target.getDataType(), 
+									target.getValueType(), ParamBuiltinOp.TRANSFORMAPPLY, 
+									paramHops);
+			break;	
+			
 			
 		default:
 			

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/parser/Expression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/Expression.java b/src/main/java/org/apache/sysml/parser/Expression.java
index 99edb74..afce7bd 100644
--- a/src/main/java/org/apache/sysml/parser/Expression.java
+++ b/src/main/java/org/apache/sysml/parser/Expression.java
@@ -126,7 +126,7 @@ public abstract class Expression
 		GROUPEDAGG, RMEMPTY, REPLACE, ORDER, 
 		// Distribution Functions
 		CDF, INVCDF, PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, PEXP, QEXP,
-		TRANSFORM, 
+		TRANSFORM, TRANSFORMAPPLY, 
 		INVALID
 	};
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
index 8d1f454..edda675 100644
--- a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
@@ -33,8 +33,9 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 	private HashMap<String,Expression> _varParams;
 	
 	public static final String TF_FN_PARAM_DATA = "target";
-	public static final String TF_FN_PARAM_MTD = "transformPath";
+	public static final String TF_FN_PARAM_MTD2 = "meta";
 	public static final String TF_FN_PARAM_SPEC = "spec";
+	public static final String TF_FN_PARAM_MTD = "transformPath"; //NOTE MB: for backwards compatibility
 	public static final String TF_FN_PARAM_APPLYMTD = "applyTransformPath";
 	public static final String TF_FN_PARAM_OUTNAMES = "outputNames";
 	
@@ -64,6 +65,7 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 
 		// data transformation functions
 		opcodeMap.put("transform",	Expression.ParameterizedBuiltinFunctionOp.TRANSFORM);
+		opcodeMap.put("transformapply",	Expression.ParameterizedBuiltinFunctionOp.TRANSFORMAPPLY);
 	}
 	
 	public static HashMap<Expression.ParameterizedBuiltinFunctionOp, ParamBuiltinOp> pbHopMap;
@@ -230,7 +232,11 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 		case TRANSFORM:
 			validateTransform(output, conditional);
 			break;
-			
+		
+		case TRANSFORMAPPLY:
+			validateTransformApply(output, conditional);
+			break;
+		
 		default: //always unconditional (because unsupported operation)
 			raiseValidateError("Unsupported parameterized function "+ this.getOpCode(), false, LanguageErrorCodes.INVALID_PARAMETERS);
 		}
@@ -291,6 +297,37 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 		output.setDimensions(-1, -1);
 	}
 	
+	// example: A = transformapply(target=X, meta=M, spec=s)
+	private void validateTransformApply(DataIdentifier output, boolean conditional) 
+		throws LanguageException 
+	{
+		//validate data
+		Expression data = getVarParam(TF_FN_PARAM_DATA);
+		if( data==null )				
+			raiseValidateError("Named parameter '" + TF_FN_PARAM_DATA + "' missing. Please specify the input data set.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+		else if( data.getOutput().getDataType() != DataType.FRAME )
+			raiseValidateError("Input to tansformapply() must be of type 'frame'. It is of type '"+data.getOutput().getDataType()+"'.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+			
+		//validate meta data (recode maps)
+		Expression mtd = getVarParam(TF_FN_PARAM_MTD2);
+		if( mtd==null )
+			raiseValidateError("Named parameter '" + TF_FN_PARAM_MTD2 + "' missing. Please specify the transformation metadata.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+		else if( mtd.getOutput().getDataType() != DataType.FRAME )
+			raiseValidateError("Metadata of tansformapply() must be of type 'frame'. It is of type '"+data.getOutput().getDataType()+"'.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+		
+		//validate specification
+		Expression spec = getVarParam(TF_FN_PARAM_SPEC);
+		if( spec==null )
+			raiseValidateError("Named parameter '" + TF_FN_PARAM_SPEC + "' missing. Please specify the transformation specification (JSON string).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+		else if( spec.getOutput().getDataType() != DataType.SCALAR  || spec.getOutput().getValueType() != ValueType.STRING )
+			raiseValidateError("Transformation specification '" + TF_FN_PARAM_SPEC + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+		
+		//set output dimensions
+		output.setDataType(DataType.MATRIX);
+		output.setValueType(ValueType.DOUBLE);
+		output.setDimensions(-1, -1);
+	}
+	
 	private void validateReplace(DataIdentifier output, boolean conditional) throws LanguageException {
 		//check existence and correctness of arguments
 		Expression target = getVarParam("target");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
index 4cb67e3..0a99f2a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
@@ -181,6 +181,7 @@ public class CPInstructionParser extends InstructionParser
 		String2CPInstructionType.put( "replace"	    , CPINSTRUCTION_TYPE.ParameterizedBuiltin);
 		String2CPInstructionType.put( "rexpand"	    , CPINSTRUCTION_TYPE.ParameterizedBuiltin);
 		String2CPInstructionType.put( "transform"	, CPINSTRUCTION_TYPE.ParameterizedBuiltin);
+		String2CPInstructionType.put( "transformapply",CPINSTRUCTION_TYPE.ParameterizedBuiltin);
 
 		// Variable Instruction Opcodes 
 		String2CPInstructionType.put( "assignvar"   , CPINSTRUCTION_TYPE.Variable);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index 2245d4c..314457c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -19,14 +19,13 @@
 
 package org.apache.sysml.runtime.instructions.cp;
 
-import java.io.IOException;
 import java.util.HashMap;
 
-import org.apache.wink.json4j.JSONException;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.parser.Statement;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.functionobjects.ParameterizedBuiltin;
@@ -120,7 +119,9 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
 			func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
 			return new ParameterizedBuiltinCPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str);
 		}
-		else if ( opcode.equals("transform")) {
+		else if (   opcode.equals("transform")
+				 || opcode.equals("transformapply")) 
+		{
 			return new ParameterizedBuiltinCPInstruction(null, paramsMap, out, opcode, str);
 		}
 		else {
@@ -226,19 +227,27 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
 		}
 		else if ( opcode.equalsIgnoreCase("transform")) {
 			MatrixObject mo = (MatrixObject) ec.getVariable(params.get("target"));
-			MatrixObject out = (MatrixObject) ec.getVariable(output.getName());
-			
+			MatrixObject out = (MatrixObject) ec.getVariable(output.getName());			
 			try {
 				JobReturn jt = DataTransform.cpDataTransform(this, new MatrixObject[] { mo } , new MatrixObject[] {out} );
 				out.updateMatrixCharacteristics(jt.getMatrixCharacteristics(0));
-			} catch (IllegalArgumentException e) {
-				throw new DMLRuntimeException(e);
-			} catch (IOException e) {
-				throw new DMLRuntimeException(e);
-			} catch (JSONException e) {
+			} catch (Exception e) {
 				throw new DMLRuntimeException(e);
 			}
 		}
+		else if ( opcode.equalsIgnoreCase("transformapply")) {
+			//sanity checks valid inputs
+			if( !(ec.getVariable(params.get("target")) instanceof FrameObject) )
+				throw new DMLRuntimeException("Transformapply requires FrameObject input for 'target'.");
+			if( !(ec.getVariable(params.get("meta")) instanceof FrameObject) )
+				throw new DMLRuntimeException("Transformapply requires FrameObject input for 'meta'.");
+			
+			FrameObject dataobj = (FrameObject) ec.getVariable(params.get("target"));
+			FrameObject metaobj = (FrameObject) ec.getVariable(params.get("meta"));
+			
+			MatrixBlock mbout = DataTransform.cpDataTransform(getParameterMap(), dataobj.getData(), metaobj.getData() );
+			ec.setMatrixOutput(output.getName(), mbout);
+		}
 		else {
 			throw new DMLRuntimeException("Unknown opcode : " + opcode);
 		}		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 34078f7..cb44d8e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -27,8 +27,10 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -73,7 +75,7 @@ public class FrameBlock implements Writable, Externalizable
 	}
 	
 	public FrameBlock(List<ValueType> schema, List<String> names, String[][] data) {
-		_numRows = data.length;
+		_numRows = 0; //maintained on append
 		_schema = new ArrayList<ValueType>(schema);
 		_colnames = new ArrayList<String>(names);
 		_coldata = new ArrayList<Array>();
@@ -119,6 +121,19 @@ public class FrameBlock implements Writable, Externalizable
 	}
 	
 	/**
+	 * Creates a mapping from column names to column IDs, i.e., 
+	 * 1-based column indexes
+	 * 
+	 * @return
+	 */
+	public Map<String,Integer> getColumnNameIDMap() {
+		Map<String, Integer> ret = new HashMap<String, Integer>();
+		for( int j=0; j<getNumColumns(); j++ )
+			ret.put(_colnames.get(j), j+1);
+		return ret;	
+	}
+	
+	/**
 	 * Allocate column data structures if necessary, i.e., if schema specified
 	 * but not all column data structures created yet.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
index fb6df5e..44e9141 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -72,6 +73,7 @@ import org.apache.sysml.runtime.matrix.JobReturn;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -1053,6 +1055,48 @@ public class DataTransform
 	}
 	
 	/**
+	 * Apply given transform metadata (incl recode maps) over an in-memory frame input in order to
+	 * create a transformed numerical matrix. Note: The number of rows always remains unchanged, 
+	 * whereas the number of column might increase or decrease. 
+	 * 
+	 * @param params
+	 * @param input
+	 * @param meta
+	 * @param spec
+	 * @return
+	 * @throws DMLRuntimeException
+	 * @throws  
+	 */
+	public static MatrixBlock cpDataTransform(HashMap<String,String> params, FrameBlock input, FrameBlock meta) 
+		throws DMLRuntimeException
+	{
+		MatrixBlock ret = null;
+		
+		try
+		{
+			//initialize transform encoders
+			JSONObject spec = new JSONObject(params.get("spec"));
+			TfUtils agents = new TfUtils(spec, input.getNumColumns());		
+			agents.getRecodeAgent().initRecodeMaps(meta);
+			
+			//core transform apply over input frame and append to output
+			//FIXME number of output columns after encoder creation
+			ret = new MatrixBlock(input.getNumRows(), input.getNumColumns(), false);
+			Iterator<String[]> iter = input.getStringRowIterator();
+			for( int i=0; iter.hasNext(); i++ ) {
+				String[] tmp = agents.apply(iter.next(), true);
+				for( int j=0; j<tmp.length; j++ )
+					ret.appendValue(i, j, UtilFunctions.parseToDouble(tmp[j]));
+			}
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		
+		return ret;
+	}
+	
+	/**
 	 * Helper function to fetch and sort the list of part files under the given
 	 * input directory.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 295c056..2ca3cfc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -42,6 +42,9 @@ import org.apache.wink.json4j.JSONObject;
 import scala.Tuple2;
 
 import com.google.common.collect.Ordering;
+
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -62,8 +65,14 @@ public class RecodeAgent extends TransformationAgent {
 		
 		if ( parsedSpec.containsKey(TX_METHOD.RECODE.toString())) 
 		{
-			JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.RECODE.toString());
-			JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+			//TODO consolidate external and internal json spec definitions
+			JSONArray attrs = null;
+			if( parsedSpec.get(TX_METHOD.RECODE.toString()) instanceof JSONObject ) {
+				JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.RECODE.toString());
+				attrs = (JSONArray) obj.get(JSON_ATTRS);
+			}
+			else
+				attrs = (JSONArray)parsedSpec.get(TX_METHOD.RECODE.toString());
 			
 			_rcdList = new int[attrs.size()];
 			for(int i=0; i < _rcdList.length; i++) 
@@ -96,6 +105,24 @@ public class RecodeAgent extends TransformationAgent {
 		}
 	}
 	
+	/**
+	 * Construct the recodemaps from the given input frame for all 
+	 * columns registered for recode.
+	 * 
+	 * @param frame
+	 */
+	public void initRecodeMaps( FrameBlock frame ) {
+		for( int j=0; j<_rcdList.length; j++ ) {
+			int colID = _rcdList[j]; //1-based
+			HashMap<String,Long> map = new HashMap<String,Long>();
+			for( int i=0; i<frame.getNumRows(); i++ ) {
+				String[] tmp = frame.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX);
+				map.put(tmp[0], Long.parseLong(tmp[1]));
+			}
+			_rcdMaps.put(colID, map);
+		}
+	}
+	
 	void prepare(String[] words, TfUtils agents) {
 		if ( _rcdList == null && _mvrcdList == null )
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 786b111..7ce3228 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -82,6 +82,69 @@ public class TfUtils implements Serializable{
 	private String _tmpDir = null;
 	private String _outputPath = null;
 	
+
+	public TfUtils(JobConf job, boolean minimal) 
+		throws IOException, JSONException 
+	{
+		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+			ConfigurationManager.setCachedJobConf(job);
+		}
+		
+		_NAstrings = TfUtils.parseNAStrings(job);
+		_specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+		
+		FileSystem fs = FileSystem.get(job);
+		JSONObject spec = TfUtils.readSpec(fs, _specFile);
+		
+		_oa = new OmitAgent(spec);
+	}
+	
+	// called from GenTFMtdMapper, ApplyTf (Hadoop)
+	public TfUtils(JobConf job) 
+		throws IOException, JSONException 
+	{
+		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+			ConfigurationManager.setCachedJobConf(job);
+		}
+		
+		boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
+		//Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
+		String[] naStrings = TfUtils.parseNAStrings(job);
+		
+		long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) );		// #of columns in input data
+			
+		String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+		String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
+		String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
+		String outputPath = FileOutputFormat.getOutputPath(job).toString();
+		FileSystem fs = FileSystem.get(job);
+		JSONObject spec = TfUtils.readSpec(fs, specFile);
+		
+		init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
+	}
+	
+	// called from GenTfMtdReducer 
+	public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
+	{
+		this(job);
+		_tfMtdDir = tfMtdDir;
+	}
+	
+	// called from GenTFMtdReducer and ApplyTf (Spark)
+	public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
+		init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
+		_tfMtdDir = tfMtdDir;
+	}
+	
+	//called from cp frame transformapply
+	public TfUtils(JSONObject spec, long inNcol) 
+		throws IOException, JSONException 
+	{
+		//TODO recodemaps handover
+		_numInputCols = inNcol;
+		createAgents(spec);
+	}
+	
 	protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
 			throws IOException {
 		// check non-existing file
@@ -193,59 +256,6 @@ public class TfUtils implements Serializable{
 		createAgents(spec);
 	}
 	
-	public TfUtils(JobConf job, boolean minimal) 
-		throws IOException, JSONException 
-	{
-		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-			ConfigurationManager.setCachedJobConf(job);
-		}
-		
-		_NAstrings = TfUtils.parseNAStrings(job);
-		_specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-		
-		FileSystem fs = FileSystem.get(job);
-		JSONObject spec = TfUtils.readSpec(fs, _specFile);
-		
-		_oa = new OmitAgent(spec);
-	}
-	
-	// called from GenTFMtdMapper, ApplyTf (Hadoop)
-	public TfUtils(JobConf job) 
-		throws IOException, JSONException 
-	{
-		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-			ConfigurationManager.setCachedJobConf(job);
-		}
-		
-		boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
-		//Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
-		String[] naStrings = TfUtils.parseNAStrings(job);
-		
-		long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) );		// #of columns in input data
-			
-		String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-		String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
-		String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
-		String outputPath = FileOutputFormat.getOutputPath(job).toString();
-		FileSystem fs = FileSystem.get(job);
-		JSONObject spec = TfUtils.readSpec(fs, specFile);
-		
-		init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
-	}
-	
-	// called from GenTfMtdReducer 
-	public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
-	{
-		this(job);
-		_tfMtdDir = tfMtdDir;
-	}
-	
-	// called from GenTFMtdReducer and ApplyTf (Spark)
-	public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
-		init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
-		_tfMtdDir = tfMtdDir;
-	}
-	
 	public void incrValid() { _numValidRecords++; }
 	public long getValid()  { return _numValidRecords; }
 	public long getTotal()  { return _numRecordsInPartFile; }
@@ -424,8 +434,7 @@ public class TfUtils implements Serializable{
 			words = getRecodeAgent().apply(words, this);
 
 		words = getBinAgent().apply(words, this);
-		words = getDummycodeAgent().apply(words, this);
-		
+		words = getDummycodeAgent().apply(words, this);		
 		_numTransformedRows++;
 		
 		return words;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
index 69950a3..7940095 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
@@ -22,12 +22,15 @@ package org.apache.sysml.test.integration.functions.jmlc;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map.Entry;
 
 import org.junit.Assert;
+import org.junit.Test;
 import org.apache.sysml.api.DMLException;
 import org.apache.sysml.api.jmlc.Connection;
 import org.apache.sysml.api.jmlc.PreparedScript;
 import org.apache.sysml.api.jmlc.ResultVariables;
+import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
@@ -46,7 +49,7 @@ public class FrameTransformTest extends AutomatedTestBase
 	private final static int rows = 700;
 	private final static int cols = 3;
 	
-	private final static int nRuns = 10;
+	private final static int nRuns = 2;
 	
 	private final static double sparsity1 = 0.7;
 	private final static double sparsity2 = 0.1;
@@ -57,7 +60,6 @@ public class FrameTransformTest extends AutomatedTestBase
 		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "Y" }) ); 
 	}
 	
-	/*
 	@Test
 	public void testJMLCTransformDense() throws IOException {
 		runJMLCReuseTest(TEST_NAME1, false, false);
@@ -77,7 +79,6 @@ public class FrameTransformTest extends AutomatedTestBase
 	public void testJMLCTransformSparseReuse() throws IOException {
 		runJMLCReuseTest(TEST_NAME1, true, true);
 	}
-	*/
 
 	/**
 	 * 
@@ -86,7 +87,6 @@ public class FrameTransformTest extends AutomatedTestBase
 	 * @param instType
 	 * @throws IOException 
 	 */
-	@SuppressWarnings("unused")
 	private void runJMLCReuseTest( String testname, boolean sparse, boolean modelReuse ) 
 		throws IOException
 	{	
@@ -98,13 +98,14 @@ public class FrameTransformTest extends AutomatedTestBase
 		//generate inputs
 		double[][] Xd = TestUtils.round(getRandomMatrix(rows, cols, 0.51, 7.49, sparse?sparsity2:sparsity1, 1234));
 		String[][] Xs = createFrameData(Xd);
+		String[][] Ms = createRecodeMaps(Xs);
 		
 		//run DML via JMLC
-		ArrayList<double[][]> Yset = execDMLScriptviaJMLC( TEST_NAME, Xs, modelReuse );
+		ArrayList<double[][]> Yset = execDMLScriptviaJMLC( TEST_NAME, Xs, Ms, modelReuse );
 		
-		//check non-empty y
+		//check correct result (nnz 7 + 0 -> 8 distinct vals)
 		for( double[][] data : Yset )
-			Assert.assertEquals("Wrong result: "+data[0][0]+".", new Double(7), new Double(data[0][0]));
+			Assert.assertEquals("Wrong result: "+data[0][0]+".", new Double(8), new Double(data[0][0]));
 	}
 
 	/**
@@ -114,7 +115,7 @@ public class FrameTransformTest extends AutomatedTestBase
 	 * @throws DMLException
 	 * @throws IOException
 	 */
-	private ArrayList<double[][]> execDMLScriptviaJMLC( String testname, String[][] X, boolean modelReuse) 
+	private ArrayList<double[][]> execDMLScriptviaJMLC( String testname, String[][] X, String[][] M, boolean modelReuse) 
 		throws IOException
 	{
 		Timing time = new Timing(true);
@@ -128,22 +129,22 @@ public class FrameTransformTest extends AutomatedTestBase
 		{
 			//prepare input arguments
 			HashMap<String,String> args = new HashMap<String,String>();
-			args.put("$TRANSFORM_PATH", SCRIPT_DIR + TEST_DIR + "/tfmtd");
 			args.put("$TRANSFORM_SPEC", "{ \"ids\": true ,\"recode\": [ 1, 2, 3] }");
 			
 			//read and precompile script
 			String script = conn.readScript(SCRIPT_DIR + TEST_DIR + testname + ".dml");	
-			PreparedScript pstmt = conn.prepareScript(script, args, new String[]{"X"}, new String[]{"Y"}, false);
+			PreparedScript pstmt = conn.prepareScript(script, args, new String[]{"X","M"}, new String[]{"Y"}, false);
 			
 			if( modelReuse )
-				pstmt.setFrame("X", X);
+				pstmt.setFrame("M", M, true);
 			
 			//execute script multiple times
 			for( int i=0; i<nRuns; i++ )
 			{
 				//bind input parameters
 				if( !modelReuse )
-					pstmt.setFrame("X", X);
+					pstmt.setFrame("M", M);
+				pstmt.setFrame("X", X);
 				
 				//execute script
 				ResultVariables rs = pstmt.executeScript();
@@ -185,4 +186,35 @@ public class FrameTransformTest extends AutomatedTestBase
 		
 		return ret;
 	}
+	
+	private String[][] createRecodeMaps(String[][] data) {
+		//create maps per column
+		ArrayList<HashMap<String,Integer>> map = new ArrayList<HashMap<String,Integer>>(); 
+		for( int j=0; j<data[0].length; j++ )
+			map.add(new HashMap<String,Integer>());
+		//create recode maps per column
+		for( int i=0; i<data.length; i++ ) {
+			for( int j=0; j<data[i].length; j++ )
+				if( !map.get(j).containsKey(data[i][j]) )
+					map.get(j).put(data[i][j], map.get(j).size()+1);
+		}
+		//determine max recode map size
+		int max = 0;
+		for( int j=0; j<data[0].length; j++ )
+			max = Math.max(max, map.get(j).size());
+		
+		//allocate output
+		String[][] ret = new String[max][];
+		for( int i=0; i<max; i++ )
+			ret[i] = new String[data[0].length];
+		
+		//create frame of recode maps
+		for( int j=0; j<data[0].length; j++) {
+			int i = 0;
+			for( Entry<String, Integer> e : map.get(j).entrySet() )
+				ret[i++][j] = e.getKey()+Lop.DATATYPE_PREFIX+e.getValue();
+		}
+		
+		return ret;
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/test/scripts/functions/jmlc/transform.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/jmlc/transform.dml b/src/test/scripts/functions/jmlc/transform.dml
index 1fce0bb..a7fe77a 100644
--- a/src/test/scripts/functions/jmlc/transform.dml
+++ b/src/test/scripts/functions/jmlc/transform.dml
@@ -19,10 +19,11 @@
 #
 #-------------------------------------------------------------
 
-X = read($X, data_type="frame", format="csv");
+X = read($X, data_type="frame", format="csv"); #new data
+M = read($M, data_type="frame", format="csv"); #existing recode maps
 specJson = $TRANSFORM_SPEC
 
-Xt = transform(target=X, transformPath=$TRANSFORM_PATH, spec=specJson);
+Xt = transformapply(target=X, meta=M, spec=specJson);
 
 V = matrix(Xt, rows=nrow(Xt)*ncol(Xt), cols=1);
 Y = as.matrix(sum(table(V, 1) != 0))