Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:51 UTC

[09/60] Rewrite the Scala API as (somewhat) thin Layer on Java API

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 58a9a2a..b6193aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -53,6 +53,22 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 	}
 
 	@Override
+	public T createInstance(Object[] fields) {
+		try {
+			T t = tupleClass.newInstance();
+
+			for (int i = 0; i < arity; i++) {
+				t.setField(fields[i], i);
+			}
+
+			return t;
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate tuple.", e);
+		}
+	}
+
+	@Override
 	public T copy(T from, T reuse) {
 		for (int i = 0; i < arity; i++) {
 			Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 69133b6..3e630fd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -69,6 +69,14 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 		return -1;
 	}
 
+	public int getArity() {
+		return arity;
+	}
+
+	// We use this in the Aggregate and Distinct Operators to create instances
+	// of immutable Tuples (i.e. Scala Tuples)
+	public abstract T createInstance(Object[] fields);
+
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		for (int i = 0; i < arity; i++) {

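Note on the hunk above: java.api Tuples are mutable, so TupleSerializer can instantiate an empty tuple and set fields one by one, while Scala tuples are immutable and must be built from all field values at once. A minimal illustrative sketch (not part of this commit; the class name and simplified signatures are made up for illustration) of how a serializer for an immutable Scala Tuple2 could satisfy the new createInstance(Object[]) hook:

    // Illustrative sketch only: an immutable Scala tuple cannot be populated
    // field-by-field after construction, so all values are collected first
    // and the tuple is created in one shot.
    class Tuple2SerializerSketch[A, B] /* conceptually: extends TupleSerializerBase[(A, B)] */ {
      def getArity: Int = 2

      def createInstance(fields: Array[AnyRef]): (A, B) =
        (fields(0).asInstanceOf[A], fields(1).asInstanceOf[B])
    }
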
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 3a88565..fdda028 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -46,9 +46,7 @@ under the License.
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   </properties>
 
-  <!--  These two requirements are the minimum to use and develop Flink.
-        You can add others like <artifactId>flink-scala</artifactId> for Scala!
-  -->
+  <!--  These two requirements are the minimum to use and develop Flink. -->
   <dependencies>
     <dependency>
       <groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
index 0c29856..f9dad03 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
@@ -1,108 +1,53 @@
-package ${package};
+package ${package}
 
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.flink.api.common.Program
-import org.apache.flink.api.common.ProgramDescription
-import org.apache.flink.client.LocalExecutor
-import org.apache.flink.api.scala.TextFile
-import org.apache.flink.api.scala.ScalaPlan
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.operators._
-import org.apache.flink.client.RemoteExecutor
-
-// You can run this locally using:
-// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobLocal 2 file:///some/path file:///some/other/path"
-object RunJobLocal {
-  def main(args: Array[String]) {
-    val job = new Job
-    if (args.size < 3) {
-      println(job.getDescription)
-      return
-    }
-    val plan = job.getScalaPlan(args(0).toInt, args(1), args(2))
-    LocalExecutor.execute(plan)
-    System.exit(0)
-  }
-}
-
-// You can run this on a cluster using:
-// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobRemote 2 file:///some/path file:///some/other/path"
-object RunJobRemote {
-  def main(args: Array[String]) {
-    val job = new Job
-    if (args.size < 3) {
-      println(job.getDescription)
-      return
-    }
-    val plan = job.getScalaPlan(args(0).toInt, args(1), args(2))
-    // This will create an executor to run the plan on a cluster. We assume
-    // that the JobManager is running on the local machine on the default
-    // port. Change this according to your configuration.
-    // You will also need to change the name of the jar if you change the
-    // project name and/or version. Before running this you also need
-    // to run "mvn package" to create the jar.
-    val ex = new RemoteExecutor("localhost", 6123, "target/flink-project-0.1-SNAPSHOT.jar")
-    ex.executePlan(plan)
-  }
-}
-
 
 /**
- * This is a outline for a Flink scala job. It is actually the WordCount
- * example from the here distribution.
+ * Skeleton for a Flink Job.
  *
- * You can run it out of your IDE using the main() method of RunJob.
- * This will use the LocalExecutor to start a little Flink instance
- * out of your IDE.
+ * For a full example of a Flink Job, see the WordCountJob.scala file in the
+ * same package/directory or have a look at the website.
  *
  * You can also generate a .jar file that you can submit on your Flink
- * cluster.
- * Just type
- *      mvn clean package
- * in the projects root directory.
- * You will find the jar in
- *      target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
+ * cluster. Just type
+ * {{{
+ *   mvn clean package
+ * }}}
+ * in the project's root directory. You will find the jar in
+ * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
  *
  */
-class Job extends Program with ProgramDescription with Serializable {
-  override def getDescription() = {
-    "Parameters: [numSubStasks] [input] [output]"
-  }
-  override def getPlan(args: String*) = {
-    getScalaPlan(args(0).toInt, args(1), args(2))
-  }
-
-  def formatOutput = (word: String, count: Int) => "%s %d".format(word, count)
-
-  def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
-    val input = TextFile(textInput)
-
-    val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { (_, 1) } }
-    val counts = words groupBy { case (word, _) => word } reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
-
-    counts neglects { case (word, _) => word }
-    counts preserves({ case (word, _) => word }, { case (word, _) => word })
-    val output = counts.write(wordsOutput, DelimitedOutputFormat(formatOutput.tupled))
-
-    val plan = new ScalaPlan(Seq(output), "Word Count (immutable)")
-    plan.setDefaultParallelism(numSubTasks)
-    plan
+object Job {
+  def main(args: Array[String]) {
+    // set up the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    /**
+     * Here, you can start creating your execution plan for Flink.
+     *
+     * Start with getting some data from the environment, like
+     * env.readTextFile(textPath);
+     *
+     * then, transform the resulting DataSet[String] using operations
+     * like:
+     *   .filter()
+     *   .flatMap()
+     *   .join()
+     *   .groupBy()
+     *
+     * and many more.
+     * Have a look at the programming guide for the Scala API:
+     *
+     * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_guide.html
+     *
+     * and the examples
+     *
+     * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_examples.html
+     *
+     */
+
+
+    // execute program
+    env.execute("Flink Scala API Skeleton")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala
new file mode 100644
index 0000000..54408cb
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala
@@ -0,0 +1,37 @@
+package ${package}
+
+import org.apache.flink.api.scala._
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over some sample data
+ *
+ * This example shows how to:
+ *
+ *   - write a simple Flink program.
+ *   - use Tuple data types.
+ *   - write and use user-defined functions.
+ */
+object WordCountJob {
+  def main(args: Array[String]) {
+
+    // set up the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // get input data
+    val text = env.fromElements("To be, or not to be,--that is the question:--",
+      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
+      "Or to take arms against a sea of troubles,")
+
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    // emit result
+    counts.print()
+
+    // execute program
+    env.execute("WordCount Example")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 6ce737c..45db390 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -39,7 +39,23 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
-		<dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java
deleted file mode 100644
index ec7e9b7..0000000
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators;
-
-import java.lang.annotation.Annotation;
-import java.util.Arrays;
-
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-
-public class Annotations {
-
-	public static Annotation getConstantFields(int[] fields) {
-		return new ConstantFields(fields);
-	}
-
-	public static Annotation getConstantFieldsFirst(int[] fields) {
-		return new ConstantFieldsFirst(fields);
-	}
-
-	public static Annotation getConstantFieldsSecond(int[] fields) {
-		return new ConstantFieldsSecond(fields);
-	}
-
-	public static Annotation getConstantFieldsExcept(int[] fields) {
-		return new ConstantFieldsExcept(fields);
-	}
-
-	public static Annotation getConstantFieldsFirstExcept(int[] fields) {
-		return new ConstantFieldsFirstExcept(fields);
-	}
-
-	public static Annotation getConstantFieldsSecondExcept(int[] fields) {
-		return new ConstantFieldsSecondExcept(fields);
-	}
-
-	public static Annotation getCombinable() {
-		return new Combinable();
-	}
-
-	private static abstract class Fields<T extends Annotation> implements Annotation {
-
-		private final Class<T> clazz;
-
-		private final int[] fields;
-
-		public Fields(Class<T> clazz, int[] fields) {
-			this.clazz = clazz;
-			this.fields = fields;
-		}
-
-		public int[] value() {
-			return fields;
-		}
-
-		@Override
-		public Class<? extends Annotation> annotationType() {
-			return clazz;
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public boolean equals(Object obj) {
-			if (obj == null || !annotationType().isAssignableFrom(obj.getClass())) {
-				return false;
-			}
-
-			if (!annotationType().equals(((Annotation) obj).annotationType())) {
-				return false;
-			}
-
-			int[] otherFields = getOtherFields((T) obj);
-			return Arrays.equals(fields, otherFields);
-		}
-
-		protected abstract int[] getOtherFields(T other);
-
-		@Override
-		public int hashCode() {
-			return 0xf16cd51b ^ Arrays.hashCode(fields);
-		}
-	}
-
-	@SuppressWarnings("all")
-	private static class ConstantFields extends Fields<FunctionAnnotation.ConstantFields> implements
-			FunctionAnnotation.ConstantFields {
-
-		public ConstantFields(int[] fields) {
-			super(FunctionAnnotation.ConstantFields.class, fields);
-		}
-
-		@Override
-		protected int[] getOtherFields(FunctionAnnotation.ConstantFields other) {
-			return other.value();
-		}
-	}
-
-	@SuppressWarnings("all")
-	private static class ConstantFieldsFirst extends Fields<FunctionAnnotation.ConstantFieldsFirst> implements
-			FunctionAnnotation.ConstantFieldsFirst {
-
-		public ConstantFieldsFirst(int[] fields) {
-			super(FunctionAnnotation.ConstantFieldsFirst.class, fields);
-		}
-
-		@Override
-		protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsFirst other) {
-			return other.value();
-		}
-	}
-
-	@SuppressWarnings("all")
-	private static class ConstantFieldsSecond extends Fields<FunctionAnnotation.ConstantFieldsSecond> implements
-			FunctionAnnotation.ConstantFieldsSecond {
-
-		public ConstantFieldsSecond(int[] fields) {
-			super(FunctionAnnotation.ConstantFieldsSecond.class, fields);
-		}
-
-		@Override
-		protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsSecond other) {
-			return other.value();
-		}
-	}
-
-	@SuppressWarnings("all")
-	private static class ConstantFieldsExcept extends Fields<FunctionAnnotation.ConstantFieldsExcept> implements
-			FunctionAnnotation.ConstantFieldsExcept {
-
-		public ConstantFieldsExcept(int[] fields) {
-			super(FunctionAnnotation.ConstantFieldsExcept.class, fields);
-		}
-
-		@Override
-		protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsExcept other) {
-			return other.value();
-		}
-	}
-
-	@SuppressWarnings("all")
-	private static class ConstantFieldsFirstExcept extends Fields<FunctionAnnotation.ConstantFieldsFirstExcept> implements
-			FunctionAnnotation.ConstantFieldsFirstExcept {
-
-		public ConstantFieldsFirstExcept(int[] fields) {
-			super(FunctionAnnotation.ConstantFieldsFirstExcept.class, fields);
-		}
-
-		@Override
-		protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsFirstExcept other) {
-			return other.value();
-		}
-	}
-
-	@SuppressWarnings("all")
-	private static class ConstantFieldsSecondExcept extends Fields<FunctionAnnotation.ConstantFieldsSecondExcept> implements
-			FunctionAnnotation.ConstantFieldsSecondExcept {
-
-		public ConstantFieldsSecondExcept(int[] fields) {
-			super(FunctionAnnotation.ConstantFieldsSecondExcept.class, fields);
-		}
-
-		@Override
-		protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsSecondExcept other) {
-			return other.value();
-		}
-	}
-
-	@SuppressWarnings("all")
-	private static class Combinable implements Annotation, ReduceOperator.Combinable {
-
-		public Combinable() {
-		}
-
-		@Override
-		public Class<? extends Annotation> annotationType() {
-			return ReduceOperator.Combinable.class;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj == null || !annotationType().isAssignableFrom(obj.getClass())) {
-				return false;
-			}
-
-			if (!annotationType().equals(((Annotation) obj).annotationType())) {
-				return false;
-			}
-
-			return true;
-		}
-
-		@Override
-		public int hashCode() {
-			return 0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
new file mode 100644
index 0000000..82a1dd5
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.SingleInputOperator;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import scala.Product;
+
+/**
+ * This operator represents the application of an "aggregate" operation on a data set, and the
+ * result data set produced by the function.
+ *
+ * @param <IN> The type of the data set aggregated by the operator.
+ */
+public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, ScalaAggregateOperator<IN>> {
+
+	private final List<AggregationFunction<?>> aggregationFunctions = new ArrayList<AggregationFunction<?>>(4);
+
+	private final List<Integer> fields = new ArrayList<Integer>(4);
+
+	private final Grouping<IN> grouping;
+
+	/**
+	 * Non-grouped aggregation: the aggregation is applied to the
+	 * complete (non-grouped) input data set.
+	 */
+	public ScalaAggregateOperator(org.apache.flink.api.java.DataSet<IN> input, Aggregations function, int field) {
+		super(Validate.notNull(input), input.getType());
+
+		Validate.notNull(function);
+
+		if (!input.getType().isTupleType()) {
+			throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
+		}
+
+		TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getType();
+
+		if (field < 0 || field >= inType.getArity()) {
+			throw new IllegalArgumentException("Aggregation field position is out of range.");
+		}
+
+		AggregationFunctionFactory factory = function.getFactory();
+		AggregationFunction<?> aggFunct = factory.createAggregationFunction(inType.getTypeAt(field).getTypeClass());
+
+		// this is the first aggregation operator after a regular data set (non grouped aggregation)
+		this.aggregationFunctions.add(aggFunct);
+		this.fields.add(field);
+		this.grouping = null;
+	}
+
+	/**
+	 *
+	 * Grouped aggregation
+	 *
+	 * @param input The grouped data set that the aggregation is applied to.
+	 * @param function The aggregation function to apply.
+	 * @param field The position of the tuple field to aggregate.
+	 */
+	public ScalaAggregateOperator(Grouping<IN> input, Aggregations function, int field) {
+		super(Validate.notNull(input).getDataSet(), input.getDataSet().getType());
+
+		Validate.notNull(function);
+
+		if (!input.getDataSet().getType().isTupleType()) {
+			throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
+		}
+
+		TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getDataSet().getType();
+
+		if (field < 0 || field >= inType.getArity()) {
+			throw new IllegalArgumentException("Aggregation field position is out of range.");
+		}
+
+		AggregationFunctionFactory factory = function.getFactory();
+		AggregationFunction<?> aggFunct = factory.createAggregationFunction(inType.getTypeAt(field).getTypeClass());
+
+		// set the aggregation fields
+		this.aggregationFunctions.add(aggFunct);
+		this.fields.add(field);
+		this.grouping = input;
+	}
+
+
+	public ScalaAggregateOperator<IN> and(Aggregations function, int field) {
+		Validate.notNull(function);
+
+		TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) getType();
+
+		if (field < 0 || field >= inType.getArity()) {
+			throw new IllegalArgumentException("Aggregation field position is out of range.");
+		}
+
+
+		AggregationFunctionFactory factory = function.getFactory();
+		AggregationFunction<?> aggFunct = factory.createAggregationFunction(inType.getTypeAt(field).getTypeClass());
+
+		this.aggregationFunctions.add(aggFunct);
+		this.fields.add(field);
+
+		return this;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> translateToDataFlow(Operator<IN> input) {
+
+		// sanity check
+		if (this.aggregationFunctions.isEmpty() || this.aggregationFunctions.size() != this.fields.size()) {
+			throw new IllegalStateException();
+		}
+
+
+		// construct the aggregation function
+		AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
+		int[] fields = new int[this.fields.size()];
+		StringBuilder genName = new StringBuilder();
+
+		for (int i = 0; i < fields.length; i++) {
+			aggFunctions[i] = (AggregationFunction<Object>) this.aggregationFunctions.get(i);
+			fields[i] = this.fields.get(i);
+
+			genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
+		}
+		genName.setLength(genName.length()-1);
+
+		TypeSerializer<IN> serializer = getInputType().createSerializer();
+		TypeSerializerFactory<IN> serializerFactory = null;
+		if (serializer.isStateful()) {
+			serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
+					serializer, getInputType().getTypeClass());
+		} else {
+			serializerFactory = new RuntimeStatelessSerializerFactory<IN>(
+					serializer, getInputType().getTypeClass());
+		}
+
+		@SuppressWarnings("rawtypes")
+		RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(serializerFactory, aggFunctions, fields);
+
+
+		String name = getName() != null ? getName() : genName.toString();
+
+		// distinguish between grouped reduce and non-grouped reduce
+		if (this.grouping == null) {
+			// non grouped aggregation
+			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+			GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
+					new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);
+
+			po.setCombinable(true);
+
+			// set input
+			po.setInput(input);
+			// set dop
+			po.setDegreeOfParallelism(this.getParallelism());
+
+			return po;
+		}
+
+		if (this.grouping.getKeys() instanceof Keys.FieldPositionKeys) {
+			// grouped aggregation
+			int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions();
+			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+			GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
+					new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
+
+			po.setCombinable(true);
+
+			// set input
+			po.setInput(input);
+			// set dop
+			po.setDegreeOfParallelism(this.getParallelism());
+
+			SingleInputSemanticProperties props = new SingleInputSemanticProperties();
+
+			for (int i = 0; i < logicalKeyPositions.length; i++) {
+				int keyField = logicalKeyPositions[i];
+				boolean keyFieldUsedInAgg = false;
+
+				for (int k = 0; k < fields.length; k++) {
+					int aggField = fields[k];
+					if (keyField == aggField) {
+						keyFieldUsedInAgg = true;
+						break;
+					}
+				}
+
+				if (!keyFieldUsedInAgg) {
+					props.addForwardedField(keyField, keyField);
+				}
+			}
+
+			po.setSemanticProperties(props);
+
+			return po;
+		}
+		else if (this.grouping.getKeys() instanceof Keys.SelectorFunctionKeys) {
+			throw new UnsupportedOperationException("Aggregate does not support grouping with KeySelector functions, yet.");
+		}
+		else {
+			throw new UnsupportedOperationException("Unrecognized key type.");
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Combinable
+	public static final class AggregatingUdf<T extends Product> extends RichGroupReduceFunction<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		private final int[] fieldPositions;
+
+		private final AggregationFunction<Object>[] aggFunctions;
+
+		private final TypeSerializerFactory<T> serializerFactory;
+
+		private transient TupleSerializerBase<T> serializer;
+
+		public AggregatingUdf(TypeSerializerFactory<T> serializerFactory, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
+			Validate.notNull(serializerFactory);
+			Validate.notNull(aggFunctions);
+			Validate.isTrue(aggFunctions.length == fieldPositions.length);
+			Validate.isTrue(serializerFactory.getSerializer() instanceof TupleSerializerBase);
+
+			this.serializerFactory = serializerFactory;
+			this.aggFunctions = aggFunctions;
+			this.fieldPositions = fieldPositions;
+		}
+
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			for (int i = 0; i < aggFunctions.length; i++) {
+				aggFunctions[i].initializeAggregate();
+			}
+			serializer = (TupleSerializerBase<T>)serializerFactory.getSerializer();
+		}
+
+		@Override
+		public void reduce(Iterable<T> records, Collector<T> out) {
+			final AggregationFunction<Object>[] aggFunctions = this.aggFunctions;
+			final int[] fieldPositions = this.fieldPositions;
+
+			// the aggregation functions have already been initialized (in open() or by the previous group)
+
+			T current = null;
+			final Iterator<T> values = records.iterator();
+			while (values.hasNext()) {
+				current = values.next();
+
+				for (int i = 0; i < fieldPositions.length; i++) {
+					Object val = current.productElement(fieldPositions[i]);
+					aggFunctions[i].aggregate(val);
+				}
+			}
+
+			Object[] fields = new Object[serializer.getArity()];
+			// First copy all tuple fields, then overwrite the aggregated ones
+			for (int i = 0; i < fields.length; i++) {
+				fields[i] = current.productElement(i);
+			}
+			for (int i = 0; i < fieldPositions.length; i++) {
+				Object aggVal = aggFunctions[i].getAggregate();
+				fields[fieldPositions[i]] = aggVal;
+				aggFunctions[i].initializeAggregate();
+			}
+
+			T result = serializer.createInstance(fields);
+
+			out.collect(result);
+		}
+
+	}
+}

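For context, a minimal usage sketch in the style of the WordCountJob above, showing where ScalaAggregateOperator is expected to come into play; the assumption here is that groupBy(0).sum(1) on the rewritten Scala DataSet translates into this operator, whose AggregatingUdf rebuilds each result tuple via TupleSerializerBase.createInstance(Object[]):

    import org.apache.flink.api.scala._

    object AggregateSketch {
      def main(args: Array[String]) {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // (word, count) pairs are immutable Scala tuples
        val pairs = env.fromElements(("to", 1), ("be", 1), ("to", 1))

        // grouped aggregation on field 1, keyed by field 0
        val sums = pairs.groupBy(0).sum(1)

        sums.print()
        env.execute("Aggregate sketch")
      }
    }
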
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
new file mode 100644
index 0000000..f563a8f
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import scala.Product;
+
+public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
+
+	private static final long serialVersionUID = 1L;
+	
+	private transient Object[] parsedValues;
+	
+	// To speed up readRecord processing. Used to find Windows line endings.
+	// It is set in open() so that readRecord() does not have to evaluate it on every call.
+	private boolean lineDelimiterIsLinebreak = false;
+
+	private final TypeSerializerFactory<OUT> serializerFactory;
+
+	private transient TupleSerializerBase<OUT> serializer;
+	
+	public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
+		super(filePath);
+
+		TypeSerializer<OUT> serializer = typeInfo.createSerializer();
+		if (serializer.isStateful()) {
+			serializerFactory = new RuntimeStatefulSerializerFactory<OUT>(
+					serializer, typeInfo.getTypeClass());
+		} else {
+			serializerFactory = new RuntimeStatelessSerializerFactory<OUT>(
+					serializer, typeInfo.getTypeClass());
+		}
+
+		if (!(typeInfo.isTupleType())) {
+			throw new UnsupportedOperationException("This only works on tuple types.");
+		}
+		TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
+		Class[] classes = new Class[tupleType.getArity()];
+		for (int i = 0; i < tupleType.getArity(); i++) {
+			classes[i] = tupleType.getTypeAt(i).getTypeClass();
+		}
+		setFieldTypes(classes);
+	}
+
+	public void setFieldTypes(Class<?>[] fieldTypes) {
+		if (fieldTypes == null || fieldTypes.length == 0) {
+			throw new IllegalArgumentException("Field types must not be null or empty.");
+		}
+
+		setFieldTypesGeneric(fieldTypes);
+	}
+
+	public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
+		Preconditions.checkNotNull(sourceFieldIndices);
+		Preconditions.checkNotNull(fieldTypes);
+
+		checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
+
+		setFieldsGeneric(sourceFieldIndices, fieldTypes);
+	}
+	
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		
+		@SuppressWarnings("unchecked")
+		FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+		
+		//throw exception if no field parsers are available
+		if (fieldParsers.length == 0) {
+			throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+		}
+		
+		// create the value holders
+		this.parsedValues = new Object[fieldParsers.length];
+		for (int i = 0; i < fieldParsers.length; i++) {
+			this.parsedValues[i] = fieldParsers[i].createValue();
+		}
+		
+		// left to right evaluation makes access [0] okay
+		// this marker is used to speed up readRecord, so that it does not have to check on each call whether the line ending is set to the default
+		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+			this.lineDelimiterIsLinebreak = true;
+		}
+
+		serializer = (TupleSerializerBase<OUT>)serializerFactory.getSerializer();
+	}
+
+	@Override
+	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
+		/*
+		 * Fix to support Windows line endings in CSV input files with the standard delimiter setup '\n'
+		 */
+		// Find Windows line endings, i.e. a carriage return right before the newline
+		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) {
+			// reduce the number of bytes so that the carriage return is not taken as data
+			numBytes--;
+		}
+		
+		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+			OUT result = serializer.createInstance(parsedValues);
+			return result;
+		} else {
+			return null;
+		}
+	}
+	
+	
+	@Override
+	public String toString() {
+		return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@SuppressWarnings("unused")
+	private static void checkAndCoSort(int[] positions, Class<?>[] types) {
+		if (positions.length != types.length) {
+			throw new IllegalArgumentException("The positions and types must be of the same length");
+		}
+		
+		TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
+		
+		for (int i = 0; i < positions.length; i++) {
+			if (positions[i] < 0) {
+				throw new IllegalArgumentException("The field " + i + " (" + positions[i] + ") is invalid.");
+			}
+			if (types[i] == null) {
+				throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+			}
+			
+			if (map.containsKey(positions[i])) {
+				throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
+			}
+			
+			map.put(positions[i], types[i]);
+		}
+		
+		int i = 0;
+		for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
+			positions[i] = entry.getKey();
+			types[i] = entry.getValue();
+			i++;
+		}
+	}
+	
+	private static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
+		if (positions.length != types.length) {
+			throw new IllegalArgumentException("The positions and types must be of the same length");
+		}
+		
+		int lastPos = -1;
+		
+		for (int i = 0; i < positions.length; i++) {
+			if (positions[i] < 0) {
+				throw new IllegalArgumentException("The field " + i + " (" + positions[i] + ") is invalid.");
+			}
+			if (types[i] == null) {
+				throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+			}
+			
+			if (positions[i] <= lastPos) {
+				throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
+			}
+			
+			lastPos = positions[i];
+		}
+	}
+}

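A hedged usage sketch for the new input format: it assumes the rewritten Scala ExecutionEnvironment exposes a readCsvFile[T] helper (not shown in this diff) that wires up ScalaCsvInputFormat for tuple types; each parsed line then becomes an immutable Scala tuple via createInstance(Object[]):

    import org.apache.flink.api.scala._

    object CsvInputSketch {
      def main(args: Array[String]) {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Assumption: readCsvFile[(String, Int)] builds a ScalaCsvInputFormat
        // for the tuple type (String, Int); the path is a placeholder.
        val wordCounts = env.readCsvFile[(String, Int)]("file:///tmp/wordcounts.csv")

        wordCounts.print()
        env.execute("CSV input sketch")
      }
    }
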
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
new file mode 100644
index 0000000..07fb3a2
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.scala.operators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.TypeInformation;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import scala.Product;
+
+/**
+ * This is an OutputFormat to serialize Scala Tuples to text. The output is
+ * structured by record delimiters and field delimiters as common in CSV files.
+ * Record delimiters separate records from each other ('\n' is common). Field
+ * delimiters separate fields within a record.
+ */
+public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T> implements InputTypeConfigurable {
+	private static final long serialVersionUID = 1L;
+
+	@SuppressWarnings("unused")
+	private static final Log LOG = LogFactory.getLog(ScalaCsvOutputFormat.class);
+
+	// --------------------------------------------------------------------------------------------
+
+	public static final String DEFAULT_LINE_DELIMITER = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+
+	public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
+
+	// --------------------------------------------------------------------------------------------
+
+	private transient Writer wrt;
+
+	private String fieldDelimiter;
+
+	private String recordDelimiter;
+
+	private String charsetName;
+
+	private boolean allowNullValues = true;
+
+	private boolean quoteStrings = false;
+
+	// --------------------------------------------------------------------------------------------
+	// Constructors and getters/setters for the configurable parameters
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates an instance of ScalaCsvOutputFormat. Lines are separated by the newline character '\n',
+	 * fields are separated by ','.
+	 *
+	 * @param outputPath The path where the CSV file is written.
+	 */
+	public ScalaCsvOutputFormat(Path outputPath) {
+		this(outputPath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER);
+	}
+
+	/**
+	 * Creates an instance of ScalaCsvOutputFormat. Lines are separated by the newline character '\n',
+	 * fields by the given field delimiter.
+	 *
+	 * @param outputPath The path where the CSV file is written.
+	 * @param fieldDelimiter
+	 *            The delimiter that is used to separate fields in a tuple.
+	 */
+	public ScalaCsvOutputFormat(Path outputPath, String fieldDelimiter) {
+		this(outputPath, DEFAULT_LINE_DELIMITER, fieldDelimiter);
+	}
+
+	/**
+	 * Creates an instance of ScalaCsvOutputFormat.
+	 *
+	 * @param outputPath The path where the CSV file is written.
+	 * @param recordDelimiter
+	 *            The delimiter that is used to separate the tuples.
+	 * @param fieldDelimiter
+	 *            The delimiter that is used to separate fields in a tuple.
+	 */
+	public ScalaCsvOutputFormat(Path outputPath, String recordDelimiter, String fieldDelimiter) {
+		super(outputPath);
+		if (recordDelimiter == null) {
+			throw new IllegalArgumentException("RecordDelimiter shall not be null.");
+		}
+
+		if (fieldDelimiter == null) {
+			throw new IllegalArgumentException("FieldDelimiter shall not be null.");
+		}
+
+		this.fieldDelimiter = fieldDelimiter;
+		this.recordDelimiter = recordDelimiter;
+		this.allowNullValues = false;
+	}
+
+	/**
+	 * Configures the format to either allow null values (writing an empty field),
+	 * or to throw an exception when encountering a null field.
+	 * <p>
+	 * By default, null values are allowed.
+	 *
+	 * @param allowNulls Flag to indicate whether the output format should accept null values.
+	 */
+	public void setAllowNullValues(boolean allowNulls) {
+		this.allowNullValues = allowNulls;
+	}
+
+	/**
+	 * Sets the charset with which the CSV strings are written to the file.
+	 * If not specified, the output format uses the systems default character encoding.
+	 *
+	 * @param charsetName The name of charset to use for encoding the output.
+	 */
+	public void setCharsetName(String charsetName) {
+		this.charsetName = charsetName;
+	}
+
+	/**
+	 * Configures whether the output format should quote string values. String values are fields
+	 * of type {@link String} and {@link org.apache.flink.types.StringValue}, as well as
+	 * all subclasses of the latter.
+	 * <p>
+	 * By default, strings are not quoted.
+	 *
+	 * @param quoteStrings Flag indicating whether string fields should be quoted.
+	 */
+	public void setQuoteStrings(boolean quoteStrings) {
+		this.quoteStrings = quoteStrings;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		this.wrt = this.charsetName == null ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) :
+				new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096), this.charsetName);
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (wrt != null) {
+			this.wrt.close();
+		}
+		super.close();
+	}
+
+	@Override
+	public void writeRecord(T element) throws IOException {
+		int numFields = element.productArity();
+
+		for (int i = 0; i < numFields; i++) {
+			Object v = element.productElement(i);
+			if (v != null) {
+				if (i != 0) {
+					this.wrt.write(this.fieldDelimiter);
+				}
+
+				if (quoteStrings) {
+					if (v instanceof String || v instanceof StringValue) {
+						this.wrt.write('"');
+						this.wrt.write(v.toString());
+						this.wrt.write('"');
+					} else {
+						this.wrt.write(v.toString());
+					}
+				} else {
+					this.wrt.write(v.toString());
+				}
+			} else {
+				if (this.allowNullValues) {
+					if (i != 0) {
+						this.wrt.write(this.fieldDelimiter);
+					}
+				} else {
+					throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);
+				}
+			}
+		}
+
+		// add the record delimiter
+		this.wrt.write(this.recordDelimiter);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	@Override
+	public String toString() {
+		return "CsvOutputFormat (path: " + this.getOutputFilePath() + ", delimiter: " + this.fieldDelimiter + ")";
+	}
+
+	/**
+	 *
+	 * The purpose of this method is solely to check whether the data type to be processed
+	 * is in fact a tuple type.
+	 */
+	@Override
+	public void setInputType(TypeInformation<?> type) {
+		if (!type.isTupleType()) {
+			throw new InvalidProgramException("The " + ScalaCsvOutputFormat.class.getSimpleName() +
+				" can only be used to write tuple data sets.");
+		}
+	}
+}

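And a corresponding output-side sketch. Constructing ScalaCsvOutputFormat by hand and passing it to output(...) is an assumption about the surrounding Scala DataSet API (a writeAsCsv shortcut may wrap this format instead); it is used here only to show the constructor parameters from the file above:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
    import org.apache.flink.core.fs.Path

    object CsvOutputSketch {
      def main(args: Array[String]) {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val counts = env.fromElements(("to", 2), ("be", 2), ("question", 1))

        // records separated by '\n', fields by '|'; this constructor disables null values
        val format = new ScalaCsvOutputFormat[(String, Int)](
          new Path("file:///tmp/wordcounts.csv"), "\n", "|")

        // Assumption: DataSet.output(...) is available, mirroring the Java API
        counts.output(format)
        env.execute("CSV output sketch")
      }
    }
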
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala
deleted file mode 100644
index d23e94d..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import collection.JavaConversions.asScalaIterator
-
-import org.apache.flink.api.common.operators.util.FieldSet
-import org.apache.flink.api.common.operators._
-import org.apache.flink.api.common.operators.base.{GroupReduceOperatorBase, DeltaIterationBase, BulkIterationBase, GenericDataSourceBase}
-import org.apache.flink.api.java.record.functions.FunctionAnnotation
-import org.apache.flink.api.java.record.operators.BulkIteration.PartialSolutionPlaceHolder
-import org.apache.flink.api.java.record.operators.DeltaIteration.{WorksetPlaceHolder, SolutionSetPlaceHolder}
-import org.apache.flink.api.java.record.operators.GenericDataSink
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecondExcept
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable
-
-object AnnotationUtil {
-  val visited = collection.mutable.Set[Operator[_]]()
-
-  def setAnnotations(sinks: Seq[ScalaSink[_]]): Seq[ScalaSink[_]] = {
-    visited.clear()
-
-    sinks foreach setAnnotations
-
-    sinks
-  }
-
-  def setAnnotations(sink: ScalaSink[_]):Unit = {
-    setAnnotations(sink.sink.getInput)
-  }
-
-  def setAnnotations(operator: Operator[_]):Unit = {
-    if(operator != null && !visited.contains(operator)){
-      visited.add(operator)
-
-      operator match {
-        case op: GenericDataSourceBase[_,_] =>
-        case op: GenericDataSink =>
-          setAnnotations(op.getInput)
-        case op: PartialSolutionPlaceHolder =>
-        case op: SolutionSetPlaceHolder =>
-        case op: WorksetPlaceHolder =>
-        case op: DeltaIterationBase[_, _] =>
-          updateDualSemanticProperties(op)
-          setAnnotations(op.getSolutionSetDelta)
-          setAnnotations(op.getNextWorkset)
-          setAnnotations(op.getInitialWorkset)
-          setAnnotations(op.getInitialSolutionSet)
-        case op: DualInputOperator[_, _, _, _] =>
-          updateDualSemanticProperties(op)
-          setAnnotations(op.getFirstInput)
-          setAnnotations(op.getSecondInput)
-        case op: BulkIterationBase[_] =>
-          updateSingleSemanticProperties(op)
-          setAnnotations(op.getInput)
-          setAnnotations(op.getNextPartialSolution)
-          setAnnotations(op.getTerminationCriterion)
-        case op: GroupReduceOperatorBase[_, _, _] =>
-          updateCombinable(op)
-          setAnnotations(op.getInput)
-        case op: SingleInputOperator[_, _, _] =>
-          updateSingleSemanticProperties(op)
-          setAnnotations(op.getInput)
-      }
-    }
-  }
-
-  def updateCombinable(op: GroupReduceOperatorBase[_, _, _]){
-    if(op.isInstanceOf[ScalaOperator[_,_]]) {
-      val scalaOp = op.asInstanceOf[ScalaOperator[_, _]]
-
-      val combinableAnnotaion = scalaOp.getUserCodeAnnotation(classOf[Combinable])
-
-      if (combinableAnnotaion != null) {
-        op.setCombinable(true)
-      }
-    }
-  }
-
-  def updateDualSemanticProperties(op: DualInputOperator[_, _, _, _]){
-    if(op.isInstanceOf[ScalaOperator[_,_]]) {
-      val scalaOp = op.asInstanceOf[ScalaOperator[_, _]]
-      val properties = op.getSemanticProperties
-
-      // get readSet annotation from stub
-      val constantSet1Annotation: FunctionAnnotation.ConstantFieldsFirst = scalaOp.getUserCodeAnnotation(
-        classOf[FunctionAnnotation.ConstantFieldsFirst])
-      val constantSet2Annotation: FunctionAnnotation.ConstantFieldsSecond = scalaOp.getUserCodeAnnotation(
-        classOf[FunctionAnnotation.ConstantFieldsSecond])
-
-      // get readSet annotation from stub
-      val notConstantSet1Annotation: FunctionAnnotation.ConstantFieldsFirstExcept = scalaOp.getUserCodeAnnotation(
-        classOf[FunctionAnnotation.ConstantFieldsFirstExcept])
-      val notConstantSet2Annotation: FunctionAnnotation.ConstantFieldsSecondExcept = scalaOp.getUserCodeAnnotation(
-        classOf[FunctionAnnotation.ConstantFieldsSecondExcept])
-
-      if (notConstantSet1Annotation != null && constantSet1Annotation != null) {
-        throw new RuntimeException("Either ConstantFieldsFirst or ConstantFieldsFirstExcept can be specified, not both.")
-      }
-
-      if (constantSet2Annotation != null && notConstantSet2Annotation != null) {
-        throw new RuntimeException("Either ConstantFieldsSecond or ConstantFieldsSecondExcept can be specified, not both.")
-      }
-
-      // extract readSets from annotations
-      if (notConstantSet1Annotation != null) {
-        for (element <- notConstantSet1Annotation.value()) {
-          if (properties.getForwardedField1(element) != null) {
-            throw new RuntimeException("Field " + element + " cannot be forwarded and non constant at the same time.")
-          }
-        }
-
-        val fieldSet = new FieldSet(notConstantSet1Annotation.value(): _*)
-
-        for (i <- 0 until scalaOp.getUDF.getOutputLength) {
-          if (!fieldSet.contains(i)) {
-            properties.addForwardedField1(i, i)
-          }
-        }
-      } else if (constantSet1Annotation != null) {
-        for (value <- constantSet1Annotation.value) {
-          properties.addForwardedField1(value, value)
-        }
-      }
-
-      if (notConstantSet2Annotation != null) {
-        for (element <- notConstantSet2Annotation.value()) {
-          if (properties.getForwardedField2(element) != null) {
-            throw new RuntimeException("Field " + element + " cannot be forwarded and non constant at the same time.")
-          }
-        }
-
-        val fieldSet = new FieldSet(notConstantSet2Annotation.value(): _*)
-
-        for (i <- 0 until scalaOp.getUDF.getOutputLength) {
-          if (!fieldSet.contains(i)) {
-            properties.addForwardedField2(i, i)
-          }
-        }
-      } else if (constantSet2Annotation != null) {
-        for (value <- constantSet2Annotation.value) {
-          properties.addForwardedField2(value, value)
-        }
-      }
-
-      op.setSemanticProperties(properties)
-    }
-  }
-
-  def updateSingleSemanticProperties(op: SingleInputOperator[_, _, _]) {
-    if (op.isInstanceOf[ScalaOperator[_, _]]) {
-      val scalaOp = op.asInstanceOf[ScalaOperator[_, _]]
-      var properties = op.getSemanticProperties
-
-      if (properties == null) {
-        properties = new SingleInputSemanticProperties()
-      }
-
-      // get constantSet annotation from stub
-      val constantSet: FunctionAnnotation.ConstantFields =
-        scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFields])
-      val notConstantSet: FunctionAnnotation.ConstantFieldsExcept =
-        scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFieldsExcept])
-
-      if (notConstantSet != null && constantSet != null) {
-        throw new RuntimeException("Either ConstantFields or ConstantFieldsExcept can be specified, not both.")
-      }
-
-      // extract notConstantSet from annotation
-      if (notConstantSet != null) {
-        val nonConstant: FieldSet = new FieldSet(notConstantSet.value: _*)
-
-        for (element <- nonConstant.iterator()) {
-          if (properties.getForwardedField(element) != null) {
-            throw new RuntimeException("Field " + element + " is non constant and at the same time forwarded. This " +
-              "cannot happen.")
-          }
-        }
-
-        for (i <- 0 until scalaOp.getUDF.getOutputLength) {
-          if (!nonConstant.contains(i)) {
-            properties.addForwardedField(i, i)
-          }
-        }
-
-      } else if (constantSet != null) {
-        // extract constantSet from annotation
-        for (value <- constantSet.value) {
-          properties.addForwardedField(value, value)
-        }
-      }
-
-      op.setSemanticProperties(properties)
-    }
-  }
-}
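
The removed updateSingleSemanticProperties and its binary-input variant above translate the ConstantFields(-Except) annotations of the old Scala API into forwarded-field semantic properties. A rough, self-contained sketch of that mapping follows; ForwardedFieldsSketch and forwardedFields are made-up names for illustration, not part of this patch or of Flink:

  object ForwardedFieldsSketch {
    // constantFields: positions declared constant (ConstantFields);
    // nonConstantFields: positions declared non-constant (ConstantFieldsExcept);
    // outputLength: arity of the operator output, as in scalaOp.getUDF.getOutputLength.
    def forwardedFields(
        constantFields: Option[Set[Int]],
        nonConstantFields: Option[Set[Int]],
        outputLength: Int): Set[(Int, Int)] =
      (constantFields, nonConstantFields) match {
        case (Some(_), Some(_)) =>
          // the two annotations are mutually exclusive, as in the removed code
          throw new RuntimeException(
            "Either ConstantFields or ConstantFieldsExcept can be specified, not both.")
        case (Some(const), None) =>
          // each declared-constant field is forwarded to the same position
          const.map(i => (i, i))
        case (None, Some(except)) =>
          // every output position that is not declared non-constant is forwarded
          (0 until outputLength).filterNot(except.contains).map(i => (i, i)).toSet
        case (None, None) =>
          Set.empty[(Int, Int)]
      }
  }

With an "except" annotation every output position not listed is forwarded to itself; with a plain ConstantFields annotation exactly the listed positions are forwarded; specifying both is rejected, mirroring the removed code.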

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala
deleted file mode 100644
index a03f5cf..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import language.experimental.macros
-import scala.reflect.macros.Context
-
-import scala.util.DynamicVariable
-
-import org.apache.flink.api.scala.codegen.MacroContextHolder
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.analysis.FieldSet.toSeq
-import org.apache.flink.api.scala.analysis.UDF2
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.util.{FieldSet => PactFieldSet}
-import org.apache.flink.types.Record
-
-
-case class KeyCardinality(
-    key: FieldSelector,
-    isUnique: Boolean,
-    distinctCount: Option[Long],
-    avgNumRecords: Option[Float]) {
-
-  @transient private var pactFieldSets =
-    collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
-
-  def getPactFieldSet(contract: Operator[Record] with ScalaOperator[_, _]): PactFieldSet = {
-
-    if (pactFieldSets == null) {
-      pactFieldSets =
-        collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
-    }
-
-    val keyCopy = key.copy()
-    contract.getUDF.attachOutputsToInputs(keyCopy.inputFields)
-    val keySet = keyCopy.selectedFields.toIndexSet.toArray
-
-    val fieldSet = pactFieldSets.getOrElseUpdate(contract, new PactFieldSet(keySet, true))
-    fieldSet
-  }
-}
-
-trait OutputHintable[Out] { this: DataSet[Out] =>
-  def getContract = contract
-  
-  private var _cardinalities: List[KeyCardinality] = List[KeyCardinality]()
-  
-  def addCardinality(card: KeyCardinality) {
-    _cardinalities = card :: _cardinalities
-    applyHints(getContract)
-  }
-
-  def degreeOfParallelism = contract.getDegreeOfParallelism()
-  def degreeOfParallelism_=(value: Int) = contract.setDegreeOfParallelism(value)
-  def degreeOfParallelism(value: Int): this.type = { contract.setDegreeOfParallelism(value); this }
-    
-  def outputSize = contract.getCompilerHints().getOutputSize()
-  def outputSize_=(value: Long) = contract.getCompilerHints().setOutputSize(value)
-  def outputSize(value: Long): this.type = {
-    contract.getCompilerHints().setOutputSize(value)
-    this
-  }
-  
-  def outputCardinality = contract.getCompilerHints().getOutputCardinality()
-  def outputCardinality_=(value: Long) = contract.getCompilerHints().setOutputCardinality(value)
-  def outputCardinality(value: Long): this.type = {
-    contract.getCompilerHints().setOutputCardinality(value)
-    this
-  }
-  
-  def avgBytesPerRecord = contract.getCompilerHints().getAvgOutputRecordSize()
-  def avgBytesPerRecord_=(value: Float) = contract.getCompilerHints().setAvgOutputRecordSize(value)
-  def avgBytesPerRecord(value: Float): this.type = {
-    contract.getCompilerHints().setAvgOutputRecordSize(value)
-    this
-  }
-
-  def filterFactor = contract.getCompilerHints().getFilterFactor()
-  def filterFactor_=(value: Float) = contract.getCompilerHints().setFilterFactor(value)
-  def filterFactor(value: Float): this.type = {
-    contract.getCompilerHints().setFilterFactor(value)
-    this
-  }
-
-  def uniqueKey[Key](fields: Out => Key) = macro OutputHintableMacros.uniqueKey[Out, Key]
-
-  def applyHints(contract: Operator[Record] with ScalaOperator[_, _]): Unit = {
-    val hints = contract.getCompilerHints
-
-    if (hints.getUniqueFields != null) {
-      hints.getUniqueFields.clear()
-    }
-
-    _cardinalities.foreach { card =>
-
-      val fieldSet = card.getPactFieldSet(contract)
-
-      if (card.isUnique) {
-        hints.addUniqueField(fieldSet)
-      }
-    }
-  }
-}
-
-object OutputHintableMacros {
-  
-  def uniqueKey[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-      (c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      val contract = c.prefix.splice.getContract
-      val hints = contract.getCompilerHints
-      
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, true, None, None)
-      
-      c.prefix.splice.addCardinality(card)
-    }
-    result
-  }
-  
-  def uniqueKeyWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-      (c: Context { type PrefixType = OutputHintable[Out] })
-      (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      val contract = c.prefix.splice.getContract
-      val hints = contract.getCompilerHints
-      
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, true, Some(distinctCount.splice), None)
-      
-      c.prefix.splice.addCardinality(card)
-    }
-    result
-  }
-  
-  def cardinality[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-      (c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      val contract = c.prefix.splice.getContract
-      val hints = contract.getCompilerHints
-      
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, false, None, None)
-      
-      c.prefix.splice.addCardinality(card)
-    }
-    result
-  }
-  
-  def cardinalityWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-      (c: Context { type PrefixType = OutputHintable[Out] })
-      (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      val contract = c.prefix.splice.getContract
-      val hints = contract.getCompilerHints
-      
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, false, Some(distinctCount.splice), None)
-      
-      c.prefix.splice.addCardinality(card)
-    }
-    result
-  }
-  
-  def cardinalityWithAvgNumRecords[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-      (c: Context { type PrefixType = OutputHintable[Out] })
-      (fields: c.Expr[Out => Key], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      val contract = c.prefix.splice.getContract
-      val hints = contract.getCompilerHints
-      
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, false, None, Some(avgNumRecords.splice))
-      
-      c.prefix.splice.addCardinality(card)
-    }
-    result
-  }
-  
-  def cardinalityWithAll[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-      (c: Context { type PrefixType = OutputHintable[Out] })
-      (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long], avgNumRecords: c.Expr[Float])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      val contract = c.prefix.splice.getContract
-      
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(contract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, false, Some(distinctCount.splice), Some(avgNumRecords.splice))
-      
-      c.prefix.splice.addCardinality(card)
-    }
-    result
-  }
-}
-
-trait InputHintable[In, Out] { this: DataSet[Out] =>
-  def markUnread: Int => Unit
-  def markCopied: (Int, Int) => Unit
-  
-  def getInputUDT: UDT[In]
-  def getOutputUDT: UDT[Out]
-
-  def neglects[Fields](fields: In => Fields): Unit =
-    macro InputHintableMacros.neglects[In, Out, Fields]
-  def observes[Fields](fields: In => Fields): Unit =
-    macro InputHintableMacros.observes[In, Out, Fields]
-  def preserves[Fields](from: In => Fields, to: Out => Fields) =
-    macro InputHintableMacros.preserves[In, Out, Fields]
-}
-
-object InputHintable {
-
-  private val enabled = new DynamicVariable[Boolean](true)
-
-  def withEnabled[T](isEnabled: Boolean)(thunk: => T): T = enabled.withValue(isEnabled) { thunk }
-  
-}
-
-object InputHintableMacros {
-  
-  def neglects[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
-      (c: Context { type PrefixType = InputHintable[In, Out] })
-      (fields: c.Expr[In => Fields])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedFieldSelector = slave.getSelector(fields)
-
-    val result = reify {
-      val fieldSelection = generatedFieldSelector.splice
-      val fieldSelector = new FieldSelector(c.prefix.splice.getInputUDT, fieldSelection)
-      val unreadFields = fieldSelector.selectedFields.map(_.localPos).toSet
-      unreadFields.foreach(c.prefix.splice.markUnread(_))
-    }
-    result
-  }
-  
-  def observes[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
-      (c: Context { type PrefixType = InputHintable[In, Out] })
-      (fields: c.Expr[In => Fields])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val generatedFieldSelector = slave.getSelector(fields)
-
-    val result = reify {
-      val fieldSelection = generatedFieldSelector.splice
-      val fieldSelector = new FieldSelector(c.prefix.splice.getInputUDT, fieldSelection)
-      val fieldSet = fieldSelector.selectedFields.map(_.localPos).toSet
-      val unreadFields = fieldSelector.inputFields.map(_.localPos).toSet.diff(fieldSet)
-      unreadFields.foreach(c.prefix.splice.markUnread(_))
-    }
-    result
-  }
-  
-  def preserves[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
-      (c: Context { type PrefixType = InputHintable[In, Out] })
-      (from: c.Expr[In => Fields], to: c.Expr[Out => Fields])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    val generatedFromFieldSelector = slave.getSelector(from)
-    val generatedToFieldSelector = slave.getSelector(to)
-
-    val result = reify {
-      val fromSelection = generatedFromFieldSelector.splice
-      val fromSelector = new FieldSelector(c.prefix.splice.getInputUDT, fromSelection)
-      val toSelection = generatedToFieldSelector.splice
-      val toSelector = new FieldSelector(c.prefix.splice.getOutputUDT, toSelection)
-      val pairs = fromSelector.selectedFields.map(_.localPos)
-        .zip(toSelector.selectedFields.map(_.localPos))
-      pairs.foreach(c.prefix.splice.markCopied.tupled)
-    }
-    result
-  }
-}
-
-trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintable[Out] {
-  this: DataSet[Out] =>
-  override def markUnread = contract.getUDF.asInstanceOf[UDF1[In, Out]].markInputFieldUnread _
-  override def markCopied = contract.getUDF.asInstanceOf[UDF1[In, Out]].markFieldCopied _
-
-  override def getInputUDT = contract.getUDF.asInstanceOf[UDF1[In, Out]].inputUDT
-  override def getOutputUDT = contract.getUDF.asInstanceOf[UDF1[In, Out]].outputUDT
-}
-
-trait TwoInputHintable[LeftIn, RightIn, Out] extends OutputHintable[Out] { this: DataSet[Out] =>
-  val left = new DataSet[Out](contract) with OneInputHintable[LeftIn, Out] {
-    override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
-      .markInputFieldUnread(Left(pos)) }
-    override def markCopied = { (from: Int, to: Int) => contract.getUDF
-      .asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Left(from), to) }
-    override def getInputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].leftInputUDT
-    override def getOutputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].outputUDT
-  }
-
-  val right = new DataSet[Out](contract) with OneInputHintable[RightIn, Out] {
-    override def markUnread = { pos: Int => contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
-      .markInputFieldUnread(Right(pos)) }
-    override def markCopied = { (from: Int, to: Int) => contract.getUDF
-      .asInstanceOf[UDF2[LeftIn, RightIn, Out]].markFieldCopied(Right(from), to) }
-    override def getInputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].rightInputUDT
-    override def getOutputUDT = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]].outputUDT
-  }
-}
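
The OutputHintable, InputHintable and accompanying macro objects deleted here attached optimizer hints (output size and cardinality, unique keys, unread and copied fields) directly to the old record-API DataSet and are removed as part of the rewrite onto the Java API. The chainable-setter pattern they used looks roughly like the following self-contained sketch; CompilerHintsSketch and OutputHintableSketch are illustrative names, not Flink classes:

  // Illustrative only: the chainable hint-setter pattern used by the removed
  // OutputHintable trait. Each setter writes into mutable hints and returns
  // `this` so calls can be chained on a data set.
  final class CompilerHintsSketch {
    var outputSize: Long = -1L
    var outputCardinality: Long = -1L
    var avgBytesPerRecord: Float = -1.0f
    var filterFactor: Float = -1.0f
  }

  trait OutputHintableSketch {
    protected val hints = new CompilerHintsSketch

    def outputSize(value: Long): this.type = { hints.outputSize = value; this }
    def outputCardinality(value: Long): this.type = { hints.outputCardinality = value; this }
    def avgBytesPerRecord(value: Float): this.type = { hints.avgBytesPerRecord = value; this }
    def filterFactor(value: Float): this.type = { hints.filterFactor = value; this }
  }

  // Usage sketch: a hypothetical operator result mixing in the trait can be hinted in a chain,
  //   result.outputCardinality(1000000L).avgBytesPerRecord(32.0f).filterFactor(0.5f)

The macro variants (uniqueKey, cardinality, neglects, observes, preserves) generated FieldSelectors at compile time and fed them into the same hint and UDF bookkeeping objects.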