You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:51 UTC
[09/60] Rewrite the Scala API as (somewhat) thin Layer on Java API
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 58a9a2a..b6193aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -53,6 +53,22 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
}
@Override
+ public T createInstance(Object[] fields) {
+ // Build a fresh tuple via the no-arg constructor of tupleClass and fill its first
+ // `arity` slots, in order, from the given values. Every failure (reflection errors,
+ // or a `fields` array shorter than the arity) surfaces as a RuntimeException.
+ final T tuple;
+ try {
+ tuple = tupleClass.newInstance();
+ for (int pos = 0; pos < arity; pos++) {
+ tuple.setField(fields[pos], pos);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Cannot instantiate tuple.", e);
+ }
+ return tuple;
+ }
+
+ @Override
public T copy(T from, T reuse) {
for (int i = 0; i < arity; i++) {
Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index 69133b6..3e630fd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -69,6 +69,14 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
return -1;
}
+ /**
+ * Returns the number of fields of the tuple type handled by this serializer.
+ */
+ public int getArity() {
+ return this.arity;
+ }
+
+ /**
+ * Creates a new tuple instance holding the given field values.
+ *
+ * <p>Used by the Aggregate and Distinct operators to create instances of
+ * immutable tuples (i.e. Scala tuples).
+ *
+ * @param fields The field values, in field order.
+ * @return A new tuple built from the given values.
+ */
+ public abstract T createInstance(Object[] fields);
+
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
for (int i = 0; i < arity; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 3a88565..fdda028 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -46,9 +46,7 @@ under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
- <!-- These two requirements are the minimum to use and develop Flink.
- You can add others like <artifactId>flink-scala</artifactId> for Scala!
- -->
+ <!-- These two dependencies are the minimum required to use and develop Flink. -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
index 0c29856..f9dad03 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
@@ -1,108 +1,53 @@
-package ${package};
+package ${package}
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.flink.api.common.Program
-import org.apache.flink.api.common.ProgramDescription
-import org.apache.flink.client.LocalExecutor
-import org.apache.flink.api.scala.TextFile
-import org.apache.flink.api.scala.ScalaPlan
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.operators._
-import org.apache.flink.client.RemoteExecutor
-
-// You can run this locally using:
-// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobLocal 2 file:///some/path file:///some/other/path"
-object RunJobLocal {
- def main(args: Array[String]) {
- val job = new Job
- if (args.size < 3) {
- println(job.getDescription)
- return
- }
- val plan = job.getScalaPlan(args(0).toInt, args(1), args(2))
- LocalExecutor.execute(plan)
- System.exit(0)
- }
-}
-
-// You can run this on a cluster using:
-// mvn exec:exec -Dexec.executable="java" -Dexec.args="-cp %classpath ${package}.RunJobRemote 2 file:///some/path file:///some/other/path"
-object RunJobRemote {
- def main(args: Array[String]) {
- val job = new Job
- if (args.size < 3) {
- println(job.getDescription)
- return
- }
- val plan = job.getScalaPlan(args(0).toInt, args(1), args(2))
- // This will create an executor to run the plan on a cluster. We assume
- // that the JobManager is running on the local machine on the default
- // port. Change this according to your configuration.
- // You will also need to change the name of the jar if you change the
- // project name and/or version. Before running this you also need
- // to run "mvn package" to create the jar.
- val ex = new RemoteExecutor("localhost", 6123, "target/flink-project-0.1-SNAPSHOT.jar")
- ex.executePlan(plan)
- }
-}
-
/**
- * This is a outline for a Flink scala job. It is actually the WordCount
- * example from the here distribution.
+ * Skeleton for a Flink Job.
*
- * You can run it out of your IDE using the main() method of RunJob.
- * This will use the LocalExecutor to start a little Flink instance
- * out of your IDE.
+ * For a full example of a Flink Job, see the WordCountJob.scala file in the
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
- * cluster.
- * Just type
- * mvn clean package
- * in the projects root directory.
- * You will find the jar in
- * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
+ * cluster. Just type
+ * {{{
+ * mvn clean package
+ * }}}
+ * in the project's root directory. You will find the jar in
+ * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
*
*/
-class Job extends Program with ProgramDescription with Serializable {
- override def getDescription() = {
- "Parameters: [numSubStasks] [input] [output]"
- }
- override def getPlan(args: String*) = {
- getScalaPlan(args(0).toInt, args(1), args(2))
- }
-
- def formatOutput = (word: String, count: Int) => "%s %d".format(word, count)
-
- def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
- val input = TextFile(textInput)
-
- val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { (_, 1) } }
- val counts = words groupBy { case (word, _) => word } reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
-
- counts neglects { case (word, _) => word }
- counts preserves({ case (word, _) => word }, { case (word, _) => word })
- val output = counts.write(wordsOutput, DelimitedOutputFormat(formatOutput.tupled))
-
- val plan = new ScalaPlan(Seq(output), "Word Count (immutable)")
- plan.setDefaultParallelism(numSubTasks)
- plan
+object Job {
+ def main(args: Array[String]): Unit = {
+ // set up the execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ /**
+ * Here, you can start creating your execution plan for Flink.
+ *
+ * Start with getting some data from the environment, like
+ * env.readTextFile(textPath)
+ *
+ * then, transform the resulting DataSet[String] using operations
+ * like:
+ * .filter()
+ * .flatMap()
+ * .join()
+ * .groupBy()
+ *
+ * and many more. Presumably at least one data sink (for example
+ * .print(), as used in WordCountJob.scala) is needed so that
+ * env.execute() has something to run.
+ *
+ * Have a look at the programming guide (note: the links below
+ * currently point to the Java API pages):
+ *
+ * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_guide.html
+ *
+ * and the examples
+ *
+ * http://flink.incubator.apache.org/docs/0.6-SNAPSHOT/java_api_examples.html
+ *
+ */
+
+ // execute program
+ env.execute("Flink Scala API Skeleton")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala
new file mode 100644
index 0000000..54408cb
--- /dev/null
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/WordCountJob.scala
@@ -0,0 +1,37 @@
+package ${package}
+
+import org.apache.flink.api.scala._
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over some sample data.
+ *
+ * This example shows how to:
+ *
+ * - write a simple Flink program.
+ * - use Tuple data types.
+ * - write and use user-defined functions.
+ */
+object WordCountJob {
+ def main(args: Array[String]): Unit = {
+
+ // set up the execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ // get input data
+ val text = env.fromElements("To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,")
+
+ // Split lines into lower-case words. Splitting on \W+ yields an empty leading token when a
+ // line starts with a non-word character, so empty tokens are dropped before counting.
+ val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+ .map { (_, 1) }
+ .groupBy(0)
+ .sum(1)
+
+ // emit result
+ counts.print()
+
+ // execute program
+ env.execute("WordCount Example")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 6ce737c..45db390 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -39,7 +39,23 @@ under the License.
<version>${project.version}</version>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java
deleted file mode 100644
index ec7e9b7..0000000
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/Annotations.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators;
-
-import java.lang.annotation.Annotation;
-import java.util.Arrays;
-
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-
-public class Annotations {
-
- public static Annotation getConstantFields(int[] fields) {
- return new ConstantFields(fields);
- }
-
- public static Annotation getConstantFieldsFirst(int[] fields) {
- return new ConstantFieldsFirst(fields);
- }
-
- public static Annotation getConstantFieldsSecond(int[] fields) {
- return new ConstantFieldsSecond(fields);
- }
-
- public static Annotation getConstantFieldsExcept(int[] fields) {
- return new ConstantFieldsExcept(fields);
- }
-
- public static Annotation getConstantFieldsFirstExcept(int[] fields) {
- return new ConstantFieldsFirstExcept(fields);
- }
-
- public static Annotation getConstantFieldsSecondExcept(int[] fields) {
- return new ConstantFieldsSecondExcept(fields);
- }
-
- public static Annotation getCombinable() {
- return new Combinable();
- }
-
- private static abstract class Fields<T extends Annotation> implements Annotation {
-
- private final Class<T> clazz;
-
- private final int[] fields;
-
- public Fields(Class<T> clazz, int[] fields) {
- this.clazz = clazz;
- this.fields = fields;
- }
-
- public int[] value() {
- return fields;
- }
-
- @Override
- public Class<? extends Annotation> annotationType() {
- return clazz;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public boolean equals(Object obj) {
- if (obj == null || !annotationType().isAssignableFrom(obj.getClass())) {
- return false;
- }
-
- if (!annotationType().equals(((Annotation) obj).annotationType())) {
- return false;
- }
-
- int[] otherFields = getOtherFields((T) obj);
- return Arrays.equals(fields, otherFields);
- }
-
- protected abstract int[] getOtherFields(T other);
-
- @Override
- public int hashCode() {
- return 0xf16cd51b ^ Arrays.hashCode(fields);
- }
- }
-
- @SuppressWarnings("all")
- private static class ConstantFields extends Fields<FunctionAnnotation.ConstantFields> implements
- FunctionAnnotation.ConstantFields {
-
- public ConstantFields(int[] fields) {
- super(FunctionAnnotation.ConstantFields.class, fields);
- }
-
- @Override
- protected int[] getOtherFields(FunctionAnnotation.ConstantFields other) {
- return other.value();
- }
- }
-
- @SuppressWarnings("all")
- private static class ConstantFieldsFirst extends Fields<FunctionAnnotation.ConstantFieldsFirst> implements
- FunctionAnnotation.ConstantFieldsFirst {
-
- public ConstantFieldsFirst(int[] fields) {
- super(FunctionAnnotation.ConstantFieldsFirst.class, fields);
- }
-
- @Override
- protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsFirst other) {
- return other.value();
- }
- }
-
- @SuppressWarnings("all")
- private static class ConstantFieldsSecond extends Fields<FunctionAnnotation.ConstantFieldsSecond> implements
- FunctionAnnotation.ConstantFieldsSecond {
-
- public ConstantFieldsSecond(int[] fields) {
- super(FunctionAnnotation.ConstantFieldsSecond.class, fields);
- }
-
- @Override
- protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsSecond other) {
- return other.value();
- }
- }
-
- @SuppressWarnings("all")
- private static class ConstantFieldsExcept extends Fields<FunctionAnnotation.ConstantFieldsExcept> implements
- FunctionAnnotation.ConstantFieldsExcept {
-
- public ConstantFieldsExcept(int[] fields) {
- super(FunctionAnnotation.ConstantFieldsExcept.class, fields);
- }
-
- @Override
- protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsExcept other) {
- return other.value();
- }
- }
-
- @SuppressWarnings("all")
- private static class ConstantFieldsFirstExcept extends Fields<FunctionAnnotation.ConstantFieldsFirstExcept> implements
- FunctionAnnotation.ConstantFieldsFirstExcept {
-
- public ConstantFieldsFirstExcept(int[] fields) {
- super(FunctionAnnotation.ConstantFieldsFirstExcept.class, fields);
- }
-
- @Override
- protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsFirstExcept other) {
- return other.value();
- }
- }
-
- @SuppressWarnings("all")
- private static class ConstantFieldsSecondExcept extends Fields<FunctionAnnotation.ConstantFieldsSecondExcept> implements
- FunctionAnnotation.ConstantFieldsSecondExcept {
-
- public ConstantFieldsSecondExcept(int[] fields) {
- super(FunctionAnnotation.ConstantFieldsSecondExcept.class, fields);
- }
-
- @Override
- protected int[] getOtherFields(FunctionAnnotation.ConstantFieldsSecondExcept other) {
- return other.value();
- }
- }
-
- @SuppressWarnings("all")
- private static class Combinable implements Annotation, ReduceOperator.Combinable {
-
- public Combinable() {
- }
-
- @Override
- public Class<? extends Annotation> annotationType() {
- return ReduceOperator.Combinable.class;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || !annotationType().isAssignableFrom(obj.getClass())) {
- return false;
- }
-
- if (!annotationType().equals(((Annotation) obj).annotationType())) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
new file mode 100644
index 0000000..82a1dd5
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.SingleInputOperator;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import scala.Product;
+
+/**
+ * This operator represents the application of an "aggregate" operation on a data set, and the
+ * result data set produced by the function.
+ *
+ * @param <IN> The type of the data set aggregated by the operator.
+ */
+public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, ScalaAggregateOperator<IN>> {
+
+ private final List<AggregationFunction<?>> aggregationFunctions = new ArrayList<AggregationFunction<?>>(4);
+
+ private final List<Integer> fields = new ArrayList<Integer>(4);
+
+ private final Grouping<IN> grouping;
+
+ /**
+ * Non grouped aggregation: aggregates tuple position {@code field} over the whole input.
+ *
+ * @param input The data set to aggregate; its type must be a tuple type.
+ * @param function The aggregation to apply.
+ * @param field The tuple position to aggregate.
+ * @throws InvalidProgramException If the input type is not a tuple type.
+ * @throws IllegalArgumentException If {@code field} is outside the tuple's arity.
+ */
+ public ScalaAggregateOperator(org.apache.flink.api.java.DataSet<IN> input, Aggregations function, int field) {
+ super(Validate.notNull(input), input.getType());
+ Validate.notNull(function);
+
+ if (!input.getType().isTupleType()) {
+ throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
+ }
+
+ // this is the first aggregation operator after a regular data set (non grouped aggregation)
+ appendAggregation((TupleTypeInfoBase<?>) input.getType(), function, field);
+ this.grouping = null;
+ }
+
+ /**
+ * Grouped aggregation: aggregates tuple position {@code field} within each group of {@code input}.
+ *
+ * @param input The grouping to aggregate; its data set type must be a tuple type.
+ * @param function The aggregation to apply.
+ * @param field The tuple position to aggregate.
+ * @throws InvalidProgramException If the input type is not a tuple type.
+ * @throws IllegalArgumentException If {@code field} is outside the tuple's arity.
+ */
+ public ScalaAggregateOperator(Grouping<IN> input, Aggregations function, int field) {
+ super(Validate.notNull(input).getDataSet(), input.getDataSet().getType());
+ Validate.notNull(function);
+
+ if (!input.getDataSet().getType().isTupleType()) {
+ throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
+ }
+
+ appendAggregation((TupleTypeInfoBase<?>) input.getDataSet().getType(), function, field);
+ this.grouping = input;
+ }
+
+ /**
+ * Adds another aggregation to this operator.
+ *
+ * @param function The aggregation to apply.
+ * @param field The tuple position to aggregate.
+ * @return This operator, to allow chaining.
+ * @throws IllegalArgumentException If {@code field} is outside the tuple's arity.
+ */
+ public ScalaAggregateOperator<IN> and(Aggregations function, int field) {
+ Validate.notNull(function);
+ appendAggregation((TupleTypeInfoBase<?>) getType(), function, field);
+ return this;
+ }
+
+ /**
+ * Checks {@code field} against the arity of {@code tupleType}, creates the aggregation function
+ * for the type of that field, and registers function and position with this operator.
+ */
+ private void appendAggregation(TupleTypeInfoBase<?> tupleType, Aggregations function, int field) {
+ if (field < 0 || field >= tupleType.getArity()) {
+ throw new IllegalArgumentException("Aggregation field position is out of range.");
+ }
+
+ AggregationFunctionFactory factory = function.getFactory();
+ AggregationFunction<?> aggregator = factory.createAggregationFunction(tupleType.getTypeAt(field).getTypeClass());
+
+ this.aggregationFunctions.add(aggregator);
+ this.fields.add(field);
+ }
+
+ /**
+ * Translates this operator into a {@link GroupReduceOperatorBase} that runs an {@link AggregatingUdf}.
+ *
+ * <p>Without a grouping, a combinable reduce over the whole input (empty key array) is created.
+ * With field-position keys, a combinable grouped reduce is created, and key fields that are not
+ * themselves aggregated are declared as forwarded in the semantic properties.
+ *
+ * @param input The already translated input operator.
+ * @return The translated group-reduce operator.
+ * @throws IllegalStateException If no aggregation is registered or the internal lists disagree in size.
+ * @throws UnsupportedOperationException For KeySelector-based or otherwise unrecognized groupings.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> translateToDataFlow(Operator<IN> input) {
+
+ // sanity check
+ if (this.aggregationFunctions.isEmpty() || this.aggregationFunctions.size() != this.fields.size()) {
+ throw new IllegalStateException();
+ }
+
+
+ // construct the aggregation function
+ AggregationFunction<Object>[] aggFunctions = new AggregationFunction[this.aggregationFunctions.size()];
+ int[] fields = new int[this.fields.size()];
+ // default operator name, e.g. "agg(1),agg(2)" built from each aggregation's toString()
+ StringBuilder genName = new StringBuilder();
+
+ for (int i = 0; i < fields.length; i++) {
+ aggFunctions[i] = (AggregationFunction<Object>) this.aggregationFunctions.get(i);
+ fields[i] = this.fields.get(i);
+
+ genName.append(aggFunctions[i].toString()).append('(').append(fields[i]).append(')').append(',');
+ }
+ // drop the trailing ',' (the list is non-empty, see sanity check above)
+ genName.setLength(genName.length()-1);
+
+ // wrap the input serializer in a factory matching its statefulness
+ TypeSerializer<IN> serializer = getInputType().createSerializer();
+ TypeSerializerFactory<IN> serializerFactory = null;
+ if (serializer.isStateful()) {
+ serializerFactory = new RuntimeStatefulSerializerFactory<IN>(
+ serializer, getInputType().getTypeClass());
+ } else {
+ serializerFactory = new RuntimeStatelessSerializerFactory<IN>(
+ serializer, getInputType().getTypeClass());
+ }
+
+ // raw type: AggregatingUdf requires T extends Product, which IN is not declared to be
+ @SuppressWarnings("rawtypes")
+ RichGroupReduceFunction<IN, IN> function = new AggregatingUdf(serializerFactory, aggFunctions, fields);
+
+
+ // a user-supplied name takes precedence over the generated one
+ String name = getName() != null ? getName() : genName.toString();
+
+ // distinguish between grouped reduce and non-grouped reduce
+ if (this.grouping == null) {
+ // non grouped aggregation
+ UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+ GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
+ new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);
+
+ po.setCombinable(true);
+
+ // set input
+ po.setInput(input);
+ // set dop
+ po.setDegreeOfParallelism(this.getParallelism());
+
+ return po;
+ }
+
+ if (this.grouping.getKeys() instanceof Keys.FieldPositionKeys) {
+ // grouped aggregation
+ int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions();
+ UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+ GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po =
+ new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
+
+ po.setCombinable(true);
+
+ // set input
+ po.setInput(input);
+ // set dop
+ po.setDegreeOfParallelism(this.getParallelism());
+
+ // Declare key fields as forwarded unchanged, unless a key field is also aggregated
+ // (its value in the output is then the aggregate, not the input value).
+ SingleInputSemanticProperties props = new SingleInputSemanticProperties();
+
+ for (int i = 0; i < logicalKeyPositions.length; i++) {
+ int keyField = logicalKeyPositions[i];
+ boolean keyFieldUsedInAgg = false;
+
+ for (int k = 0; k < fields.length; k++) {
+ int aggField = fields[k];
+ if (keyField == aggField) {
+ keyFieldUsedInAgg = true;
+ break;
+ }
+ }
+
+ if (!keyFieldUsedInAgg) {
+ props.addForwardedField(keyField, keyField);
+ }
+ }
+
+ po.setSemanticProperties(props);
+
+ return po;
+ }
+ else if (this.grouping.getKeys() instanceof Keys.SelectorFunctionKeys) {
+ throw new UnsupportedOperationException("Aggregate does not support grouping with KeySelector functions, yet.");
+ }
+ else {
+ throw new UnsupportedOperationException("Unrecognized key type.");
+ }
+
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Combinable group-reduce function that applies the configured aggregation functions to a group
+ * of Scala tuples ({@link Product}s) and emits one result tuple per group.
+ *
+ * <p>Scala tuples are immutable, so the result is created through
+ * {@link TupleSerializerBase#createInstance(Object[])}. All fields are first copied from the last
+ * record of the group, and then the aggregated positions are overwritten with the aggregate values.
+ *
+ * @param <T> The Scala tuple type that is aggregated.
+ */
+ @Combinable
+ public static final class AggregatingUdf<T extends Product> extends RichGroupReduceFunction<T, T> {
+ private static final long serialVersionUID = 1L;
+
+ private final int[] fieldPositions;
+
+ private final AggregationFunction<Object>[] aggFunctions;
+
+ private final TypeSerializerFactory<T> serializerFactory;
+
+ private transient TupleSerializerBase<T> serializer;
+
+ /**
+ * @param serializerFactory Factory for the tuple serializer; must produce a {@link TupleSerializerBase}.
+ * @param aggFunctions The aggregation functions, one per aggregated field.
+ * @param fieldPositions The tuple position aggregated by the matching entry of {@code aggFunctions}.
+ */
+ public AggregatingUdf(TypeSerializerFactory<T> serializerFactory, AggregationFunction<Object>[] aggFunctions, int[] fieldPositions) {
+ Validate.notNull(serializerFactory);
+ Validate.notNull(aggFunctions);
+ Validate.notNull(fieldPositions);
+ Validate.isTrue(aggFunctions.length == fieldPositions.length);
+ Validate.isTrue(serializerFactory.getSerializer() instanceof TupleSerializerBase);
+
+ this.serializerFactory = serializerFactory;
+ this.aggFunctions = aggFunctions;
+ this.fieldPositions = fieldPositions;
+ }
+
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void open(Configuration parameters) throws Exception {
+ // Start from a clean state. reduce() resets the aggregators again after every emitted group.
+ for (int i = 0; i < aggFunctions.length; i++) {
+ aggFunctions[i].initializeAggregate();
+ }
+ serializer = (TupleSerializerBase<T>) serializerFactory.getSerializer();
+ }
+
+ @Override
+ public void reduce(Iterable<T> records, Collector<T> out) {
+ final AggregationFunction<Object>[] aggFunctions = this.aggFunctions;
+ final int[] fieldPositions = this.fieldPositions;
+
+ // The aggregators were reset in open() or at the end of the previous reduce() call.
+ T current = null;
+ for (T record : records) {
+ current = record;
+ for (int i = 0; i < fieldPositions.length; i++) {
+ aggFunctions[i].aggregate(current.productElement(fieldPositions[i]));
+ }
+ }
+
+ // An empty group leaves no record to take the non-aggregated fields from, so emit nothing.
+ // The aggregators were not touched and are still in their initial state.
+ if (current == null) {
+ return;
+ }
+
+ // First copy every tuple field of the last record, then overwrite the aggregated ones.
+ final Object[] fields = new Object[serializer.getArity()];
+ for (int i = 0; i < fields.length; i++) {
+ fields[i] = current.productElement(i);
+ }
+ for (int i = 0; i < fieldPositions.length; i++) {
+ fields[fieldPositions[i]] = aggFunctions[i].getAggregate();
+ // reset for the next group
+ aggFunctions[i].initializeAggregate();
+ }
+
+ out.collect(serializer.createInstance(fields));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
new file mode 100644
index 0000000..f563a8f
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import scala.Product;
+
+/**
+ * Input format that parses delimited (CSV) text into Scala tuples ({@link Product} instances).
+ * <p>
+ * The field types are derived from the tuple type information given to the constructor.
+ * Each successfully parsed line is turned into a tuple through
+ * {@link TupleSerializerBase#createInstance(Object[])}.
+ *
+ * @param <OUT> The Scala tuple type produced by this format.
+ */
+public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Holders the field parsers write into; created in {@link #open(FileInputSplit)}. */
+    private transient Object[] parsedValues;
+
+    // To speed up readRecord processing. Used to find windows line endings.
+    // It is set in open() so that readRecord does not have to evaluate it for every record.
+    private boolean lineDelimiterIsLinebreak = false;
+
+    private final TypeSerializerFactory<OUT> serializerFactory;
+
+    private transient TupleSerializerBase<OUT> serializer;
+
+    /**
+     * Creates a CSV input format that reads from the given path and produces tuples of the
+     * given type.
+     *
+     * @param filePath The path to read from.
+     * @param typeInfo Type information of the produced records; must describe a tuple type.
+     * @throws UnsupportedOperationException If {@code typeInfo} is not a tuple type.
+     */
+    public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
+        super(filePath);
+
+        // Validate the type before doing any serializer work.
+        if (!typeInfo.isTupleType()) {
+            throw new UnsupportedOperationException("This only works on tuple types.");
+        }
+
+        TypeSerializer<OUT> serializer = typeInfo.createSerializer();
+        if (serializer.isStateful()) {
+            serializerFactory = new RuntimeStatefulSerializerFactory<OUT>(
+                    serializer, typeInfo.getTypeClass());
+        } else {
+            serializerFactory = new RuntimeStatelessSerializerFactory<OUT>(
+                    serializer, typeInfo.getTypeClass());
+        }
+
+        TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
+        Class<?>[] classes = new Class<?>[tupleType.getArity()];
+        for (int i = 0; i < classes.length; i++) {
+            classes[i] = tupleType.getTypeAt(i).getTypeClass();
+        }
+        setFieldTypes(classes);
+    }
+
+    /**
+     * Sets the types of the fields to parse, one per consecutive column.
+     *
+     * @param fieldTypes The field types; must be neither null nor empty.
+     * @throws IllegalArgumentException If {@code fieldTypes} is null or empty.
+     */
+    public void setFieldTypes(Class<?>[] fieldTypes) {
+        if (fieldTypes == null || fieldTypes.length == 0) {
+            throw new IllegalArgumentException("Field types must not be null or empty.");
+        }
+
+        setFieldTypesGeneric(fieldTypes);
+    }
+
+    /**
+     * Selects which source columns to parse and with which types.
+     *
+     * @param sourceFieldIndices The column indices; must be non-negative and strictly increasing.
+     * @param fieldTypes The type for each selected column; same length as the indices.
+     */
+    public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
+        Preconditions.checkNotNull(sourceFieldIndices);
+        Preconditions.checkNotNull(fieldTypes);
+
+        checkForMonotonousOrder(sourceFieldIndices, fieldTypes);
+
+        setFieldsGeneric(sourceFieldIndices, fieldTypes);
+    }
+
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+
+        @SuppressWarnings("unchecked")
+        FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+        // throw exception if no field parsers are available
+        if (fieldParsers.length == 0) {
+            throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+        }
+
+        // create the value holders
+        this.parsedValues = new Object[fieldParsers.length];
+        for (int i = 0; i < fieldParsers.length; i++) {
+            this.parsedValues[i] = fieldParsers[i].createValue();
+        }
+
+        // Remember whether the delimiter is the default '\n', so that readRecord does not have
+        // to check on every call whether a trailing '\r' must be stripped. The flag is always
+        // recomputed rather than only ever being switched on.
+        byte[] delimiter = this.getDelimiter();
+        this.lineDelimiterIsLinebreak = delimiter.length == 1 && delimiter[0] == '\n';
+
+        this.serializer = (TupleSerializerBase<OUT>) serializerFactory.getSerializer();
+    }
+
+    /**
+     * Parses one line into a new tuple.
+     *
+     * @return The parsed tuple, or {@code null} if the line could not be parsed.
+     */
+    @Override
+    public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
+        // Support Windows line endings with the standard '\n' delimiter: drop the carriage
+        // return before the newline so it is not taken as data of the last field.
+        if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+            numBytes--;
+        }
+
+        if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+            return serializer.createInstance(parsedValues);
+        } else {
+            return null;
+        }
+    }
+
+
+    @Override
+    public String toString() {
+        return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Validates the positions/types pairs and sorts both arrays in place by ascending position.
+     */
+    @SuppressWarnings("unused")
+    private static void checkAndCoSort(int[] positions, Class<?>[] types) {
+        if (positions.length != types.length) {
+            throw new IllegalArgumentException("The positions and types must be of the same length");
+        }
+
+        TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();
+
+        for (int i = 0; i < positions.length; i++) {
+            if (positions[i] < 0) {
+                throw new IllegalArgumentException("The field " + i + " (" + positions[i] + ") is invalid.");
+            }
+            if (types[i] == null) {
+                throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+            }
+
+            if (map.containsKey(positions[i])) {
+                throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
+            }
+
+            map.put(positions[i], types[i]);
+        }
+
+        int i = 0;
+        for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
+            positions[i] = entry.getKey();
+            types[i] = entry.getValue();
+            i++;
+        }
+    }
+
+    /**
+     * Validates that positions are non-negative and strictly increasing and that no type is null.
+     */
+    private static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
+        if (positions.length != types.length) {
+            throw new IllegalArgumentException("The positions and types must be of the same length");
+        }
+
+        int lastPos = -1;
+
+        for (int i = 0; i < positions.length; i++) {
+            if (positions[i] < 0) {
+                throw new IllegalArgumentException("The field " + i + " (" + positions[i] + ") is invalid.");
+            }
+            if (types[i] == null) {
+                throw new IllegalArgumentException("The type " + i + " is invalid (null)");
+            }
+
+            if (positions[i] <= lastPos) {
+                throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
+            }
+
+            lastPos = positions[i];
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
new file mode 100644
index 0000000..07fb3a2
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.scala.operators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.TypeInformation;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import scala.Product;
+
+/**
+ * This is an OutputFormat to serialize Scala Tuples to text. The output is
+ * structured by record delimiters and field delimiters as common in CSV files.
+ * Record delimiters separate records from each other ('\n' is common). Field
+ * delimiters separate fields within a record.
+ */
+public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T> implements InputTypeConfigurable {
+    private static final long serialVersionUID = 1L;
+
+    @SuppressWarnings("unused")
+    private static final Log LOG = LogFactory.getLog(ScalaCsvOutputFormat.class);
+
+    // --------------------------------------------------------------------------------------------
+
+    public static final String DEFAULT_LINE_DELIMITER = CsvInputFormat.DEFAULT_LINE_DELIMITER;
+
+    public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
+
+    // --------------------------------------------------------------------------------------------
+
+    private transient Writer wrt;
+
+    private String fieldDelimiter;
+
+    private String recordDelimiter;
+
+    private String charsetName;
+
+    // Null fields are rejected unless enabled via setAllowNullValues(true). Every constructor
+    // previously forced this to false anyway; the default is now stated where it is declared.
+    private boolean allowNullValues = false;
+
+    private boolean quoteStrings = false;
+
+    // --------------------------------------------------------------------------------------------
+    //  Constructors and getters/setters for the configurable parameters
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates an instance of ScalaCsvOutputFormat. Lines are separated by the newline character '\n',
+     * fields are separated by ','.
+     *
+     * @param outputPath The path where the CSV file is written.
+     */
+    public ScalaCsvOutputFormat(Path outputPath) {
+        this(outputPath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER);
+    }
+
+    /**
+     * Creates an instance of ScalaCsvOutputFormat. Lines are separated by the newline character '\n',
+     * fields by the given field delimiter.
+     *
+     * @param outputPath The path where the CSV file is written.
+     * @param fieldDelimiter
+     *            The delimiter that is used to separate fields in a tuple.
+     */
+    public ScalaCsvOutputFormat(Path outputPath, String fieldDelimiter) {
+        this(outputPath, DEFAULT_LINE_DELIMITER, fieldDelimiter);
+    }
+
+    /**
+     * Creates an instance of ScalaCsvOutputFormat.
+     *
+     * @param outputPath The path where the CSV file is written.
+     * @param recordDelimiter
+     *            The delimiter that is used to separate the tuples.
+     * @param fieldDelimiter
+     *            The delimiter that is used to separate fields in a tuple.
+     * @throws IllegalArgumentException If either delimiter is null.
+     */
+    public ScalaCsvOutputFormat(Path outputPath, String recordDelimiter, String fieldDelimiter) {
+        super(outputPath);
+        if (recordDelimiter == null) {
+            throw new IllegalArgumentException("RecordDelmiter shall not be null.");
+        }
+
+        if (fieldDelimiter == null) {
+            throw new IllegalArgumentException("FieldDelimiter shall not be null.");
+        }
+
+        this.fieldDelimiter = fieldDelimiter;
+        this.recordDelimiter = recordDelimiter;
+    }
+
+    /**
+     * Configures the format to either allow null values (writing an empty field),
+     * or to throw an exception when encountering a null field.
+     * <p>
+     * By default, null values are NOT allowed.
+     *
+     * @param allowNulls Flag to indicate whether the output format should accept null values.
+     */
+    public void setAllowNullValues(boolean allowNulls) {
+        this.allowNullValues = allowNulls;
+    }
+
+    /**
+     * Sets the charset with which the CSV strings are written to the file.
+     * If not specified, the output format uses the systems default character encoding.
+     *
+     * @param charsetName The name of charset to use for encoding the output.
+     */
+    public void setCharsetName(String charsetName) {
+        this.charsetName = charsetName;
+    }
+
+    /**
+     * Configures whether the output format should quote string values. String values are fields
+     * of type {@link String} and {@link org.apache.flink.types.StringValue}, as well as
+     * all subclasses of the latter.
+     * <p>
+     * By default, strings are not quoted.
+     *
+     * @param quoteStrings Flag indicating whether string fields should be quoted.
+     */
+    public void setQuoteStrings(boolean quoteStrings) {
+        this.quoteStrings = quoteStrings;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        super.open(taskNumber, numTasks);
+        BufferedOutputStream buffered = new BufferedOutputStream(this.stream, 4096);
+        this.wrt = this.charsetName == null
+                ? new OutputStreamWriter(buffered)
+                : new OutputStreamWriter(buffered, this.charsetName);
+    }
+
+    /**
+     * Closes the writer and then the underlying file output. {@code super.close()} runs even
+     * if closing the writer fails, so the base format's resources are always released.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (this.wrt != null) {
+                // Writer.close() flushes buffered characters before closing.
+                this.wrt.close();
+            }
+        } finally {
+            super.close();
+        }
+    }
+
+    /**
+     * Writes all fields of the tuple separated by the field delimiter, followed by the record
+     * delimiter. A null field is written as an empty field if null values are allowed;
+     * otherwise a RuntimeException is thrown.
+     */
+    @Override
+    public void writeRecord(T element) throws IOException {
+        int numFields = element.productArity();
+
+        for (int i = 0; i < numFields; i++) {
+            Object v = element.productElement(i);
+
+            if (v == null && !this.allowNullValues) {
+                throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);
+            }
+
+            if (i != 0) {
+                this.wrt.write(this.fieldDelimiter);
+            }
+
+            if (v != null) {
+                if (quoteStrings && (v instanceof String || v instanceof StringValue)) {
+                    this.wrt.write('"');
+                    this.wrt.write(v.toString());
+                    this.wrt.write('"');
+                } else {
+                    this.wrt.write(v.toString());
+                }
+            }
+        }
+
+        // add the record delimiter
+        this.wrt.write(this.recordDelimiter);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    @Override
+    public String toString() {
+        return "CsvOutputFormat (path: " + this.getOutputFilePath() + ", delimiter: " + this.fieldDelimiter + ")";
+    }
+
+    /**
+     * The purpose of this method is solely to check whether the data type to be processed
+     * is in fact a tuple type.
+     *
+     * @throws InvalidProgramException If the input type is not a tuple type.
+     */
+    @Override
+    public void setInputType(TypeInformation<?> type) {
+        if (!type.isTupleType()) {
+            throw new InvalidProgramException("The " + ScalaCsvOutputFormat.class.getSimpleName() +
+                    " can only be used to write tuple data sets.");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala
deleted file mode 100644
index d23e94d..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/AnnotationUtil.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import collection.JavaConversions.asScalaIterator
-
-import org.apache.flink.api.common.operators.util.FieldSet
-import org.apache.flink.api.common.operators._
-import org.apache.flink.api.common.operators.base.{GroupReduceOperatorBase, DeltaIterationBase, BulkIterationBase, GenericDataSourceBase}
-import org.apache.flink.api.java.record.functions.FunctionAnnotation
-import org.apache.flink.api.java.record.operators.BulkIteration.PartialSolutionPlaceHolder
-import org.apache.flink.api.java.record.operators.DeltaIteration.{WorksetPlaceHolder, SolutionSetPlaceHolder}
-import org.apache.flink.api.java.record.operators.GenericDataSink
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecond
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecondExcept
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable
-
-object AnnotationUtil {
- val visited = collection.mutable.Set[Operator[_]]()
-
- def setAnnotations(sinks: Seq[ScalaSink[_]]): Seq[ScalaSink[_]] = {
- visited.clear()
-
- sinks foreach setAnnotations
-
- sinks
- }
-
- def setAnnotations(sink: ScalaSink[_]):Unit = {
- setAnnotations(sink.sink.getInput)
- }
-
- def setAnnotations(operator: Operator[_]):Unit = {
- if(operator != null && !visited.contains(operator)){
- visited.add(operator)
-
- operator match {
- case op: GenericDataSourceBase[_,_] =>
- case op: GenericDataSink =>
- setAnnotations(op.getInput)
- case op: PartialSolutionPlaceHolder =>
- case op: SolutionSetPlaceHolder =>
- case op: WorksetPlaceHolder =>
- case op: DeltaIterationBase[_, _] =>
- updateDualSemanticProperties(op)
- setAnnotations(op.getSolutionSetDelta)
- setAnnotations(op.getNextWorkset)
- setAnnotations(op.getInitialWorkset)
- setAnnotations(op.getInitialSolutionSet)
- case op: DualInputOperator[_, _, _, _] =>
- updateDualSemanticProperties(op)
- setAnnotations(op.getFirstInput)
- setAnnotations(op.getSecondInput)
- case op: BulkIterationBase[_] =>
- updateSingleSemanticProperties(op)
- setAnnotations(op.getInput)
- setAnnotations(op.getNextPartialSolution)
- setAnnotations(op.getTerminationCriterion)
- case op: GroupReduceOperatorBase[_, _, _] =>
- updateCombinable(op)
- setAnnotations(op.getInput)
- case op: SingleInputOperator[_, _, _] =>
- updateSingleSemanticProperties(op)
- setAnnotations(op.getInput)
- }
- }
- }
-
- def updateCombinable(op: GroupReduceOperatorBase[_, _, _]){
- if(op.isInstanceOf[ScalaOperator[_,_]]) {
- val scalaOp = op.asInstanceOf[ScalaOperator[_, _]]
-
- val combinableAnnotaion = scalaOp.getUserCodeAnnotation(classOf[Combinable])
-
- if (combinableAnnotaion != null) {
- op.setCombinable(true)
- }
- }
- }
-
- def updateDualSemanticProperties(op: DualInputOperator[_, _, _, _]){
- if(op.isInstanceOf[ScalaOperator[_,_]]) {
- val scalaOp = op.asInstanceOf[ScalaOperator[_, _]]
- val properties = op.getSemanticProperties
-
- // get readSet annotation from stub
- val constantSet1Annotation: FunctionAnnotation.ConstantFieldsFirst = scalaOp.getUserCodeAnnotation(
- classOf[FunctionAnnotation.ConstantFieldsFirst])
- val constantSet2Annotation: FunctionAnnotation.ConstantFieldsSecond = scalaOp.getUserCodeAnnotation(
- classOf[FunctionAnnotation.ConstantFieldsSecond])
-
- // get readSet annotation from stub
- val notConstantSet1Annotation: FunctionAnnotation.ConstantFieldsFirstExcept = scalaOp.getUserCodeAnnotation(
- classOf[FunctionAnnotation.ConstantFieldsFirstExcept])
- val notConstantSet2Annotation: FunctionAnnotation.ConstantFieldsSecondExcept = scalaOp.getUserCodeAnnotation(
- classOf[FunctionAnnotation.ConstantFieldsSecondExcept])
-
- if (notConstantSet1Annotation != null && constantSet1Annotation != null) {
- throw new RuntimeException("Either ConstantFieldsFirst or ConstantFieldsFirstExcept can be specified, not both.")
- }
-
- if (constantSet2Annotation != null && notConstantSet2Annotation != null) {
- throw new RuntimeException("Either ConstantFieldsSecond or ConstantFieldsSecondExcept can be specified, not both.")
- }
-
- // extract readSets from annotations
- if (notConstantSet1Annotation != null) {
- for (element <- notConstantSet1Annotation.value()) {
- if (properties.getForwardedField1(element) != null) {
- throw new RuntimeException("Field " + element + " cannot be forwarded and non constant at the same time.")
- }
- }
-
- val fieldSet = new FieldSet(notConstantSet1Annotation.value(): _*)
-
- for (i <- 0 until scalaOp.getUDF.getOutputLength) {
- if (!fieldSet.contains(i)) {
- properties.addForwardedField1(i, i)
- }
- }
- } else if (constantSet1Annotation != null) {
- for (value <- constantSet1Annotation.value) {
- properties.addForwardedField1(value, value)
- }
- }
-
- if (notConstantSet2Annotation != null) {
- for (element <- notConstantSet2Annotation.value()) {
- if (properties.getForwardedField2(element) != null) {
- throw new RuntimeException("Field " + element + " cannot be forwarded and non constant at the same time.")
- }
- }
-
- val fieldSet = new FieldSet(notConstantSet2Annotation.value(): _*)
-
- for (i <- 0 until scalaOp.getUDF.getOutputLength) {
- if (!fieldSet.contains(i)) {
- properties.addForwardedField2(i, i)
- }
- }
- } else if (constantSet2Annotation != null) {
- for (value <- constantSet2Annotation.value) {
- properties.addForwardedField2(value, value)
- }
- }
-
- op.setSemanticProperties(properties)
- }
- }
-
- def updateSingleSemanticProperties(op: SingleInputOperator[_, _, _]) {
- if (op.isInstanceOf[ScalaOperator[_, _]]) {
- val scalaOp = op.asInstanceOf[ScalaOperator[_, _]]
- var properties = op.getSemanticProperties
-
- if (properties == null) {
- properties = new SingleInputSemanticProperties()
- }
-
- // get constantSet annotation from stub
- val constantSet: FunctionAnnotation.ConstantFields =
- scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFields])
- val notConstantSet: FunctionAnnotation.ConstantFieldsExcept =
- scalaOp.getUserCodeAnnotation(classOf[FunctionAnnotation.ConstantFieldsExcept])
-
- if (notConstantSet != null && constantSet != null) {
- throw new RuntimeException("Either ConstantFields or ConstantFieldsExcept can be specified, not both.")
- }
-
- // extract notConstantSet from annotation
- if (notConstantSet != null) {
- val nonConstant: FieldSet = new FieldSet(notConstantSet.value: _*)
-
- for (element <- nonConstant.iterator()) {
- if (properties.getForwardedField(element) != null) {
- throw new RuntimeException("Field " + element + " is non constant and at the same time forwarded. This " +
- "cannot happen.")
- }
- }
-
- for (i <- 0 until scalaOp.getUDF.getOutputLength) {
- if (!nonConstant.contains(i)) {
- properties.addForwardedField(i, i)
- }
- }
-
- } else if (constantSet != null) {
- // extract constantSet from annotation
- for (value <- constantSet.value) {
- properties.addForwardedField(value, value)
- }
- }
-
- op.setSemanticProperties(properties)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala
deleted file mode 100644
index a03f5cf..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/CompilerHints.scala
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import language.experimental.macros
-import scala.reflect.macros.Context
-
-import scala.util.DynamicVariable
-
-import org.apache.flink.api.scala.codegen.MacroContextHolder
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.analysis.FieldSet.toSeq
-import org.apache.flink.api.scala.analysis.UDF2
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.util.{FieldSet => PactFieldSet}
-import org.apache.flink.types.Record
-
-
-case class KeyCardinality(
- key: FieldSelector,
- isUnique: Boolean,
- distinctCount: Option[Long],
- avgNumRecords: Option[Float]) {
-
- @transient private var pactFieldSets =
- collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
-
- def getPactFieldSet(contract: Operator[Record] with ScalaOperator[_, _]): PactFieldSet = {
-
- if (pactFieldSets == null) {
- pactFieldSets =
- collection.mutable.Map[Operator[Record] with ScalaOperator[_, _], PactFieldSet]()
- }
-
- val keyCopy = key.copy()
- contract.getUDF.attachOutputsToInputs(keyCopy.inputFields)
- val keySet = keyCopy.selectedFields.toIndexSet.toArray
-
- val fieldSet = pactFieldSets.getOrElseUpdate(contract, new PactFieldSet(keySet, true))
- fieldSet
- }
-}
-
-trait OutputHintable[Out] { this: DataSet[Out] =>
- def getContract = contract
-
- private var _cardinalities: List[KeyCardinality] = List[KeyCardinality]()
-
- def addCardinality(card: KeyCardinality) {
- _cardinalities = card :: _cardinalities
- applyHints(getContract)
- }
-
- def degreeOfParallelism = contract.getDegreeOfParallelism()
- def degreeOfParallelism_=(value: Int) = contract.setDegreeOfParallelism(value)
- def degreeOfParallelism(value: Int): this.type = { contract.setDegreeOfParallelism(value); this }
-
- def outputSize = contract.getCompilerHints().getOutputSize()
- def outputSize_=(value: Long) = contract.getCompilerHints().setOutputSize(value)
- def outputSize(value: Long): this.type = {
- contract.getCompilerHints().setOutputSize(value)
- this
- }
-
- def outputCardinality = contract.getCompilerHints().getOutputCardinality()
- def outputCardinality_=(value: Long) = contract.getCompilerHints().setOutputCardinality(value)
- def outputCardinality(value: Long): this.type = {
- contract.getCompilerHints().setOutputCardinality(value)
- this
- }
-
- def avgBytesPerRecord = contract.getCompilerHints().getAvgOutputRecordSize()
- def avgBytesPerRecord_=(value: Float) = contract.getCompilerHints().setAvgOutputRecordSize(value)
- def avgBytesPerRecord(value: Float): this.type = {
- contract.getCompilerHints().setAvgOutputRecordSize(value)
- this
- }
-
- def filterFactor = contract.getCompilerHints().getFilterFactor()
- def filterFactor_=(value: Float) = contract.getCompilerHints().setFilterFactor(value)
- def filterFactor(value: Float): this.type = {
- contract.getCompilerHints().setFilterFactor(value)
- this
- }
-
- def uniqueKey[Key](fields: Out => Key) = macro OutputHintableMacros.uniqueKey[Out, Key]
-
- def applyHints(contract: Operator[Record] with ScalaOperator[_, _]): Unit = {
- val hints = contract.getCompilerHints
-
- if (hints.getUniqueFields != null) {
- hints.getUniqueFields.clear()
- }
-
- _cardinalities.foreach { card =>
-
- val fieldSet = card.getPactFieldSet(contract)
-
- if (card.isUnique) {
- hints.addUniqueField(fieldSet)
- }
- }
- }
-}
-
-object OutputHintableMacros {
-
- def uniqueKey[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
- (c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
- : c.Expr[Unit] = {
-
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val generatedKeySelector = slave.getSelector(fields)
-
- val result = reify {
- val contract = c.prefix.splice.getContract
- val hints = contract.getCompilerHints
-
- val keySelection = generatedKeySelector.splice
- val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
- val card = KeyCardinality(key, true, None, None)
-
- c.prefix.splice.addCardinality(card)
- }
- result
- }
-
- def uniqueKeyWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
- (c: Context { type PrefixType = OutputHintable[Out] })
- (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
- : c.Expr[Unit] = {
-
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val generatedKeySelector = slave.getSelector(fields)
-
- val result = reify {
- val contract = c.prefix.splice.getContract
- val hints = contract.getCompilerHints
-
- val keySelection = generatedKeySelector.splice
- val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
- val card = KeyCardinality(key, true, Some(distinctCount.splice), None)
-
- c.prefix.splice.addCardinality(card)
- }
- result
- }
-
- def cardinality[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
- (c: Context { type PrefixType = OutputHintable[Out] })(fields: c.Expr[Out => Key])
- : c.Expr[Unit] = {
-
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val generatedKeySelector = slave.getSelector(fields)
-
- val result = reify {
- val contract = c.prefix.splice.getContract
- val hints = contract.getCompilerHints
-
- val keySelection = generatedKeySelector.splice
- val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
- val card = KeyCardinality(key, false, None, None)
-
- c.prefix.splice.addCardinality(card)
- }
- result
- }
-
- def cardinalityWithDistinctCount[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
- (c: Context { type PrefixType = OutputHintable[Out] })
- (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long])
- : c.Expr[Unit] = {
-
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val generatedKeySelector = slave.getSelector(fields)
-
- val result = reify {
- val contract = c.prefix.splice.getContract
- val hints = contract.getCompilerHints
-
- val keySelection = generatedKeySelector.splice
- val key = new FieldSelector(c.prefix.splice.getContract.getUDF.outputUDT, keySelection)
- val card = KeyCardinality(key, false, Some(distinctCount.splice), None)
-
- c.prefix.splice.addCardinality(card)
- }
- result
- }
-
-  /**
-   * Macro implementation for `cardinality(fields, avgNumRecords)` on an `OutputHintable[Out]`.
-   *
-   * Registers `KeyCardinality(key, false, None, Some(avgNumRecords))` for the key chosen by
-   * `fields`, where `key` is a `FieldSelector` over the UDF output type. (The `false` flag
-   * is presumably "key is not unique" — verify against `KeyCardinality`.)
-   *
-   * @param c             macro context whose prefix is the data set being hinted
-   * @param fields        key selector, handed to the macro helper's `getSelector`
-   * @param avgNumRecords average-records estimate stored in the hint
-   * @return              generated code that adds the cardinality hint to the prefix
-   */
-  def cardinalityWithAvgNumRecords[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-    (c: Context { type PrefixType = OutputHintable[Out] })
-    (fields: c.Expr[Out => Key], avgNumRecords: c.Expr[Float]): c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      // Splice the prefix exactly once so the hinted expression is evaluated only once.
-      val hintedSet = c.prefix.splice
-      val contract = hintedSet.getContract
-
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(contract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, false, None, Some(avgNumRecords.splice))
-
-      hintedSet.addCardinality(card)
-    }
-    result
-  }
-
-  /**
-   * Macro implementation for `cardinality(fields, distinctCount, avgNumRecords)` on an
-   * `OutputHintable[Out]`.
-   *
-   * Registers `KeyCardinality(key, false, Some(distinctCount), Some(avgNumRecords))` for the
-   * key chosen by `fields`, where `key` is a `FieldSelector` over the UDF output type.
-   * (The `false` flag is presumably "key is not unique" — verify against `KeyCardinality`.)
-   *
-   * @param c             macro context whose prefix is the data set being hinted
-   * @param fields        key selector, handed to the macro helper's `getSelector`
-   * @param distinctCount distinct-count estimate stored in the hint
-   * @param avgNumRecords average-records estimate stored in the hint
-   * @return              generated code that adds the cardinality hint to the prefix
-   */
-  def cardinalityWithAll[Out: c.WeakTypeTag, Key: c.WeakTypeTag]
-    (c: Context { type PrefixType = OutputHintable[Out] })
-    (fields: c.Expr[Out => Key], distinctCount: c.Expr[Long], avgNumRecords: c.Expr[Float])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    val generatedKeySelector = slave.getSelector(fields)
-
-    val result = reify {
-      // Splice the prefix exactly once so the hinted expression is evaluated only once.
-      val hintedSet = c.prefix.splice
-      val contract = hintedSet.getContract
-
-      val keySelection = generatedKeySelector.splice
-      val key = new FieldSelector(contract.getUDF.outputUDT, keySelection)
-      val card = KeyCardinality(key, false, Some(distinctCount.splice), Some(avgNumRecords.splice))
-
-      hintedSet.addCardinality(card)
-    }
-    result
-  }
-}
-
-/**
- * Mixin for a [[DataSet]] that lets user code attach input-field hints to its UDF.
- * The three hint methods are macros implemented in `InputHintableMacros`.
- *
- * @tparam In  input record type of the hinted UDF
- * @tparam Out output record type of the hinted UDF
- */
-trait InputHintable[In, Out] { this: DataSet[Out] =>
-  /** Callback invoked with an input field position that should be marked as unread. */
-  def markUnread: Int => Unit
-  /** Callback invoked with `(from, to)`: input position `from` is copied to output position `to`. */
-  def markCopied: (Int, Int) => Unit
-
-  /** Type descriptor used to resolve input field selectors to positions. */
-  def getInputUDT: UDT[In]
-  /** Type descriptor used to resolve output field selectors to positions. */
-  def getOutputUDT: UDT[Out]
-
-  /** Marks every input field selected by `fields` as unread. */
-  def neglects[Fields](fields: In => Fields): Unit =
-    macro InputHintableMacros.neglects[In, Out, Fields]
-  /** Marks every input field NOT selected by `fields` as unread. */
-  def observes[Fields](fields: In => Fields): Unit =
-    macro InputHintableMacros.observes[In, Out, Fields]
-  /**
-   * Records that the input fields selected by `from` are copied, pairwise in order, to the
-   * output fields selected by `to`. The result type is stated explicitly because inferring
-   * a macro def's result type is deprecated (Scala 2.11+).
-   */
-  def preserves[Fields](from: In => Fields, to: Out => Fields): Unit =
-    macro InputHintableMacros.preserves[In, Out, Fields]
-}
-
-/** Companion of [[InputHintable]] holding a dynamically scoped boolean flag. */
-object InputHintable {
-
-  // Defaults to `true`. NOTE(review): the flag is only ever rebound by `withEnabled`; nothing
-  // that can see this private member reads `enabled.value` — confirm it is still needed.
-  private val enabled = new DynamicVariable[Boolean](true)
-
-  /**
-   * Evaluates `thunk` with the flag rebound to `isEnabled` and returns its result; the
-   * previous binding is restored afterwards (`DynamicVariable.withValue` semantics).
-   */
-  def withEnabled[T](isEnabled: Boolean)(thunk: => T): T = {
-    val flag = enabled
-    flag.withValue(isEnabled)(thunk)
-  }
-}
-
-/**
- * Macro implementations behind the `neglects` / `observes` / `preserves` hints of
- * [[InputHintable]]. Each expansion evaluates the hinted prefix exactly once.
- */
-object InputHintableMacros {
-
-  /**
-   * Expands `neglects(fields)`: every input field position selected by `fields` is passed
-   * to the prefix's `markUnread` callback.
-   */
-  def neglects[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
-    (c: Context { type PrefixType = InputHintable[In, Out] })
-    (fields: c.Expr[In => Fields])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    val generatedFieldSelector = slave.getSelector(fields)
-
-    val result = reify {
-      // Bind the prefix and its callback once. Splicing `c.prefix` inside the foreach lambda
-      // would re-evaluate the prefix expression for every selected field.
-      val hintedSet = c.prefix.splice
-      val markUnread = hintedSet.markUnread
-      val fieldSelection = generatedFieldSelector.splice
-      val fieldSelector = new FieldSelector(hintedSet.getInputUDT, fieldSelection)
-      val unreadFields = fieldSelector.selectedFields.map(_.localPos).toSet
-      unreadFields.foreach(markUnread)
-    }
-    result
-  }
-
-  /**
-   * Expands `observes(fields)`: every input field position NOT selected by `fields` (all
-   * `inputFields` minus the selected ones) is passed to the prefix's `markUnread` callback.
-   */
-  def observes[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
-    (c: Context { type PrefixType = InputHintable[In, Out] })
-    (fields: c.Expr[In => Fields])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    val generatedFieldSelector = slave.getSelector(fields)
-
-    val result = reify {
-      // Bind the prefix and its callback once (see `neglects`).
-      val hintedSet = c.prefix.splice
-      val markUnread = hintedSet.markUnread
-      val fieldSelection = generatedFieldSelector.splice
-      val fieldSelector = new FieldSelector(hintedSet.getInputUDT, fieldSelection)
-      val observedFields = fieldSelector.selectedFields.map(_.localPos).toSet
-      val unreadFields = fieldSelector.inputFields.map(_.localPos).toSet.diff(observedFields)
-      unreadFields.foreach(markUnread)
-    }
-    result
-  }
-
-  /**
-   * Expands `preserves(from, to)`: the positions selected by `from` (input side) and `to`
-   * (output side) are paired in order and each pair is passed to the prefix's `markCopied`.
-   */
-  def preserves[In: c.WeakTypeTag, Out: c.WeakTypeTag, Fields: c.WeakTypeTag]
-    (c: Context { type PrefixType = InputHintable[In, Out] })
-    (from: c.Expr[In => Fields], to: c.Expr[Out => Fields])
-    : c.Expr[Unit] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    val generatedFromFieldSelector = slave.getSelector(from)
-    val generatedToFieldSelector = slave.getSelector(to)
-
-    val result = reify {
-      // Bind the prefix once instead of splicing it for each of the three uses below.
-      val hintedSet = c.prefix.splice
-      val fromSelection = generatedFromFieldSelector.splice
-      val fromSelector = new FieldSelector(hintedSet.getInputUDT, fromSelection)
-      val toSelection = generatedToFieldSelector.splice
-      val toSelector = new FieldSelector(hintedSet.getOutputUDT, toSelection)
-      // NOTE(review): `zip` silently drops trailing positions if the two selections differ
-      // in length; both sides share the `Fields` type, so they presumably match.
-      val pairs = fromSelector.selectedFields.map(_.localPos)
-        .zip(toSelector.selectedFields.map(_.localPos))
-      pairs.foreach(hintedSet.markCopied.tupled)
-    }
-    result
-  }
-}
-
-/**
- * Input and output hints for a data set whose contract UDF is a single-input `UDF1`.
- * Every hint callback and type descriptor is forwarded to that UDF.
- */
-trait OneInputHintable[In, Out] extends InputHintable[In, Out] with OutputHintable[Out] {
-  this: DataSet[Out] =>
-
-  // The contract's UDF viewed as a `UDF1`; the cast is repeated on every access.
-  private def oneInputUdf: UDF1[In, Out] = contract.getUDF.asInstanceOf[UDF1[In, Out]]
-
-  override def markUnread: Int => Unit = oneInputUdf.markInputFieldUnread _
-  override def markCopied: (Int, Int) => Unit = oneInputUdf.markFieldCopied _
-
-  override def getInputUDT: UDT[In] = oneInputUdf.inputUDT
-  override def getOutputUDT: UDT[Out] = oneInputUdf.outputUDT
-}
-
-/**
- * Hints for a data set whose contract UDF is a two-input `UDF2`.
- *
- * Output hints come from [[OutputHintable]]. Input hints are given per side through `left`
- * and `right`. Each is a `DataSet` over the same contract whose input-hint callbacks address
- * one side of the UDF, wrapping input positions in `Left(...)` or `Right(...)` respectively.
- */
-trait TwoInputHintable[LeftIn, RightIn, Out] extends OutputHintable[Out] { this: DataSet[Out] =>
-
-  val left = new DataSet[Out](contract) with OneInputHintable[LeftIn, Out] {
-    // The shared contract's UDF viewed as a `UDF2`; re-cast on every access.
-    private def twoInputUdf = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
-
-    override def markUnread: Int => Unit =
-      pos => twoInputUdf.markInputFieldUnread(Left(pos))
-    override def markCopied: (Int, Int) => Unit =
-      (from, to) => twoInputUdf.markFieldCopied(Left(from), to)
-    override def getInputUDT: UDT[LeftIn] = twoInputUdf.leftInputUDT
-    override def getOutputUDT: UDT[Out] = twoInputUdf.outputUDT
-  }
-
-  val right = new DataSet[Out](contract) with OneInputHintable[RightIn, Out] {
-    // The shared contract's UDF viewed as a `UDF2`; re-cast on every access.
-    private def twoInputUdf = contract.getUDF.asInstanceOf[UDF2[LeftIn, RightIn, Out]]
-
-    override def markUnread: Int => Unit =
-      pos => twoInputUdf.markInputFieldUnread(Right(pos))
-    override def markCopied: (Int, Int) => Unit =
-      (from, to) => twoInputUdf.markFieldCopied(Right(from), to)
-    override def getInputUDT: UDT[RightIn] = twoInputUdf.rightInputUDT
-    override def getOutputUDT: UDT[Out] = twoInputUdf.outputUDT
-  }
-}