Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:50 UTC
[08/60] Rewrite the Scala API as (somewhat) thin Layer on Java API
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 11ee48f..041f269 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -16,57 +16,822 @@
* limitations under the License.
*/
-
package org.apache.flink.api.scala
-import language.experimental.macros
-import scala.reflect.macros.Context
-
-import org.apache.flink.api.scala.operators._
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.types.Record
-
-
-class DataSet[T] (val contract: Operator[Record] with ScalaOperator[T, Record]) {
-
- def cogroup[RightIn](rightInput: DataSet[RightIn]) =
- new CoGroupDataSet[T, RightIn](this, rightInput)
- def cross[RightIn](rightInput: DataSet[RightIn]) = new CrossDataSet[T, RightIn](this, rightInput)
- def join[RightIn](rightInput: DataSet[RightIn]) = new JoinDataSet[T, RightIn](this, rightInput)
-
- def map[Out](fun: T => Out) = macro MapMacros.map[T, Out]
- def flatMap[Out](fun: T => Iterator[Out]) = macro MapMacros.flatMap[T, Out]
- def filter(fun: T => Boolean) = macro MapMacros.filter[T]
-
- // reduce
- def groupBy[Key](keyFun: T => Key) = macro ReduceMacros.groupBy[T, Key]
-
- // reduce without grouping
- def reduce(fun: (T, T) => T) = macro ReduceMacros.globalReduce[T]
- def reduceAll[Out](fun: Iterator[T] => Out) = macro ReduceMacros.globalReduceGroup[T, Out]
- def combinableReduceAll[Out](fun: Iterator[T] => Out) =
- macro ReduceMacros.combinableGlobalReduceGroup[T]
-
- def union(secondInput: DataSet[T]) = UnionOperator.impl[T](this, secondInput)
-
- def iterateWithDelta[DeltaItem](stepFunction: DataSet[T] => (DataSet[T], DataSet[DeltaItem])) =
- macro IterateMacros.iterateWithDelta[T, DeltaItem]
- def iterate(n: Int, stepFunction: DataSet[T] => DataSet[T])= macro IterateMacros.iterate[T]
- def iterateWithTermination[C](
- n: Int,
- stepFunction: DataSet[T] => DataSet[T],
- terminationFunction: (DataSet[T],DataSet[T]) => DataSet[C]) =
- macro IterateMacros.iterateWithTermination[T, C]
- def iterateWithDelta[SolutionKey, WorksetItem](
- workset: DataSet[WorksetItem],
- solutionSetKey: T => SolutionKey,
- stepFunction: (DataSet[T], DataSet[WorksetItem]) =>
- (DataSet[T], DataSet[WorksetItem]), maxIterations: Int) =
- macro WorksetIterateMacros.iterateWithDelta[T, SolutionKey, WorksetItem]
-
- def write(url: String, format: ScalaOutputFormat[T]) = DataSinkOperator.write(this, url, format)
- def write(url: String, format: ScalaOutputFormat[T], name: String) =
- DataSinkOperator.write(this, url, format, name)
-
+import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.aggregators.Aggregator
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
+import org.apache.flink.api.java.operators.JoinOperator.JoinHint
+import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.core.fs.{FileSystem, Path}
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+/**
+ * The DataSet, the basic abstraction of Flink. This represents a collection of elements of a
+ * specific type `T`. The operations in this class can be used to create new DataSets and to combine
+ * two DataSets. The methods of [[ExecutionEnvironment]] can be used to create a DataSet from an
+ * external source, such as files in HDFS. The `write*` methods can be used to write the elements
+ * to storage.
+ *
+ * All operations accept either a lambda function or an operation-specific function object for
+ * specifying the operation. For example, using a lambda:
+ * {{{
+ * val input: DataSet[String] = ...
+ * val mapped = input flatMap { _.split(" ") }
+ * }}}
+ * And using a `FlatMapFunction`:
+ * {{{
+ *   val input: DataSet[String] = ...
+ *   val mapped = input flatMap { new FlatMapFunction[String, String] {
+ *     def flatMap(in: String, out: Collector[String]): Unit = {
+ *       in.split(" ") foreach { out.collect(_) }
+ *     }
+ *   }}
+ * }}}
+ *
+ * A rich function can be used when more control is required, for example for accessing the
+ * `RuntimeContext`. The rich function for `flatMap` is `RichFlatMapFunction`; all other rich
+ * functions are named analogously. All functions are available in the package
+ * `org.apache.flink.api.common.functions`.
+ *
+ * The elements are partitioned depending on the degree of parallelism of the
+ * [[ExecutionEnvironment]] or of one specific DataSet.
+ *
+ * Most of the operations have an implicit [[TypeInformation]] parameter. This is supplied by
+ * an implicit conversion in the `org.apache.flink.api.scala` package. For this to work,
+ * [[createTypeInformation]] needs to be imported. This is normally achieved by a wildcard import:
+ * {{{
+ * import org.apache.flink.api.scala._
+ * }}}
+ *
+ * @tparam T The type of the DataSet, i.e., the type of the elements of the DataSet.
+ */
+class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
+ Validate.notNull(set, "Java DataSet must not be null.")
+
+ // --------------------------------------------------------------------------------------------
+ // General methods
+ // --------------------------------------------------------------------------------------------
+ // These are actually implemented in subclasses of the Java DataSet but we perform checking
+ // here and just pass through the calls to make everything much simpler.
+
+ /**
+ * Sets the name of the DataSet. This will appear in logs and graphical
+ * representations of the execution graph.
+ */
+ def name(name: String) = {
+ set match {
+ case ds: DataSource[_] => ds.name(name)
+ case op: Operator[_, _] => op.name(name)
+ case di: DeltaIterationResultSet[_, _] => di.getIterationHead.name(name)
+ case _ =>
+ throw new UnsupportedOperationException("Operator " + set.toString + " cannot have a name.")
+ }
+    // return this for chaining method calls
+ this
+ }
+
+ /**
+   * Sets the degree of parallelism of this operation. This must be at least 1.
+ */
+ def setParallelism(dop: Int) = {
+ set match {
+ case ds: DataSource[_] => ds.setParallelism(dop)
+ case op: Operator[_, _] => op.setParallelism(dop)
+ case di: DeltaIterationResultSet[_, _] => di.getIterationHead.parallelism(dop)
+ case _ =>
+ throw new UnsupportedOperationException("Operator " + set.toString + " cannot have " +
+ "parallelism.")
+ }
+ this
+ }
+
+ /**
+ * Returns the degree of parallelism of this operation.
+ */
+ def getParallelism: Int = set match {
+ case ds: DataSource[_] => ds.getParallelism
+ case op: Operator[_, _] => op.getParallelism
+ case _ =>
+ throw new UnsupportedOperationException("Operator " + set.toString + " does not have " +
+ "parallelism.")
+ }
+
+ /**
+ * Registers an [[org.apache.flink.api.common.aggregators.Aggregator]]
+ * for the iteration. Aggregators can be used to maintain simple statistics during the
+ * iteration, such as number of elements processed. The aggregators compute global aggregates:
+ * After each iteration step, the values are globally aggregated to produce one aggregate that
+ * represents statistics across all parallel instances.
+ * The value of an aggregator can be accessed in the next iteration.
+ *
+ * Aggregators can be accessed inside a function via
+ * [[org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext]].
+ *
+ * @param name The name under which the aggregator is registered.
+   * @param aggregator The aggregator.
+ */
+ def registerAggregator(name: String, aggregator: Aggregator[_]): DataSet[T] = {
+ set match {
+ case di: DeltaIterationResultSet[_, _] =>
+ di.getIterationHead.registerAggregator(name, aggregator)
+ case _ =>
+ throw new UnsupportedOperationException("Operator " + set.toString + " cannot have " +
+ "aggregators.")
+ }
+ this
+ }
+
+ /**
+   * Adds a certain data set as a broadcast set to this operator. Broadcast data sets are
+   * available at all parallel instances of this operator. A broadcast data set is registered
+   * under a certain name, and can be retrieved under that name from the operator's runtime
+   * context via
+   * `org.apache.flink.api.common.functions.RuntimeContext.getBroadcastVariable(String)`
+   *
+   * The runtime context itself is available in all UDFs via
+   * `org.apache.flink.api.common.functions.AbstractRichFunction#getRuntimeContext()`
+   *
+   * @param data The data set to be broadcasted.
+   * @param name The name under which the broadcast data set is retrieved.
+ * @return The operator itself, to allow chaining function calls.
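+   *
+   * For example (a usage sketch; the name "myBroadcast" and the data sets are illustrative):
+   * {{{
+   *   val toBroadcast: DataSet[Int] = ...
+   *   val data: DataSet[String] = ...
+   *   val mapped = data.map(new RichMapFunction[String, String] {
+   *     def map(in: String): String = {
+   *       // retrieve the broadcast set from the runtime context
+   *       val bcSet = getRuntimeContext.getBroadcastVariable[Int]("myBroadcast")
+   *       in + bcSet.size()
+   *     }
+   *   }).withBroadcastSet(toBroadcast, "myBroadcast")
+   * }}}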
+ */
+ def withBroadcastSet(data: DataSet[_], name: String) = {
+ set match {
+ case udfOp: UdfOperator[_] => udfOp.withBroadcastSet(data.set, name)
+ case _ =>
+ throw new UnsupportedOperationException("Operator " + set.toString + " cannot have " +
+ "broadcast variables.")
+ }
+ this
+ }
+
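+  /**
+   * Adds semantic information about constant fields of the UDF, i.e. fields that are copied
+   * unmodified from the input to the output. This is passed on to the underlying Java operator
+   * as an optimizer hint.
+   */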
+ def withConstantSet(constantSets: String*) = {
+ set match {
+ case op: SingleInputUdfOperator[_, _, _] => op.withConstantSet(constantSets: _*)
+ case _ =>
+ throw new UnsupportedOperationException("Cannot specify constant sets on Operator " +
+ set.toString + ".")
+ }
+ this
+ }
+
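+  /**
+   * Adds semantic information about constant fields of the first input of the UDF.
+   */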
+ def withConstantSetFirst(constantSets: String*) = {
+ set match {
+ case op: TwoInputUdfOperator[_, _, _, _] => op.withConstantSetFirst(constantSets: _*)
+ case _ =>
+ throw new UnsupportedOperationException("Cannot specify constant sets on Operator " + set
+ .toString + ".")
+ }
+ this
+ }
+
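+  /**
+   * Adds semantic information about constant fields of the second input of the UDF.
+   */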
+ def withConstantSetSecond(constantSets: String*) = {
+ set match {
+ case op: TwoInputUdfOperator[_, _, _, _] => op.withConstantSetSecond(constantSets: _*)
+ case _ =>
+ throw new UnsupportedOperationException("Cannot specify constant sets on Operator " + set
+ .toString + ".")
+ }
+ this
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Filter & Transformations
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new DataSet by applying the given function to every element of this DataSet.
+ */
+ def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R] = {
+ if (mapper == null) {
+ throw new NullPointerException("Map function must not be null.")
+ }
+ wrap(new MapOperator[T, R](set, implicitly[TypeInformation[R]], mapper))
+ }
+
+ /**
+ * Creates a new DataSet by applying the given function to every element of this DataSet.
+ */
+ def map[R: TypeInformation: ClassTag](fun: (T) => R): DataSet[R] = {
+ if (fun == null) {
+ throw new NullPointerException("Map function must not be null.")
+ }
+ val mapper = new MapFunction[T, R] {
+ def map(in: T): R = fun(in)
+ }
+ wrap(new MapOperator[T, R](set, implicitly[TypeInformation[R]], mapper))
+ }
+
+ /**
+ * Creates a new DataSet by applying the given function to each parallel partition of the
+ * DataSet.
+ *
+   * This function is intended for operations that cannot transform individual elements and
+   * that require no grouping of elements. To transform individual elements,
+   * the use of [[map]] and [[flatMap]] is preferable.
+ */
+ def mapPartition[R: TypeInformation: ClassTag](
+ partitionMapper: MapPartitionFunction[T, R]): DataSet[R] = {
+ if (partitionMapper == null) {
+ throw new NullPointerException("MapPartition function must not be null.")
+ }
+ wrap(new MapPartitionOperator[T, R](set, implicitly[TypeInformation[R]], partitionMapper))
+ }
+
+ /**
+ * Creates a new DataSet by applying the given function to each parallel partition of the
+ * DataSet.
+ *
+   * This function is intended for operations that cannot transform individual elements and
+   * that require no grouping of elements. To transform individual elements,
+   * the use of [[map]] and [[flatMap]] is preferable.
+ */
+ def mapPartition[R: TypeInformation: ClassTag](
+ fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
+ if (fun == null) {
+ throw new NullPointerException("MapPartition function must not be null.")
+ }
+ val partitionMapper = new MapPartitionFunction[T, R] {
+ def mapPartition(in: java.lang.Iterable[T], out: Collector[R]) {
+ fun(in.iterator().asScala, out)
+ }
+ }
+ wrap(new MapPartitionOperator[T, R](set, implicitly[TypeInformation[R]], partitionMapper))
+ }
+
+ /**
+ * Creates a new DataSet by applying the given function to each parallel partition of the
+ * DataSet.
+ *
+   * This function is intended for operations that cannot transform individual elements and
+   * that require no grouping of elements. To transform individual elements,
+   * the use of [[map]] and [[flatMap]] is preferable.
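+   *
+   * For example, counting the elements of each parallel partition (a sketch):
+   * {{{
+   *   val input: DataSet[String] = ...
+   *   val counts = input mapPartition { in => Seq(in.size) }
+   * }}}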
+ */
+ def mapPartition[R: TypeInformation: ClassTag](
+ fun: (TraversableOnce[T]) => TraversableOnce[R]): DataSet[R] = {
+ if (fun == null) {
+ throw new NullPointerException("MapPartition function must not be null.")
+ }
+ val partitionMapper = new MapPartitionFunction[T, R] {
+ def mapPartition(in: java.lang.Iterable[T], out: Collector[R]) {
+ fun(in.iterator().asScala) foreach out.collect
+ }
+ }
+ wrap(new MapPartitionOperator[T, R](set, implicitly[TypeInformation[R]], partitionMapper))
+ }
+
+ /**
+ * Creates a new DataSet by applying the given function to every element and flattening
+ * the results.
+ */
+ def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R] = {
+ if (flatMapper == null) {
+ throw new NullPointerException("FlatMap function must not be null.")
+ }
+ wrap(new FlatMapOperator[T, R](set, implicitly[TypeInformation[R]], flatMapper))
+ }
+
+ /**
+ * Creates a new DataSet by applying the given function to every element and flattening
+ * the results.
+ */
+ def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R] = {
+ if (fun == null) {
+ throw new NullPointerException("FlatMap function must not be null.")
+ }
+ val flatMapper = new FlatMapFunction[T, R] {
+ def flatMap(in: T, out: Collector[R]) { fun(in, out) }
+ }
+ wrap(new FlatMapOperator[T, R](set, implicitly[TypeInformation[R]], flatMapper))
+ }
+
+ /**
+ * Creates a new DataSet by applying the given function to every element and flattening
+ * the results.
+ */
+ def flatMap[R: TypeInformation: ClassTag](fun: (T) => TraversableOnce[R]): DataSet[R] = {
+ if (fun == null) {
+ throw new NullPointerException("FlatMap function must not be null.")
+ }
+ val flatMapper = new FlatMapFunction[T, R] {
+ def flatMap(in: T, out: Collector[R]) { fun(in) foreach out.collect }
+ }
+ wrap(new FlatMapOperator[T, R](set, implicitly[TypeInformation[R]], flatMapper))
+ }
+
+ /**
+ * Creates a new DataSet that contains only the elements satisfying the given filter predicate.
+ */
+ def filter(filter: FilterFunction[T]): DataSet[T] = {
+ if (filter == null) {
+ throw new NullPointerException("Filter function must not be null.")
+ }
+ wrap(new FilterOperator[T](set, filter))
+ }
+
+ /**
+ * Creates a new DataSet that contains only the elements satisfying the given filter predicate.
+ */
+ def filter(fun: (T) => Boolean): DataSet[T] = {
+ if (fun == null) {
+ throw new NullPointerException("Filter function must not be null.")
+ }
+ val filter = new FilterFunction[T] {
+ def filter(in: T) = fun(in)
+ }
+ wrap(new FilterOperator[T](set, filter))
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Non-grouped aggregations
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new [[DataSet]] by aggregating the specified tuple field using the given aggregation
+ * function. Since this is not a keyed DataSet the aggregation will be performed on the whole
+ * collection of elements.
+ *
+ * This only works on Tuple DataSets.
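+   *
+   * For example, summing up the second field of a tuple DataSet (a sketch):
+   * {{{
+   *   val input: DataSet[(String, Int)] = ...
+   *   val summed = input.aggregate(Aggregations.SUM, 1)
+   * }}}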
+ */
+ def aggregate(agg: Aggregations, field: Int): DataSet[T] = set match {
+ case aggregation: ScalaAggregateOperator[T] =>
+ aggregation.and(agg, field)
+ wrap(aggregation)
+
+ case _ => wrap(new ScalaAggregateOperator[T](set, agg, field))
+ }
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `SUM`
+ */
+ def sum(field: Int) = {
+ aggregate(Aggregations.SUM, field)
+ }
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `MAX`
+ */
+ def max(field: Int) = {
+ aggregate(Aggregations.MAX, field)
+ }
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `MIN`
+ */
+ def min(field: Int) = {
+ aggregate(Aggregations.MIN, field)
+ }
+
+ /**
+ * Creates a new [[DataSet]] by merging the elements of this DataSet using an associative reduce
+ * function.
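+   *
+   * For example, summing all elements of a DataSet of Ints (a sketch):
+   * {{{
+   *   val numbers: DataSet[Int] = ...
+   *   val sum = numbers reduce { _ + _ }
+   * }}}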
+ */
+ def reduce(reducer: ReduceFunction[T]): DataSet[T] = {
+ if (reducer == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ wrap(new ReduceOperator[T](set, reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by merging the elements of this DataSet using an associative reduce
+ * function.
+ */
+ def reduce(fun: (T, T) => T): DataSet[T] = {
+ if (fun == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ val reducer = new ReduceFunction[T] {
+ def reduce(v1: T, v2: T) = { fun(v1, v2) }
+ }
+ wrap(new ReduceOperator[T](set, reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by passing all elements in this DataSet to the group reduce function.
+ * The function can output zero or more elements using the [[Collector]]. The concatenation of the
+ * emitted values will form the resulting [[DataSet]].
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R] = {
+ if (reducer == null) {
+ throw new NullPointerException("GroupReduce function must not be null.")
+ }
+ wrap(new GroupReduceOperator[T, R](set, implicitly[TypeInformation[R]], reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by passing all elements in this DataSet to the group reduce function.
+ * The function can output zero or more elements using the [[Collector]]. The concatenation of the
+ * emitted values will form the resulting [[DataSet]].
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](
+ fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
+ if (fun == null) {
+ throw new NullPointerException("GroupReduce function must not be null.")
+ }
+ val reducer = new GroupReduceFunction[T, R] {
+ def reduce(in: java.lang.Iterable[T], out: Collector[R]) { fun(in.iterator().asScala, out) }
+ }
+ wrap(new GroupReduceOperator[T, R](set, implicitly[TypeInformation[R]], reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by passing all elements in this DataSet to the group reduce function.
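+   *
+   * For example, counting all elements (a sketch):
+   * {{{
+   *   val input: DataSet[String] = ...
+   *   val count = input reduceGroup { in => in.size }
+   * }}}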
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](fun: (TraversableOnce[T]) => R): DataSet[R] = {
+ if (fun == null) {
+ throw new NullPointerException("GroupReduce function must not be null.")
+ }
+ val reducer = new GroupReduceFunction[T, R] {
+ def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+ out.collect(fun(in.iterator().asScala))
+ }
+ }
+ wrap(new GroupReduceOperator[T, R](set, implicitly[TypeInformation[R]], reducer))
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // distinct
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
+ * two elements are distinct or not is made using the return value of the given function.
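+   *
+   * For example, keeping one element per distinct first tuple field (a sketch):
+   * {{{
+   *   val input: DataSet[(String, Int)] = ...
+   *   val unique = input distinct { _._1 }
+   * }}}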
+ */
+ def distinct[K: TypeInformation](fun: (T) => K): DataSet[T] = {
+ val keyExtractor = new KeySelector[T, K] {
+ def getKey(in: T) = fun(in)
+ }
+ wrap(new DistinctOperator[T](
+ set,
+ new Keys.SelectorFunctionKeys[T, K](
+ keyExtractor, set.getType, implicitly[TypeInformation[K]])))
+ }
+
+ /**
+ * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
+ * two elements are distinct or not is made based on only the specified tuple fields.
+ *
+ * This only works if this DataSet contains Tuples.
+ */
+ def distinct(fields: Int*): DataSet[T] = {
+ wrap(new DistinctOperator[T](
+ set,
+ new Keys.FieldPositionKeys[T](fields.toArray, set.getType, true)))
+ }
+
+ /**
+ * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
+ * two elements are distinct or not is made based on all tuple fields.
+ *
+ * This only works if this DataSet contains Tuples.
+ */
+ def distinct: DataSet[T] = {
+ wrap(new DistinctOperator[T](set, null))
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Keyed DataSet
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a [[GroupedDataSet]] which provides operations on groups of elements. Elements are
+ * grouped based on the value returned by the given function.
+ *
+ * This will not create a new DataSet, it will just attach the key function which will be used
+ * for grouping when executing a grouped operation.
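+   *
+   * For example (a sketch):
+   * {{{
+   *   val words: DataSet[(String, Int)] = ...
+   *   val grouped = words groupBy { _._1 }
+   * }}}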
+ */
+ def groupBy[K: TypeInformation](fun: (T) => K): GroupedDataSet[T] = {
+ val keyType = implicitly[TypeInformation[K]]
+ val keyExtractor = new KeySelector[T, K] {
+ def getKey(in: T) = fun(in)
+ }
+ new GroupedDataSetImpl[T](set,
+ new Keys.SelectorFunctionKeys[T, K](keyExtractor, set.getType, keyType))
+ }
+
+ /**
+ * Creates a [[GroupedDataSet]] which provides operations on groups of elements. Elements are
+ * grouped based on the given tuple fields.
+ *
+ * This will not create a new DataSet, it will just attach the tuple field positions which will be
+ * used for grouping when executing a grouped operation.
+ *
+ * This only works on Tuple DataSets.
+ */
+ def groupBy(fields: Int*): GroupedDataSet[T] = {
+ new GroupedDataSetImpl[T](
+ set,
+      new Keys.FieldPositionKeys[T](fields.toArray, set.getType, false))
+ }
+
+ // public UnsortedGrouping<T> groupBy(String... fields) {
+ // new UnsortedGrouping<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
+ // }
+
+ // --------------------------------------------------------------------------------------------
+ // Joining
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new DataSet by joining `this` DataSet with the `other` DataSet. To specify the join
+ * keys the `where` and `isEqualTo` methods must be used. For example:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val joined = left.join(right).where(0).isEqualTo(1)
+ * }}}
+ *
+ * The default join result is a DataSet with 2-Tuples of the joined values. In the above example
+ * that would be `((String, Int, Int), (Int, String, Int))`. A custom join function can be used
+ * if more control over the result is required. This can either be given as a lambda or a
+ * custom [[JoinFunction]]. For example:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val joined = left.join(right).where(0).isEqualTo(1) { (l, r) =>
+ * if (l._2 > 4) {
+ * Some((l._2, r._3))
+ * } else {
+ * None
+ * }
+ * }
+ * }}}
+ * This can be used to implement a filter directly in the join or to output more than one value:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val joined = left.join(right).where(0).isEqualTo(1) {
+ * (l, r, out: Collector[(String, Int)]) =>
+ * if (l._2 > 4) {
+ * out.collect((l._1, r._3))
+ * out.collect((l._1, r._1))
+ *       }
+ * }
+ * }}}
+ */
+ def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
+ new UnfinishedJoinOperationImpl(this.set, other.set, JoinHint.OPTIMIZER_CHOOSES)
+
+ /**
+ * Special [[join]] operation for explicitly telling the system that the right side is assumed
+ * to be a lot smaller than the left side of the join.
+ */
+ def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
+ new UnfinishedJoinOperationImpl(this.set, other.set, JoinHint.BROADCAST_HASH_SECOND)
+
+ /**
+ * Special [[join]] operation for explicitly telling the system that the left side is assumed
+ * to be a lot smaller than the right side of the join.
+ */
+ def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
+ new UnfinishedJoinOperationImpl(this.set, other.set, JoinHint.BROADCAST_HASH_FIRST)
+
+ // --------------------------------------------------------------------------------------------
+ // Co-Group
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * For each key in `this` DataSet and the `other` DataSet, create a tuple containing a list
+ * of elements for that key from both DataSets. To specify the join keys the `where` and
+ * `isEqualTo` methods must be used. For example:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val coGrouped = left.coGroup(right).where(0).isEqualTo(1)
+ * }}}
+ *
+ * A custom coGroup function can be used
+ * if more control over the result is required. This can either be given as a lambda or a
+ * custom [[CoGroupFunction]]. For example:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val coGrouped = left.coGroup(right).where(0).isEqualTo(1) { (l, r) =>
+ * // l and r are of type TraversableOnce
+ * Some((l.min, r.max))
+ * }
+ * }}}
+ * This can be used to implement a filter directly in the coGroup or to output more than one
+ * value:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val coGrouped = left.coGroup(right).where(0).isEqualTo(1) {
+ * (l, r, out: Collector[(String, Int)]) =>
+ * out.collect((l.min, r.max))
+ *       out.collect((l.max, r.min))
+ * }
+ * }}}
+ */
+ def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O] =
+ new UnfinishedCoGroupOperationImpl(this.set, other.set)
+
+ // --------------------------------------------------------------------------------------------
+ // Cross
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new DataSet by forming the cartesian product of `this` DataSet and the `other`
+ * DataSet.
+ *
+ * The default cross result is a DataSet with 2-Tuples of the combined values. A custom cross
+ * function can be used if more control over the result is required. This can either be given as
+ * a lambda or a custom [[CrossFunction]]. For example:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val product = left.cross(right) { (l, r) => (l._2, r._3) }
+ * }}}
+ */
+ def cross[O](other: DataSet[O]): CrossDataSet[T, O] =
+ CrossDataSetImpl.createCrossOperator(this.set, other.set)
+
+ /**
+ * Special [[cross]] operation for explicitly telling the system that the right side is assumed
+ * to be a lot smaller than the left side of the cartesian product.
+ */
+ def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O] =
+ CrossDataSetImpl.createCrossOperator(this.set, other.set)
+
+ /**
+ * Special [[cross]] operation for explicitly telling the system that the left side is assumed
+ * to be a lot smaller than the right side of the cartesian product.
+ */
+ def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O] =
+ CrossDataSetImpl.createCrossOperator(this.set, other.set)
+
+ // --------------------------------------------------------------------------------------------
+ // Iterations
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new DataSet by performing bulk iterations using the given step function. The
+ * iterations terminate when `maxIterations` iterations have been performed.
+ *
+ * For example:
+ * {{{
+ * val input: DataSet[(String, Int)] = ...
+ * val iterated = input.iterate(5) { previous =>
+ * val next = previous.map { x => (x._1, x._2 + 1) }
+ * next
+ * }
+ * }}}
+ *
+ * This example will simply increase the second field of the tuple by 5.
+ */
+ def iterate(maxIterations: Int)(stepFunction: (DataSet[T]) => DataSet[T]): DataSet[T] = {
+ val iterativeSet =
+      new IterativeDataSet[T](set.getExecutionEnvironment, set.getType, set, maxIterations)
+
+ val resultSet = stepFunction(wrap(iterativeSet))
+ val result = iterativeSet.closeWith(resultSet.set)
+ wrap(result)
+ }
+
+ /**
+ * Creates a new DataSet by performing bulk iterations using the given step function. The first
+ * DataSet the step function returns is the input for the next iteration, the second DataSet is
+ * the termination criterion. The iterations terminate when either the termination criterion
+ * DataSet contains no elements or when `maxIterations` iterations have been performed.
+ *
+ * For example:
+ * {{{
+ * val input: DataSet[(String, Int)] = ...
+ * val iterated = input.iterateWithTermination(5) { previous =>
+ * val next = previous.map { x => (x._1, x._2 + 1) }
+ * val term = next.filter { _._2 < 3 }
+ * (next, term)
+ * }
+ * }}}
+ *
+   * This example will increase the second field of the tuples until it is no longer smaller
+   * than 3, or at most 5 iterations have been performed.
+ */
+ def iterateWithTermination(maxIterations: Int)(
+ stepFunction: (DataSet[T]) => (DataSet[T], DataSet[_])): DataSet[T] = {
+ val iterativeSet =
+      new IterativeDataSet[T](set.getExecutionEnvironment, set.getType, set, maxIterations)
+
+ val (resultSet, terminationCriterion) = stepFunction(wrap(iterativeSet))
+ val result = iterativeSet.closeWith(resultSet.set, terminationCriterion.set)
+ wrap(result)
+ }
+
+ /**
+ * Creates a new DataSet by performing delta (or workset) iterations using the given step
+ * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+ * The iteration step function gets the current solution set and workset and must output the
+ * delta for the solution set and the workset for the next iteration.
+ *
+   * Note: The syntax of delta iterations is very likely going to change soon.
+ */
+ def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[Int])(
+ stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+ val key = new FieldPositionKeys[T](keyFields, set.getType, false)
+ val iterativeSet = new DeltaIteration[T, R](
+ set.getExecutionEnvironment, set.getType, set, workset.set, key, maxIterations)
+ val (newSolution, newWorkset) = stepFunction(
+ wrap(iterativeSet.getSolutionSet),
+ wrap(iterativeSet.getWorkset))
+ val result = iterativeSet.closeWith(newSolution.set, newWorkset.set)
+ wrap(result)
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Union
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a new DataSet containing the elements from both `this` DataSet and the `other`
+ * DataSet.
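+   *
+   * For example (a sketch; both inputs must have the same element type):
+   * {{{
+   *   val first: DataSet[String] = ...
+   *   val second: DataSet[String] = ...
+   *   val all = first union second
+   * }}}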
+ */
+ def union(other: DataSet[T]): DataSet[T] = wrap(new UnionOperator[T](set, other.set))
+
+ // --------------------------------------------------------------------------------------------
+ // Result writing
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Writes `this` DataSet to the specified location. This uses [[AnyRef.toString]] on
+ * each element.
+ */
+ def writeAsText(
+ filePath: String,
+ writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+ val tof: TextOutputFormat[T] = new TextOutputFormat[T](new Path(filePath))
+ tof.setWriteMode(writeMode)
+ output(tof)
+ }
+
+ /**
+ * Writes `this` DataSet to the specified location as a CSV file.
+ *
+ * This only works on Tuple DataSets. For individual tuple fields [[AnyRef.toString]] is used.
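+   *
+   * For example (a sketch; the output path is illustrative):
+   * {{{
+   *   val result: DataSet[(String, Int)] = ...
+   *   result.writeAsCsv("hdfs:///output/result.csv", writeMode = WriteMode.OVERWRITE)
+   * }}}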
+ */
+ def writeAsCsv(
+ filePath: String,
+ rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+ fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
+ writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+ Validate.isTrue(set.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+ val of = new ScalaCsvOutputFormat[Product](new Path(filePath), rowDelimiter, fieldDelimiter)
+ of.setWriteMode(writeMode)
+ output(of.asInstanceOf[OutputFormat[T]])
+ }
+
+ /**
+ * Writes `this` DataSet to the specified location using a custom
+ * [[org.apache.flink.api.common.io.FileOutputFormat]].
+ */
+ def write(
+ outputFormat: FileOutputFormat[T],
+ filePath: String,
+ writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+ Validate.notNull(filePath, "File path must not be null.")
+ Validate.notNull(outputFormat, "Output format must not be null.")
+ outputFormat.setOutputFilePath(new Path(filePath))
+ outputFormat.setWriteMode(writeMode)
+ output(outputFormat)
+ }
+
+ /**
+ * Emits `this` DataSet using a custom [[org.apache.flink.api.common.io.OutputFormat]].
+ */
+ def output(outputFormat: OutputFormat[T]): DataSink[T] = {
+ set.output(outputFormat)
+ }
+
+ /**
+ * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
+ * each element.
+ */
+ def print(): DataSink[T] = {
+ output(new PrintingOutputFormat[T](false))
+ }
+
+ /**
+   * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
+   * each element.
+ */
+ def printToErr(): DataSink[T] = {
+ output(new PrintingOutputFormat[T](true))
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala
deleted file mode 100644
index 35d0dd0..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import java.net.URI
-
-import org.apache.flink.api.scala.analysis._
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.types.Record
-import org.apache.flink.api.common.operators.base.GenericDataSinkBase
-import org.apache.flink.api.java.record.operators.FileDataSink
-import org.apache.flink.api.common.io.OutputFormat
-
-
-object DataSinkOperator {
- val DEFAULT_DATASINKOPERATOR_NAME = "<Unnamed Scala Data Sink>"
-
- def write[In](input: DataSet[In], url: String, format: ScalaOutputFormat[In],
- name: String = DEFAULT_DATASINKOPERATOR_NAME): ScalaSink[In]
- = {
- val uri = getUri(url)
-
- val ret = uri.getScheme match {
- case "file" | "hdfs" => new FileDataSink(format.asInstanceOf[FileOutputFormat[Record]], uri.toString,
- input.contract, name) with ScalaOutputOperator[In] {
-
- def getUDF = format.getUDF
- override def persistConfiguration() = format.persistConfiguration(this.getParameters())
- }
- }
- new ScalaSink(ret)
- }
-
- private def getUri(url: String) = {
- val uri = new URI(url)
- if (uri.getScheme == null)
- new URI("file://" + url)
- else
- uri
- }
-}
-
-class ScalaSink[In] private[scala] (private[scala] val sink: GenericDataSinkBase[Record])
-
-trait ScalaOutputFormat[In] { this: OutputFormat[_] =>
- def getUDF: UDF1[In, Nothing]
- def persistConfiguration(config: Configuration) = {}
- def configure(config: Configuration)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala
deleted file mode 100644
index e7990a2..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import java.net.URI
-import collection.JavaConversions._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.functions._
-import org.apache.flink.api.scala.analysis.UDF0
-import org.apache.flink.types._
-import org.apache.flink.types.parser._
-import org.apache.flink.api.java.record.operators.{CollectionDataSource => JavaCollectionDataSource, FileDataSource}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.io.FileInputFormat
-import org.apache.flink.api.java.record.operators.{CollectionDataSource => JavaCollectionDataSource}
-import org.apache.flink.api.common.io.InputFormat
-import org.apache.flink.api.scala.operators.TextInputFormat
-
-
-
-object DataSource {
-
- def apply[Out](url: String, format: ScalaInputFormat[Out]): DataSet[Out] with OutputHintable[Out] = {
- val uri = getUri(url)
-
- val ret = uri.getScheme match {
-
- case "file" | "hdfs" => new FileDataSource(format.asInstanceOf[FileInputFormat[Record]], uri.toString)
- with ScalaOperator[Out, Record] {
-
- override def getUDF = format.getUDF
-
- override def persistConfiguration() = format.persistConfiguration(this.getParameters)
- }
-
-// case "ext" => new GenericDataSource[GenericInputFormat[_]](format.asInstanceOf[GenericInputFormat[_]], uri.toString)
-// with ScalaOperator[Out] {
-//
-// override def getUDF = format.getUDF
-// override def persistConfiguration() = format.persistConfiguration(this.getParameters())
-// }
- }
-
- new DataSet[Out](ret) with OutputHintable[Out] {}
- }
-
- private def getUri(url: String) = {
- val uri = new URI(url)
- if (uri.getScheme == null)
- new URI("file://" + url)
- else
- uri
- }
-}
-
-object CollectionDataSource {
- /*
- constructor for collection input
- */
- def apply[Out: UDT](data: Iterable[Out]):DataSet[Out] with OutputHintable[Out] = {
- /*
- reuse the java implementation of collection data by adding scala operator
- */
- val js:java.util.Collection[Out] = data
- val ret = new JavaCollectionDataSource(js)
- with ScalaOperator[Out, Record]{
-
- val udf = new UDF0(implicitly[UDT[Out]])
- override def getUDF = udf
-
- }
-
- new DataSet[Out](ret) with OutputHintable[Out] {}
- }
-
- /*
- constructor for serializable iterator input
- */
- def apply[Out: UDT](data: Iterator[Out] with Serializable) = {
-
- /*
- reuse the java implementation of collection data by adding scala operator
- */
- val ret = new JavaCollectionDataSource(data)
- with ScalaOperator[Out, Record]{
-
- val udf = new UDF0(implicitly[UDT[Out]])
- override def getUDF = udf
-
- }
-
- new DataSet[Out](ret) with OutputHintable[Out] {}
- }
-}
-
-
-
-trait ScalaInputFormat[Out] { this: InputFormat[_, _] =>
- def getUDF: UDF0[Out]
- def persistConfiguration(config: Configuration) = {}
- def configure(config: Configuration)
-}
-
-
-object TextFile {
- def apply(url: String): DataSet[String] with OutputHintable[String] = DataSource(url, TextInputFormat())
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
new file mode 100644
index 0000000..6f44e68
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import java.util.UUID
+
+import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.java.io._
+import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, BasicTypeInfo}
+import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.apache.flink.core.fs.Path
+
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
+import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
+
+import org.apache.flink.api.java.operators.DataSource
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator}
+
+import scala.collection.JavaConverters._
+
+import scala.reflect.ClassTag
+
+/**
+ * The ExecutionEnvironment is the context in which a program is executed. A local environment
+ * will cause execution in the current JVM, while a remote environment will cause execution on a
+ * remote cluster installation.
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism)
+ * and to interact with the outside world (data access).
+ *
+ * To get an execution environment use the methods on the companion object:
+ *
+ * - [[ExecutionEnvironment.getExecutionEnvironment]]
+ * - [[ExecutionEnvironment.createLocalEnvironment]]
+ * - [[ExecutionEnvironment.createRemoteEnvironment]]
+ *
+ * Use [[ExecutionEnvironment.getExecutionEnvironment]] to get the correct environment depending
+ * on where the program is executed. If it is run inside an IDE, a local environment will be
+ * created. If the program is submitted to a cluster, a remote execution environment will
+ * be created.
+ */
+class ExecutionEnvironment(javaEnv: JavaEnv) {
+
+ /**
+ * Sets the degree of parallelism (DOP) for operations executed through this environment.
+ * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+ * x parallel instances. This value can be overridden by specific operations using
+ * [[DataSet.setParallelism]].
+ */
+ def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+ javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ }
+
+ /**
+ * Returns the default degree of parallelism for this execution environment. Note that this
+ * value can be overridden by individual operations using [[DataSet.setParallelism]]
+ */
+ def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+ /**
+ * Gets the UUID by which this environment is identified. The UUID sets the execution context
+ * in the cluster or local environment.
+ */
+ def getId: UUID = {
+ javaEnv.getId
+ }
+
+ /**
+ * Gets the UUID by which this environment is identified, as a string.
+ */
+ def getIdString: String = {
+ javaEnv.getIdString
+ }
+
+ /**
+   * Creates a DataSet of Strings produced by reading the given file line-wise.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
+ * "hdfs://host:port/file/path").
+   * @param charsetName The name of the character set used to read the file. Default is UTF-8.
+ */
+ def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String] = {
+ Validate.notNull(filePath, "The file path may not be null.")
+ val format = new TextInputFormat(new Path(filePath))
+ format.setCharsetName(charsetName)
+ val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO)
+ wrap(source)
+ }
+
+ /**
+ * Creates a DataSet by reading the given CSV file. The type parameter must be used to specify
+ * a Tuple type that has the same number of fields as there are fields in the CSV file. If the
+ * number of fields in the CSV file is not the same, the `includedFields` parameter can be used
+ * to only read specific fields.
+ *
+ * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
+   *                 "hdfs://host:port/file/path").
+ * @param lineDelimiter The string that separates lines, defaults to newline.
+ * @param fieldDelimiter The char that separates individual fields, defaults to ','.
+ * @param ignoreFirstLine Whether the first line in the file should be ignored.
+ * @param lenient Whether the parser should silently ignore malformed lines.
+ * @param includedFields The fields in the file that should be read. Per default all fields
+ * are read.
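+   *
+   * For example (a sketch; the input path is illustrative):
+   * {{{
+   *   val env = ExecutionEnvironment.getExecutionEnvironment
+   *   val pairs = env.readCsvFile[(String, Int)]("hdfs:///input/pairs.csv")
+   * }}}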
+ */
+ def readCsvFile[T <: Product : ClassTag : TypeInformation](
+ filePath: String,
+ lineDelimiter: String = "\n",
+ fieldDelimiter: Char = ',',
+ ignoreFirstLine: Boolean = false,
+ lenient: Boolean = false,
+ includedFields: Array[Int] = null): DataSet[T] = {
+
+ val typeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
+
+ val inputFormat = new ScalaCsvInputFormat[T](new Path(filePath), typeInfo)
+ inputFormat.setDelimiter(lineDelimiter)
+ inputFormat.setFieldDelimiter(fieldDelimiter)
+ inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
+ inputFormat.setLenient(lenient)
+
+ val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
+ for (i <- 0 until typeInfo.getArity) {
+ classes(i) = typeInfo.getTypeAt(i).getTypeClass
+ }
+ if (includedFields != null) {
+ Validate.isTrue(typeInfo.getArity == includedFields.length, "Number of tuple fields and" +
+ " included fields must match.")
+ inputFormat.setFields(includedFields, classes)
+ } else {
+ inputFormat.setFieldTypes(classes)
+ }
+
+ wrap(new DataSource[T](javaEnv, inputFormat, typeInfo))
+ }
+
+ /**
+ * Creates a new DataSource by reading the specified file using the custom
+ * [[org.apache.flink.api.common.io.FileInputFormat]].
+ */
+ def readFile[T : ClassTag : TypeInformation](
+ inputFormat: FileInputFormat[T],
+ filePath: String): DataSet[T] = {
+ Validate.notNull(inputFormat, "InputFormat must not be null.")
+ Validate.notNull(filePath, "File path must not be null.")
+ inputFormat.setFilePath(new Path(filePath))
+ createInput(inputFormat, implicitly[TypeInformation[T]])
+ }
+
+ /**
+ * Generic method to create an input DataSet with an
+ * [[org.apache.flink.api.common.io.InputFormat]].
+ */
+ def createInput[T : ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T] = {
+ if (inputFormat == null) {
+ throw new IllegalArgumentException("InputFormat must not be null.")
+ }
+ createInput(inputFormat, implicitly[TypeInformation[T]])
+ }
+
+ /**
+ * Generic method to create an input DataSet with an
+ * [[org.apache.flink.api.common.io.InputFormat]].
+ */
+ private def createInput[T: ClassTag](
+ inputFormat: InputFormat[T, _],
+ producedType: TypeInformation[T]): DataSet[T] = {
+ if (inputFormat == null) {
+ throw new IllegalArgumentException("InputFormat must not be null.")
+ }
+ Validate.notNull(producedType, "Produced type must not be null")
+ wrap(new DataSource[T](javaEnv, inputFormat, producedType))
+ }
+
+ /**
+ * Creates a DataSet from the given non-empty [[Seq]]. The elements need to be serializable
+ * because the framework may move the elements into the cluster if needed.
+ *
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a degree of parallelism of one.
+ */
+ def fromCollection[T: ClassTag : TypeInformation](
+ data: Seq[T]): DataSet[T] = {
+ Validate.notNull(data, "Data must not be null.")
+
+ val typeInfo = implicitly[TypeInformation[T]]
+ CollectionInputFormat.checkCollection(data.asJavaCollection, typeInfo.getTypeClass)
+ val dataSource = new DataSource[T](
+ javaEnv,
+ new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer),
+ typeInfo)
+ wrap(dataSource)
+ }
+
+ /**
+   * Creates a DataSet from the given [[Iterator]]. The iterator must be serializable because the
+   * framework might move it into the cluster if needed.
+ *
+ * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a degree of parallelism of one.
+ */
+ def fromCollection[T: ClassTag : TypeInformation] (
+ data: Iterator[T]): DataSet[T] = {
+ Validate.notNull(data, "Data must not be null.")
+
+ val typeInfo = implicitly[TypeInformation[T]]
+ val dataSource = new DataSource[T](
+ javaEnv,
+ new IteratorInputFormat[T](data.asJava),
+ typeInfo)
+ wrap(dataSource)
+ }
+
+ /**
+ * Creates a new data set that contains the given elements. The elements must all be of the
+ * same type and must be serializable.
+ *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+ * a degree of parallelism of one.
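+   *
+   * For example (a sketch; `env` is an [[ExecutionEnvironment]]):
+   * {{{
+   *   val ds = env.fromElements("a", "b", "c")
+   * }}}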
+ */
+ def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T] = {
+ Validate.notNull(data, "Data must not be null.")
+ val typeInfo = implicitly[TypeInformation[T]]
+ fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+ }
+
+ /**
+ * Creates a new data set that contains elements in the iterator. The iterator is splittable,
+ * allowing the framework to create a parallel data source that returns the elements in the
+ * iterator. The iterator must be serializable because the execution environment may ship the
+ * elements into the cluster.
+ */
+ def fromParallelCollection[T: ClassTag : TypeInformation](
+ iterator: SplittableIterator[T]): DataSet[T] = {
+ val typeInfo = implicitly[TypeInformation[T]]
+ wrap(new DataSource[T](javaEnv, new ParallelIteratorInputFormat[T](iterator), typeInfo))
+ }
+
+ /**
+ * Creates a new data set that contains a sequence of numbers. The data set will be created in
+   * parallel, so there is no guarantee about the order of the elements.
+ *
+ * @param from The number to start at (inclusive).
+ * @param to The number to stop at (inclusive).
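+   *
+   * For example (a sketch; `env` is an [[ExecutionEnvironment]]):
+   * {{{
+   *   val numbers = env.generateSequence(1, 1000)
+   * }}}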
+ */
+ def generateSequence(from: Long, to: Long): DataSet[Long] = {
+ val iterator = new NumberSequenceIterator(from, to)
+ val source = new DataSource(
+ javaEnv,
+ new ParallelIteratorInputFormat[java.lang.Long](iterator),
+ BasicTypeInfo.LONG_TYPE_INFO)
+ wrap(source).asInstanceOf[DataSet[Long]]
+ }
+
+ /**
+ * Registers a file at the distributed cache under the given name. The file will be accessible
+ * from any user-defined function in the (distributed) runtime under a local path. Files
+ * may be local files (as long as all relevant workers have access to it),
+ * or files in a distributed file system.
+ * The runtime will copy the files temporarily to a local cache, if needed.
+ *
+ * The [[org.apache.flink.api.common.functions.RuntimeContext]] can be obtained inside UDFs
+ * via
+ * [[org.apache.flink.api.common.functions.RichFunction#getRuntimeContext]] and provides
+ * access via
+ * [[org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache]]
+ *
+ * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+ * "hdfs://host:port/and/path")
+ * @param name The name under which the file is registered.
+ * @param executable Flag indicating whether the file should be executable
+ */
+ def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit = {
+ javaEnv.registerCachedFile(filePath, name, executable)
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of the program that have
+ * resulted in a "sink" operation. Sink operations are for example printing results
+   * ([[DataSet.print]]), writing results (e.g. [[DataSet.writeAsText]], [[DataSet.write]]), or
+   * other generic data sinks created with [[DataSet.output]].
+ *
+ * The program execution will be logged and displayed with a generated default name.
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ */
+ def execute(): JobExecutionResult = {
+ javaEnv.execute()
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of the program that have
+ * resulted in a "sink" operation. Sink operations are for example printing results
+   * ([[DataSet.print]]), writing results (e.g. [[DataSet.writeAsText]], [[DataSet.write]]), or
+   * other generic data sinks created with [[DataSet.output]].
+ *
+ * The program execution will be logged and displayed with the given name.
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ */
+ def execute(jobName: String): JobExecutionResult = {
+ javaEnv.execute(jobName)
+ }
+
+ /**
+ * Creates the plan with which the system will execute the program, and returns it as a String
+ * using a JSON representation of the execution data flow graph.
+ */
+ def getExecutionPlan() = {
+ javaEnv.getExecutionPlan
+ }
+
+ /**
+ * Creates the program's [[org.apache.flink.api.common.Plan]].
+ * The plan is a description of all data sources, data sinks,
+ * and operations and how they interact, as an isolated unit that can be executed with a
+ * [[org.apache.flink.api.common.PlanExecutor]]. Obtaining a plan and starting it with an
+ * executor is an alternative way to run a program and is only possible if the program only
+ * consists of distributed operations.
+ */
+ def createProgramPlan(jobName: String = "") = {
+ if (jobName.isEmpty) {
+ javaEnv.createProgramPlan()
+    } else {
+      javaEnv.createProgramPlan(jobName)
+    }
+ }
+}
+
+object ExecutionEnvironment {
+
+ /**
+ * Creates an execution environment that represents the context in which the program is
+ * currently executed. If the program is invoked standalone, this method returns a local
+ * execution environment. If the program is invoked from within the command line client
+ * to be submitted to a cluster, this method returns the execution environment of this cluster.
+ */
+ def getExecutionEnvironment: ExecutionEnvironment = {
+ new ExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+ }
+
+ /**
+ * Creates a local execution environment. The local execution environment will run the program in
+ * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
+ * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
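+   *
+   * For example (a sketch):
+   * {{{
+   *   val env = ExecutionEnvironment.createLocalEnvironment(degreeOfParallelism = 4)
+   * }}}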
+ */
+ def createLocalEnvironment(
+ degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()) : ExecutionEnvironment = {
+ val javaEnv = JavaEnv.createLocalEnvironment()
+ javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ new ExecutionEnvironment(javaEnv)
+ }
+
+ /**
+ * Creates a remote execution environment. The remote environment sends (parts of) the program to
+ * a cluster for execution. Note that all file paths used in the program must be accessible from
+ * the cluster. The execution will use the cluster's default degree of parallelism, unless the
+ * parallelism is set explicitly via [[ExecutionEnvironment.setDegreeOfParallelism()]].
+ *
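+ * A sketch of a typical call (host, port, and JAR path below are placeholders):
+ * {{{
+ *   val env = ExecutionEnvironment.createRemoteEnvironment(
+ *     "jobmanager-host", 6123, "/path/to/program.jar")
+ * }}}
+ *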
+ * @param host The host name or address of the master (JobManager),
+ * where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+ * program uses user-defined functions, user-defined input formats, or any libraries,
+ * those must be provided in the JAR files.
+ */
+ def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment = {
+ new ExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
+ }
+
+ /**
+ * Creates a remote execution environment. The remote environment sends (parts of) the program
+ * to a cluster for execution. Note that all file paths used in the program must be accessible
+ * from the cluster. The execution will use the specified degree of parallelism.
+ *
+ * @param host The host name or address of the master (JobManager),
+ * where the program should be executed.
+ * @param port The port of the master (JobManager), where the program should be executed.
+ * @param degreeOfParallelism The degree of parallelism to use during the execution.
+ * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+ * program uses user-defined functions, user-defined input formats, or any libraries,
+ * those must be provided in the JAR files.
+ */
+ def createRemoteEnvironment(
+ host: String,
+ port: Int,
+ degreeOfParallelism: Int,
+ jarFiles: String*): ExecutionEnvironment = {
+ val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+ javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+ new ExecutionEnvironment(javaEnv)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
new file mode 100644
index 0000000..dfd5cf0
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.operators.ScalaAggregateOperator
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.functions.{GroupReduceFunction, ReduceFunction}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.operators._
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+/**
+ * A [[DataSet]] to which a grouping key was added. Operations work on groups of elements with the
+ * same key (`aggregate`, `reduce`, and `reduceGroup`).
+ *
+ * A secondary sort order can be added with `sortGroup`, but it only takes effect when one
+ * of the group-at-a-time operations, i.e. `reduceGroup`, is used.
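+ *
+ * For example, a word count over an assumed `DataSet[String]` named `words` (a sketch):
+ * {{{
+ *   val counts = words
+ *     .map { w => (w, 1) }
+ *     .groupBy(0)
+ *     .sum(1)
+ * }}}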
+ */
+trait GroupedDataSet[T] {
+
+ /**
+ * Adds a secondary sort key to this [[GroupedDataSet]]. This only has an effect if you
+ * use one of the group-at-a-time operations, i.e. `reduceGroup`.
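+ *
+ * For example (a sketch; `grouped` is an assumed [[GroupedDataSet]] of tuples):
+ * {{{
+ *   grouped.sortGroup(1, Order.ASCENDING)
+ * }}}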
+ */
+ def sortGroup(field: Int, order: Order): GroupedDataSet[T]
+
+ /**
+ * Creates a new [[DataSet]] by aggregating the specified tuple field using the given aggregation
+ * function. Since this is a keyed DataSet the aggregation will be performed on groups of
+ * tuples with the same key.
+ *
+ * This only works on Tuple DataSets.
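+ *
+ * For example (a sketch; `grouped` is an assumed grouped tuple data set):
+ * {{{
+ *   grouped.aggregate(Aggregations.SUM, 1)
+ * }}}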
+ */
+ def aggregate(agg: Aggregations, field: Int): DataSet[T]
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `SUM`
+ */
+ def sum(field: Int): DataSet[T]
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `MAX`
+ */
+ def max(field: Int): DataSet[T]
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `MIN`
+ */
+ def min(field: Int): DataSet[T]
+
+ /**
+ * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+ * using an associative reduce function.
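+ *
+ * For example (a sketch over an assumed group of (word, count) tuples):
+ * {{{
+ *   grouped.reduce { (a, b) => (a._1, a._2 + b._2) }
+ * }}}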
+ */
+ def reduce(fun: (T, T) => T): DataSet[T]
+
+ /**
+ * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+ * using an associative reduce function.
+ */
+ def reduce(reducer: ReduceFunction[T]): DataSet[T]
+
+ /**
+ * Creates a new [[DataSet]] by passing, for each group (elements with the same key), the list
+ * of elements to the group reduce function. The function must output exactly one element. The
+ * concatenation of those elements will form the resulting [[DataSet]].
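+ *
+ * For example (a sketch that reduces each group to its size):
+ * {{{
+ *   grouped.reduceGroup { elements => elements.size }
+ * }}}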
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](fun: (TraversableOnce[T]) => R): DataSet[R]
+
+ /**
+ * Creates a new [[DataSet]] by passing, for each group (elements with the same key), the list
+ * of elements to the group reduce function. The function can output zero or more elements using
+ * the [[Collector]]. The concatenation of the emitted values will form the resulting [[DataSet]].
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](
+ fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R]
+
+ /**
+ * Creates a new [[DataSet]] by passing, for each group (elements with the same key), the list
+ * of elements to the [[GroupReduceFunction]]. The function can output zero or more elements. The
+ * concatenation of the emitted values will form the resulting [[DataSet]].
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
+}
+
+/**
+ * Private implementation for [[GroupedDataSet]] that keeps the implementation details, i.e. the
+ * parameters of the constructor, hidden.
+ */
+private[flink] class GroupedDataSetImpl[T: ClassTag](
+ private val set: JavaDataSet[T],
+ private val keys: Keys[T])
+ extends GroupedDataSet[T] {
+
+ // These are for optional secondary sort. They are only used
+ // when using a group-at-a-time reduce function.
+ private val groupSortKeyPositions = mutable.MutableList[Int]()
+ private val groupSortOrders = mutable.MutableList[Order]()
+
+ /**
+ * Adds a secondary sort key to this [[GroupedDataSet]]. This only has an effect if you
+ * use one of the group-at-a-time operations, i.e. `reduceGroup`.
+ */
+ def sortGroup(field: Int, order: Order): GroupedDataSet[T] = {
+ if (!set.getType.isTupleType) {
+ throw new InvalidProgramException("Specifying order keys via field positions is only valid " +
+ "for tuple data types")
+ }
+ if (field >= set.getType.getArity) {
+ throw new IllegalArgumentException("Order key out of tuple bounds.")
+ }
+ groupSortKeyPositions += field
+ groupSortOrders += order
+ this
+ }
+
+ /**
+ * Creates a [[SortedGrouping]] if any secondary sort fields were specified. Otherwise, just
+ * create an [[UnsortedGrouping]].
+ */
+ private def maybeCreateSortedGrouping(): Grouping[T] = {
+ if (groupSortKeyPositions.length > 0) {
+ val grouping = new SortedGrouping[T](set, keys, groupSortKeyPositions(0), groupSortOrders(0))
+ // now manually add the rest of the keys
+ for (i <- 1 until groupSortKeyPositions.length) {
+ grouping.sortGroup(groupSortKeyPositions(i), groupSortOrders(i))
+ }
+ grouping
+ } else {
+ new UnsortedGrouping[T](set, keys)
+ }
+ }
+
+ /** Convenience method for creating the [[UnsortedGrouping]]. */
+ private def createUnsortedGrouping(): Grouping[T] = new UnsortedGrouping[T](set, keys)
+
+ /**
+ * Creates a new [[DataSet]] by aggregating the specified tuple field using the given aggregation
+ * function. Since this is a keyed DataSet the aggregation will be performed on groups of
+ * tuples with the same key.
+ *
+ * This only works on Tuple DataSets.
+ */
+ def aggregate(agg: Aggregations, field: Int): DataSet[T] = set match {
+ case aggregation: ScalaAggregateOperator[T] =>
+ aggregation.and(agg, field)
+ wrap(aggregation)
+
+ case _ => wrap(new ScalaAggregateOperator[T](createUnsortedGrouping(), agg, field))
+ }
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `SUM`
+ */
+ def sum(field: Int): DataSet[T] = {
+ aggregate(Aggregations.SUM, field)
+ }
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `MAX`
+ */
+ def max(field: Int): DataSet[T] = {
+ aggregate(Aggregations.MAX, field)
+ }
+
+ /**
+ * Syntactic sugar for [[aggregate]] with `MIN`
+ */
+ def min(field: Int): DataSet[T] = {
+ aggregate(Aggregations.MIN, field)
+ }
+
+ /**
+ * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+ * using an associative reduce function.
+ */
+ def reduce(fun: (T, T) => T): DataSet[T] = {
+ Validate.notNull(fun, "Reduce function must not be null.")
+ val reducer = new ReduceFunction[T] {
+ def reduce(v1: T, v2: T) = {
+ fun(v1, v2)
+ }
+ }
+ wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+ * using an associative reduce function.
+ */
+ def reduce(reducer: ReduceFunction[T]): DataSet[T] = {
+ Validate.notNull(reducer, "Reduce function must not be null.")
+ wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by passing, for each group (elements with the same key), the list
+ * of elements to the group reduce function. The function must output exactly one element. The
+ * concatenation of those elements will form the resulting [[DataSet]].
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](
+ fun: (TraversableOnce[T]) => R): DataSet[R] = {
+ Validate.notNull(fun, "Group reduce function must not be null.")
+ val reducer = new GroupReduceFunction[T, R] {
+ def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+ out.collect(fun(in.iterator().asScala))
+ }
+ }
+ wrap(
+ new GroupReduceOperator[T, R](createUnsortedGrouping(),
+ implicitly[TypeInformation[R]], reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by passing, for each group (elements with the same key), the list
+ * of elements to the group reduce function. The function can output zero or more elements using
+ * the [[Collector]]. The concatenation of the emitted values will form the resulting [[DataSet]].
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](
+ fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
+ Validate.notNull(fun, "Group reduce function must not be null.")
+ val reducer = new GroupReduceFunction[T, R] {
+ def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+ fun(in.iterator().asScala, out)
+ }
+ }
+ wrap(
+ new GroupReduceOperator[T, R](createUnsortedGrouping(),
+ implicitly[TypeInformation[R]], reducer))
+ }
+
+ /**
+ * Creates a new [[DataSet]] by passing, for each group (elements with the same key), the list
+ * of elements to the [[GroupReduceFunction]]. The function can output zero or more elements. The
+ * concatenation of the emitted values will form the resulting [[DataSet]].
+ */
+ def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R] = {
+ Validate.notNull(reducer, "GroupReduce function must not be null.")
+ wrap(
+ new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
+ implicitly[TypeInformation[R]], reducer))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala
deleted file mode 100644
index 1a62d31..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import java.lang.annotation.Annotation
-
-import org.apache.flink.api.scala.analysis.UDF
-import org.apache.flink.api.scala.analysis.UDF0
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.UDF2
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.api.common.operators.AbstractUdfOperator
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.types.Record
-import org.apache.flink.types.{Nothing => JavaNothing}
-
-trait ScalaOperator[T, UT] {
- this: Operator[UT] =>
- def getUDF: UDF[T]
- def getKeys: Seq[FieldSelector] = Seq()
- def persistConfiguration(): Unit = {}
-
- var persistHints: () => Unit = { () => }
-
- def persistConfiguration(optimizerNode: Option[OptimizerNode]): Unit = {
-
- ScalaOperator.this match {
-
- case contract: AbstractUdfOperator[_, _] => {
- for ((key, inputNum) <- getKeys.zipWithIndex) {
-
- val source = key.selectedFields.toSerializerIndexArray
- val target = optimizerNode map { _.getRemappedKeys(inputNum) } getOrElse {
- contract.getKeyColumns(inputNum) }
-
- assert(source.length == target.length, "Attempt to write " + source.length +
- " key indexes to an array of size " + target.length)
- System.arraycopy(source, 0, target, 0, source.length)
- }
- }
-
- case _ if getKeys.size > 0 => throw new UnsupportedOperationException("Attempted to set " +
- "keys on a contract that doesn't support them")
-
- case _ =>
- }
-
- persistHints()
- persistConfiguration()
- }
-
- protected def annotations: Seq[Annotation] = Seq()
-
- def getUserCodeAnnotation[A <: Annotation](annotationClass: Class[A]): A = {
- val res = annotations find { _.annotationType().equals(annotationClass) } map {
- _.asInstanceOf[A] } getOrElse null.asInstanceOf[A]
-// println("returning ANOOT: " + res + " FOR: " + annotationClass.toString)
-// res match {
-// case r : FunctionAnnotation.ConstantFieldsFirst => println("CONSTANT FIELDS FIRST: " +
-// r.value().mkString(","))
-// case r : FunctionAnnotation.ConstantFieldsSecond => println("CONSTANT FIELDS SECOND: " +
-// r.value().mkString(","))
-// case _ =>
-// }
- res
- }
-}
-
-trait NoOpScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
-}
-
-trait HigherOrderScalaOperator[T] extends ScalaOperator[T, Record] { this: Operator[Record] =>
- override def getUDF: UDF0[T]
-}
-
-trait BulkIterationScalaOperator[T] extends HigherOrderScalaOperator[T] { this: Operator[Record] =>
-}
-
-trait DeltaIterationScalaOperator[T] extends HigherOrderScalaOperator[T] {
- this: Operator[Record] =>
- val key: FieldSelector
-}
-
-trait ScalaOutputOperator[In] extends ScalaOperator[Nothing, JavaNothing] {
- this: Operator[JavaNothing] =>
- override def getUDF: UDF1[In, Nothing]
-}
-
-trait OneInputScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
- override def getUDF: UDF1[In, Out]
-}
-
-trait TwoInputScalaOperator[In1, In2, Out] extends ScalaOperator[Out, Record] {
- this: Operator[Record] =>
- override def getUDF: UDF2[In1, In2, Out]
-}
-
-trait UnionScalaOperator[In] extends TwoInputScalaOperator[In, In, In] {
- this: Union[Record] =>
- override def getUDF: UDF2[In, In, In]
-}
-
-trait OneInputKeyedScalaOperator[In, Out] extends OneInputScalaOperator[In, Out] {
- this: Operator[Record] =>
- val key: FieldSelector
- override def getKeys = Seq(key)
-}
-
-trait TwoInputKeyedScalaOperator[LeftIn, RightIn, Out]
- extends TwoInputScalaOperator[LeftIn, RightIn, Out] {
-
- this: Operator[Record] =>
- val leftKey: FieldSelector
- val rightKey: FieldSelector
- override def getKeys = Seq(leftKey, rightKey)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala
deleted file mode 100644
index 2ddd437..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import scala.collection.JavaConversions.asJavaCollection
-
-import java.util.Calendar
-
-import org.apache.flink.api.common.Plan
-import org.apache.flink.compiler.plan.OptimizedPlan
-import org.apache.flink.compiler.postpass.RecordModelPostPass
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.types.Record
-
-import org.apache.flink.api.scala.analysis.GlobalSchemaGenerator
-import org.apache.flink.api.scala.analysis.postPass.GlobalSchemaOptimizer
-
-
-class ScalaPlan(scalaSinks: Seq[ScalaSink[_]], scalaJobName: String = "Flink Scala Job at " + Calendar.getInstance()
- .getTime()) extends Plan(asJavaCollection(ScalaPlan.setAnnotations(scalaSinks) map { _.sink }), scalaJobName) {
- val pactSinks = scalaSinks map { _.sink.asInstanceOf[Operator[Record] with ScalaOperator[_, _]] }
- new GlobalSchemaGenerator().initGlobalSchema(pactSinks)
- override def getPostPassClassName() = "org.apache.flink.api.scala.ScalaPostPass";
-
-}
-
-object ScalaPlan{
- def setAnnotations(sinks: Seq[ScalaSink[_]]): Seq[ScalaSink[_]] = {
- AnnotationUtil.setAnnotations(sinks)
- }
-}
-
-case class Args(argsMap: Map[String, String], defaultParallelism: Int, schemaHints: Boolean, schemaCompaction: Boolean) {
- def apply(key: String): String = argsMap.getOrElse(key, key)
- def apply(key: String, default: => String) = argsMap.getOrElse(key, default)
-}
-
-object Args {
-
- def parse(args: Seq[String]): Args = {
-
- var argsMap = Map[String, String]()
- var defaultParallelism = 1
- var schemaHints = true
- var schemaCompaction = true
-
- val ParamName = "-(.+)".r
-
- def parse(args: Seq[String]): Unit = args match {
- case Seq("-subtasks", value, rest @ _*) => { defaultParallelism = value.toInt; parse(rest) }
- case Seq("-nohints", rest @ _*) => { schemaHints = false; parse(rest) }
- case Seq("-nocompact", rest @ _*) => { schemaCompaction = false; parse(rest) }
- case Seq(ParamName(name), value, rest @ _*) => { argsMap = argsMap.updated(name, value); parse(rest) }
- case Seq() =>
- }
-
- parse(args)
- Args(argsMap, defaultParallelism, schemaHints, schemaCompaction)
- }
-}
-
-//abstract class ScalaProgram extends Program {
-// def getScalaPlan(args: Args): ScalaPlan
-//
-// override def getPlan(args: String*): Plan = {
-// val scalaArgs = Args.parse(args.toSeq)
-//
-// getScalaPlan(scalaArgs)
-// }
-//}
-
-
-class ScalaPostPass extends RecordModelPostPass with GlobalSchemaOptimizer {
- override def postPass(plan: OptimizedPlan): Unit = {
- optimizeSchema(plan, false)
- super.postPass(plan)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala
deleted file mode 100644
index e08cd0f..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import org.apache.flink.compiler.dag._
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.api.common.operators.DualInputOperator
-import org.apache.flink.api.common.operators.SingleInputOperator
-
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration, GenericDataSinkBase, GenericDataSourceBase}
-
-import org.apache.flink.api.java.record.operators.CrossOperator
-import org.apache.flink.api.java.record.operators.CoGroupOperator
-import org.apache.flink.api.java.record.operators.JoinOperator
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.api.java.record.operators.ReduceOperator
-
-import org.apache.flink.types.Record
-
-import org.apache.flink.api.scala._
-
-
-object Extractors {
-
- object DataSinkNode {
- def unapply(node: Operator[_]): Option[(UDF1[_, _], Operator[Record])] = node match {
- case contract: GenericDataSinkBase[_] with ScalaOutputOperator[_] => {
- Some((contract.getUDF.asInstanceOf[UDF1[_, _]], node.asInstanceOf[GenericDataSinkBase[_]].getInput().asInstanceOf[Operator[Record]]))
- }
- case _ => None
- }
- }
-
- object DataSourceNode {
- def unapply(node: Operator[_]): Option[(UDF0[_])] = node match {
- case contract: GenericDataSourceBase[_, _] with ScalaOperator[_, _] => Some(contract.getUDF.asInstanceOf[UDF0[_]])
- case _ => None
- }
- }
-
- object CoGroupNode {
- def unapply(node: Operator[_]): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, Operator[Record], Operator[Record])] = node match {
- case contract: CoGroupOperator with TwoInputKeyedScalaOperator[_, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
-
- object CrossNode {
- def unapply(node: Operator[_]): Option[(UDF2[_, _, _], Operator[Record], Operator[Record])] = node match {
- case contract: CrossOperator with TwoInputScalaOperator[_, _, _] => Some((contract.getUDF, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
-
- object JoinNode {
- def unapply(node: Operator[_]): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, Operator[Record], Operator[Record])] = node match {
- case contract: JoinOperator with TwoInputKeyedScalaOperator[ _, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
-
- object MapNode {
- def unapply(node: Operator[_]): Option[(UDF1[_, _], Operator[Record])] = node match {
- case contract: MapOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
-
- object UnionNode {
- def unapply(node: Operator[_]): Option[(UDF2[_, _, _], Operator[Record], Operator[Record])] = node match {
- case contract: Union[_] with UnionScalaOperator[_] => Some((contract.getUDF, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
-
- object ReduceNode {
- def unapply(node: Operator[_]): Option[(UDF1[_, _], FieldSelector, Operator[Record])] = node match {
- case contract: ReduceOperator with OneInputKeyedScalaOperator[_, _] => Some((contract.getUDF, contract.key, contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
- case contract: ReduceOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, new FieldSelector(contract.getUDF.inputUDT, Nil), contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
- object DeltaIterationNode {
- def unapply(node: Operator[_]): Option[(UDF0[_], FieldSelector, Operator[Record], Operator[Record])] = node match {
- case contract: DeltaIteration[_, _] with DeltaIterationScalaOperator[_] => Some((contract.getUDF, contract.key, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
-
- object BulkIterationNode {
- def unapply(node: Operator[_]): Option[(UDF0[_], Operator[Record])] = node match {
- case contract: BulkIteration[_] with BulkIterationScalaOperator[_] => Some((contract.getUDF, contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
- case _ => None
- }
- }
-}