Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:50 UTC

[08/60] Rewrite the Scala API as (somewhat) thin Layer on Java API

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 11ee48f..041f269 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -16,57 +16,822 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.scala
 
-import language.experimental.macros
-import scala.reflect.macros.Context
-
-import org.apache.flink.api.scala.operators._
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.types.Record
-
-
-class DataSet[T] (val contract: Operator[Record] with ScalaOperator[T, Record]) {
-  
-  def cogroup[RightIn](rightInput: DataSet[RightIn]) =
-    new CoGroupDataSet[T, RightIn](this, rightInput)
-  def cross[RightIn](rightInput: DataSet[RightIn]) = new CrossDataSet[T, RightIn](this, rightInput)
-  def join[RightIn](rightInput: DataSet[RightIn]) = new JoinDataSet[T, RightIn](this, rightInput)
-  
-  def map[Out](fun: T => Out) = macro MapMacros.map[T, Out]
-  def flatMap[Out](fun: T => Iterator[Out]) = macro MapMacros.flatMap[T, Out]
-  def filter(fun: T => Boolean) = macro MapMacros.filter[T]
-  
-  // reduce
-  def groupBy[Key](keyFun: T => Key) = macro ReduceMacros.groupBy[T, Key]
-
-  // reduce without grouping
-  def reduce(fun: (T, T) => T) = macro ReduceMacros.globalReduce[T]
-  def reduceAll[Out](fun: Iterator[T] => Out) = macro ReduceMacros.globalReduceGroup[T, Out]
-  def combinableReduceAll[Out](fun: Iterator[T] => Out) =
-    macro ReduceMacros.combinableGlobalReduceGroup[T]
-
-  def union(secondInput: DataSet[T]) = UnionOperator.impl[T](this, secondInput)
-  
-  def iterateWithDelta[DeltaItem](stepFunction: DataSet[T] => (DataSet[T], DataSet[DeltaItem])) =
-    macro IterateMacros.iterateWithDelta[T, DeltaItem]
-  def iterate(n: Int, stepFunction: DataSet[T] => DataSet[T])= macro IterateMacros.iterate[T]
-  def iterateWithTermination[C](
-      n: Int,
-      stepFunction: DataSet[T] => DataSet[T],
-      terminationFunction: (DataSet[T],DataSet[T]) => DataSet[C]) =
-    macro IterateMacros.iterateWithTermination[T, C]
-  def iterateWithDelta[SolutionKey, WorksetItem](
-      workset: DataSet[WorksetItem],
-      solutionSetKey: T => SolutionKey,
-      stepFunction: (DataSet[T], DataSet[WorksetItem]) =>
-                    (DataSet[T], DataSet[WorksetItem]), maxIterations: Int) =
-    macro WorksetIterateMacros.iterateWithDelta[T, SolutionKey, WorksetItem]
-  
-  def write(url: String, format: ScalaOutputFormat[T]) = DataSinkOperator.write(this, url, format)
-  def write(url: String, format: ScalaOutputFormat[T], name: String) =
-    DataSinkOperator.write(this, url, format, name)
-  
+import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.aggregators.Aggregator
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.io.{PrintingOutputFormat, TextOutputFormat}
+import org.apache.flink.api.java.operators.JoinOperator.JoinHint
+import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
+import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.core.fs.{FileSystem, Path}
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+/**
+ * The DataSet, the basic abstraction of Flink. This represents a collection of elements of a
+ * specific type `T`. The operations in this class can be used to create new DataSets and to combine
+ * two DataSets. The methods of [[ExecutionEnvironment]] can be used to create a DataSet from an
+ * external source, such as files in HDFS. The `write*` methods can be used to write the elements
+ * to storage.
+ *
+ * All operations accept either a lambda function or an operation-specific function object for
+ * specifying the operation. For example, using a lambda:
+ * {{{
+ *   val input: DataSet[String] = ...
+ *   val mapped = input flatMap { _.split(" ") }
+ * }}}
+ * And using a `MapFunction`:
+ * {{{
+ *   val input: DataSet[String] = ...
+ *   val mapped = input flatMap { new FlatMapFunction[String, String] {
+ *     def flatMap(in: String, out: Collector[String]): Unit = {
+ *       in.split(" ") foreach { out.collect(_) }
+ *     }
+ *   }}
+ * }}}
+ *
+ * A rich function can be used when more control is required, for example for accessing the
+ * `RuntimeContext`. The rich function for `flatMap` is `RichFlatMapFunction`, all other functions
+ * are named similarly. All functions are available in package
+ * `org.apache.flink.api.common.functions`.
+ *
+ * The elements are partitioned depending on the degree of parallelism of the
+ * [[ExecutionEnvironment]] or of one specific DataSet.
+ *
+ * Most of the operations have an implicit [[TypeInformation]] parameter. This is supplied by
+ * an implicit conversion in the `org.apache.flink.api.scala` package. For this to work,
+ * [[createTypeInformation]] needs to be imported. This is normally achieved with a
+ * wildcard import:
+ * {{{
+ *   import org.apache.flink.api.scala._
+ * }}}
+ *
+ * @tparam T The type of the DataSet, i.e., the type of the elements of the DataSet.
+ */
+class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
+  Validate.notNull(set, "Java DataSet must not be null.")
+
+  // --------------------------------------------------------------------------------------------
+  //  General methods
+  // --------------------------------------------------------------------------------------------
+  // These are actually implemented in subclasses of the Java DataSet, but we perform the
+  // checks here and simply forward the calls to keep everything simpler.
+
+  /**
+   * Sets the name of the DataSet. This will appear in logs and graphical
+   * representations of the execution graph.
+   */
+  def name(name: String) = {
+    set match {
+      case ds: DataSource[_] => ds.name(name)
+      case op: Operator[_, _] => op.name(name)
+      case di: DeltaIterationResultSet[_, _] => di.getIterationHead.name(name)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + set.toString + " cannot have a name.")
+    }
+    // return this for chaining method calls
+    this
+  }
+
+  /**
+   * Sets the degree of parallelism of this operation. This must be at least 1.
+   */
+  def setParallelism(dop: Int) = {
+    set match {
+      case ds: DataSource[_] => ds.setParallelism(dop)
+      case op: Operator[_, _] => op.setParallelism(dop)
+      case di: DeltaIterationResultSet[_, _] => di.getIterationHead.parallelism(dop)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + set.toString + " cannot have " +
+          "parallelism.")
+    }
+    this
+  }
+
+  /**
+   * Returns the degree of parallelism of this operation.
+   */
+  def getParallelism: Int = set match {
+    case ds: DataSource[_] => ds.getParallelism
+    case op: Operator[_, _] => op.getParallelism
+    case _ =>
+      throw new UnsupportedOperationException("Operator " + set.toString + " does not have " +
+        "parallelism.")
+  }
+
+  /**
+   * Registers an [[org.apache.flink.api.common.aggregators.Aggregator]]
+   * for the iteration. Aggregators can be used to maintain simple statistics during the
+   * iteration, such as number of elements processed. The aggregators compute global aggregates:
+   * After each iteration step, the values are globally aggregated to produce one aggregate that
+   * represents statistics across all parallel instances.
+   * The value of an aggregator can be accessed in the next iteration.
+   *
+   * Aggregators can be accessed inside a function via
+   * [[org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext]].
+   *
+   * @param name The name under which the aggregator is registered.
+   * @param aggregator The aggregator.
+   */
+  def registerAggregator(name: String, aggregator: Aggregator[_]): DataSet[T] = {
+    set match {
+      case di: DeltaIterationResultSet[_, _] =>
+        di.getIterationHead.registerAggregator(name, aggregator)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + set.toString + " cannot have " +
+          "aggregators.")
+    }
+    this
+  }
+
+  /**
+   * Adds a certain data set as a broadcast set to this operator. Broadcast data sets are
+   * available at all parallel instances of this operator. A broadcast data set is registered
+   * under a certain name, and can be retrieved under that name from the operator's runtime
+   * context via
+   * `org.apache.flink.api.common.functions.RuntimeContext.getBroadcastVariable(String)`.
+   *
+   * The runtime context itself is available in all UDFs via
+   * `org.apache.flink.api.common.functions.AbstractRichFunction#getRuntimeContext()`
+   *
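+   * For example (an illustrative sketch; the names `toBroadcast` and `"myBroadcast"` are made up):
+   * {{{
+   *   val toBroadcast: DataSet[Int] = ...
+   *   val data: DataSet[String] = ...
+   *   val result = data.map(new RichMapFunction[String, String] {
+   *     def map(in: String): String = {
+   *       // the broadcast set is available through the runtime context
+   *       val bc = getRuntimeContext.getBroadcastVariable[Int]("myBroadcast")
+   *       in + bc.size()
+   *     }
+   *   }).withBroadcastSet(toBroadcast, "myBroadcast")
+   * }}}
+   *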
+   * @param data The data set to be broadcast.
+   * @param name The name under which the broadcast data set is retrieved.
+   * @return The operator itself, to allow chaining function calls.
+   */
+  def withBroadcastSet(data: DataSet[_], name: String) = {
+    set match {
+      case udfOp: UdfOperator[_] => udfOp.withBroadcastSet(data.set, name)
+      case _ =>
+        throw new UnsupportedOperationException("Operator " + set.toString + " cannot have " +
+          "broadcast variables.")
+    }
+    this
+  }
+
+  def withConstantSet(constantSets: String*) = {
+    set match {
+      case op: SingleInputUdfOperator[_, _, _] => op.withConstantSet(constantSets: _*)
+      case _ =>
+        throw new UnsupportedOperationException("Cannot specify constant sets on Operator " +
+          set.toString + ".")
+    }
+    this
+  }
+
+  def withConstantSetFirst(constantSets: String*) = {
+    set match {
+      case op: TwoInputUdfOperator[_, _, _, _] => op.withConstantSetFirst(constantSets: _*)
+      case _ =>
+        throw new UnsupportedOperationException("Cannot specify constant sets on Operator " + set
+          .toString + ".")
+    }
+    this
+  }
+
+  def withConstantSetSecond(constantSets: String*) = {
+    set match {
+      case op: TwoInputUdfOperator[_, _, _, _] => op.withConstantSetSecond(constantSets: _*)
+      case _ =>
+        throw new UnsupportedOperationException("Cannot specify constant sets on Operator " + set
+          .toString + ".")
+    }
+    this
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Filter & Transformations
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a new DataSet by applying the given function to every element of this DataSet.
+   */
+  def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataSet[R] = {
+    if (mapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    wrap(new MapOperator[T, R](set, implicitly[TypeInformation[R]], mapper))
+  }
+
+  /**
+   * Creates a new DataSet by applying the given function to every element of this DataSet.
+   */
+  def map[R: TypeInformation: ClassTag](fun: (T) => R): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val mapper = new MapFunction[T, R] {
+      def map(in: T): R = fun(in)
+    }
+    wrap(new MapOperator[T, R](set, implicitly[TypeInformation[R]], mapper))
+  }
+
+  /**
+   * Creates a new DataSet by applying the given function to each parallel partition of the
+   * DataSet.
+   *
+   * This function is intended for operations that cannot transform individual elements and
+   * that require no grouping of elements. To transform individual elements,
+   * the use of [[map]] and [[flatMap]] is preferable.
+   */
+  def mapPartition[R: TypeInformation: ClassTag](
+      partitionMapper: MapPartitionFunction[T, R]): DataSet[R] = {
+    if (partitionMapper == null) {
+      throw new NullPointerException("MapPartition function must not be null.")
+    }
+    wrap(new MapPartitionOperator[T, R](set, implicitly[TypeInformation[R]], partitionMapper))
+  }
+
+  /**
+   * Creates a new DataSet by applying the given function to each parallel partition of the
+   * DataSet.
+   *
+   * This function is intended for operations that cannot transform individual elements and
+   * that require no grouping of elements. To transform individual elements,
+   * the use of [[map]] and [[flatMap]] is preferable.
+   */
+  def mapPartition[R: TypeInformation: ClassTag](
+      fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("MapPartition function must not be null.")
+    }
+    val partitionMapper = new MapPartitionFunction[T, R] {
+      def mapPartition(in: java.lang.Iterable[T], out: Collector[R]) {
+        fun(in.iterator().asScala, out)
+      }
+    }
+    wrap(new MapPartitionOperator[T, R](set, implicitly[TypeInformation[R]], partitionMapper))
+  }
+
+  /**
+   * Creates a new DataSet by applying the given function to each parallel partition of the
+   * DataSet.
+   *
+   * This function is intended for operations that cannot transform individual elements and
+   * that require no grouping of elements. To transform individual elements,
+   * the use of [[map]] and [[flatMap]] is preferable.
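+   *
+   * For example (an illustrative sketch that emits the number of elements per partition):
+   * {{{
+   *   val input: DataSet[String] = ...
+   *   val counts = input.mapPartition { in => Seq(in.size) }
+   * }}}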
+   */
+  def mapPartition[R: TypeInformation: ClassTag](
+      fun: (TraversableOnce[T]) => TraversableOnce[R]): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("MapPartition function must not be null.")
+    }
+    val partitionMapper = new MapPartitionFunction[T, R] {
+      def mapPartition(in: java.lang.Iterable[T], out: Collector[R]) {
+        fun(in.iterator().asScala) foreach out.collect
+      }
+    }
+    wrap(new MapPartitionOperator[T, R](set, implicitly[TypeInformation[R]], partitionMapper))
+  }
+
+  /**
+   * Creates a new DataSet by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataSet[R] = {
+    if (flatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    wrap(new FlatMapOperator[T, R](set, implicitly[TypeInformation[R]], flatMapper))
+  }
+
+  /**
+   * Creates a new DataSet by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      def flatMap(in: T, out: Collector[R]) { fun(in, out) }
+    }
+    wrap(new FlatMapOperator[T, R](set, implicitly[TypeInformation[R]], flatMapper))
+  }
+
+  /**
+   * Creates a new DataSet by applying the given function to every element and flattening
+   * the results.
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun: (T) => TraversableOnce[R]): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    val flatMapper = new FlatMapFunction[T, R] {
+      def flatMap(in: T, out: Collector[R]) { fun(in) foreach out.collect }
+    }
+    wrap(new FlatMapOperator[T, R](set, implicitly[TypeInformation[R]], flatMapper))
+  }
+
+  /**
+   * Creates a new DataSet that contains only the elements satisfying the given filter predicate.
+   */
+  def filter(filter: FilterFunction[T]): DataSet[T] = {
+    if (filter == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    wrap(new FilterOperator[T](set, filter))
+  }
+
+  /**
+   * Creates a new DataSet that contains only the elements satisfying the given filter predicate.
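+   *
+   * For example (a minimal sketch):
+   * {{{
+   *   val lines: DataSet[String] = ...
+   *   val nonEmpty = lines.filter { _.nonEmpty }
+   * }}}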
+   */
+  def filter(fun: (T) => Boolean): DataSet[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Filter function must not be null.")
+    }
+    val filter = new FilterFunction[T] {
+      def filter(in: T) = fun(in)
+    }
+    wrap(new FilterOperator[T](set, filter))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Non-grouped aggregations
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a new [[DataSet]] by aggregating the specified tuple field using the given aggregation
+   * function. Since this is not a keyed DataSet, the aggregation will be performed on the whole
+   * collection of elements.
+   *
+   * This only works on Tuple DataSets.
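+   *
+   * For example (a minimal sketch; assumes `Aggregations` is imported):
+   * {{{
+   *   val input: DataSet[(String, Int)] = ...
+   *   val summed = input.aggregate(Aggregations.SUM, 1)
+   *   // equivalent shorthand:
+   *   val alsoSummed = input.sum(1)
+   * }}}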
+   */
+  def aggregate(agg: Aggregations, field: Int): DataSet[T] = set match {
+    case aggregation: ScalaAggregateOperator[T] =>
+      aggregation.and(agg, field)
+      wrap(aggregation)
+
+    case _ => wrap(new ScalaAggregateOperator[T](set, agg, field))
+  }
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `SUM`
+   */
+  def sum(field: Int) = {
+    aggregate(Aggregations.SUM, field)
+  }
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `MAX`
+   */
+  def max(field: Int) = {
+    aggregate(Aggregations.MAX, field)
+  }
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `MIN`
+   */
+  def min(field: Int) = {
+    aggregate(Aggregations.MIN, field)
+  }
+
+  /**
+   * Creates a new [[DataSet]] by merging the elements of this DataSet using an associative reduce
+   * function.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataSet[T] = {
+    if (reducer == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    wrap(new ReduceOperator[T](set, reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by merging the elements of this DataSet using an associative reduce
+   * function.
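+   *
+   * For example (a minimal sketch):
+   * {{{
+   *   val numbers: DataSet[Int] = ...
+   *   val sum = numbers.reduce { _ + _ }
+   * }}}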
+   */
+  def reduce(fun: (T, T) => T): DataSet[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = { fun(v1, v2) }
+    }
+    wrap(new ReduceOperator[T](set, reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing all elements in this DataSet to the group reduce function.
+   * The function can output zero or more elements using the [[Collector]]. The concatenation of the
+   * emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R] = {
+    if (reducer == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    wrap(new GroupReduceOperator[T, R](set, implicitly[TypeInformation[R]], reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing all elements in this DataSet to the group reduce function.
+   * The function can output zero or more elements using the [[Collector]]. The concatenation of the
+   * emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](
+      fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    val reducer = new GroupReduceFunction[T, R] {
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) { fun(in.iterator().asScala, out) }
+    }
+    wrap(new GroupReduceOperator[T, R](set, implicitly[TypeInformation[R]], reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing all elements in this DataSet to the group reduce function.
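+   *
+   * For example (a minimal sketch that emits the number of elements):
+   * {{{
+   *   val words: DataSet[String] = ...
+   *   val count = words.reduceGroup { in => in.size }
+   * }}}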
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](fun: (TraversableOnce[T]) => R): DataSet[R] = {
+    if (fun == null) {
+      throw new NullPointerException("GroupReduce function must not be null.")
+    }
+    val reducer = new GroupReduceFunction[T, R] {
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        out.collect(fun(in.iterator().asScala))
+      }
+    }
+    wrap(new GroupReduceOperator[T, R](set, implicitly[TypeInformation[R]], reducer))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  distinct
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
+   * two elements are distinct or not is made using the return value of the given function.
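+   *
+   * For example (a minimal sketch):
+   * {{{
+   *   val input: DataSet[(String, Int)] = ...
+   *   val uniqueByName = input.distinct { _._1 }
+   * }}}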
+   */
+  def distinct[K: TypeInformation](fun: (T) => K): DataSet[T] = {
+    val keyExtractor = new KeySelector[T, K] {
+      def getKey(in: T) = fun(in)
+    }
+    wrap(new DistinctOperator[T](
+      set,
+      new Keys.SelectorFunctionKeys[T, K](
+        keyExtractor, set.getType, implicitly[TypeInformation[K]])))
+  }
+
+  /**
+   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
+   * two elements are distinct or not is made based on only the specified tuple fields.
+   *
+   * This only works if this DataSet contains Tuples.
+   */
+  def distinct(fields: Int*): DataSet[T] = {
+    wrap(new DistinctOperator[T](
+      set,
+      new Keys.FieldPositionKeys[T](fields.toArray, set.getType, true)))
+  }
+
+  /**
+   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
+   * two elements are distinct or not is made based on all tuple fields.
+   *
+   * This only works if this DataSet contains Tuples.
+   */
+  def distinct: DataSet[T] = {
+    wrap(new DistinctOperator[T](set, null))
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Keyed DataSet
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a [[GroupedDataSet]] which provides operations on groups of elements. Elements are
+   * grouped based on the value returned by the given function.
+   *
+   * This does not create a new DataSet; it only attaches the key function, which will be used
+   * for grouping when a grouped operation is executed.
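+   *
+   * For example (a minimal sketch):
+   * {{{
+   *   val words: DataSet[String] = ...
+   *   val groupedByLength = words.groupBy { _.length }
+   * }}}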
+   */
+  def groupBy[K: TypeInformation](fun: (T) => K): GroupedDataSet[T] = {
+    val keyType = implicitly[TypeInformation[K]]
+    val keyExtractor = new KeySelector[T, K] {
+      def getKey(in: T) = fun(in)
+    }
+    new GroupedDataSetImpl[T](set,
+      new Keys.SelectorFunctionKeys[T, K](keyExtractor, set.getType, keyType))
+  }
+
+  /**
+   * Creates a [[GroupedDataSet]] which provides operations on groups of elements. Elements are
+   * grouped based on the given tuple fields.
+   *
+   * This does not create a new DataSet; it only attaches the tuple field positions, which will
+   * be used for grouping when a grouped operation is executed.
+   *
+   * This only works on Tuple DataSets.
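+   *
+   * For example (an illustrative sketch; assumes the grouped `reduce` variant of
+   * [[GroupedDataSet]]):
+   * {{{
+   *   val input: DataSet[(String, Int)] = ...
+   *   val sums = input.groupBy(0).reduce { (a, b) => (a._1, a._2 + b._2) }
+   * }}}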
+   */
+  def groupBy(fields: Int*): GroupedDataSet[T] = {
+    new GroupedDataSetImpl[T](
+      set,
+      new Keys.FieldPositionKeys[T](fields.toArray, set.getType, false))
+  }
+
+  //	public UnsortedGrouping<T> groupBy(String... fields) {
+  //		new UnsortedGrouping<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
+  //	}
+
+  // --------------------------------------------------------------------------------------------
+  //  Joining
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a new DataSet by joining `this` DataSet with the `other` DataSet. To specify the join
+   * keys the `where` and `isEqualTo` methods must be used. For example:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val joined = left.join(right).where(0).isEqualTo(1)
+   * }}}
+   *
+   * The default join result is a DataSet with 2-Tuples of the joined values. In the above example
+   * that would be `((String, Int, Int), (Int, String, Int))`. A custom join function can be used
+   * if more control over the result is required. This can either be given as a lambda or a
+   * custom [[JoinFunction]]. For example:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val joined = left.join(right).where(0).isEqualTo(1) { (l, r) =>
+   *     if (l._2 > 4) {
+   *       Some((l._2, r._3))
+   *     } else {
+   *       None
+   *     }
+   *   }
+   * }}}
+   * This can be used to implement a filter directly in the join or to output more than one value:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val joined = left.join(right).where(0).isEqualTo(1) {
+   *     (l, r, out: Collector[(String, Int)]) =>
+   *       if (l._2 > 4) {
+   *         out.collect((l._1, r._3))
+   *         out.collect((l._1, r._1))
+   *       } else {
+   *         None
+   *       }
+   *     }
+   * }}}
+   */
+  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
+    new UnfinishedJoinOperationImpl(this.set, other.set, JoinHint.OPTIMIZER_CHOOSES)
+
+  /**
+   * Special [[join]] operation for explicitly telling the system that the right side is assumed
+   * to be a lot smaller than the left side of the join.
+   */
+  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
+    new UnfinishedJoinOperationImpl(this.set, other.set, JoinHint.BROADCAST_HASH_SECOND)
+
+  /**
+   * Special [[join]] operation for explicitly telling the system that the left side is assumed
+   * to be a lot smaller than the right side of the join.
+   */
+  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
+    new UnfinishedJoinOperationImpl(this.set, other.set, JoinHint.BROADCAST_HASH_FIRST)
+
+  // --------------------------------------------------------------------------------------------
+  //  Co-Group
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * For each key in `this` DataSet and the `other` DataSet, create a tuple containing a list
+   * of elements for that key from both DataSets. To specify the join keys the `where` and
+   * `isEqualTo` methods must be used. For example:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val coGrouped = left.coGroup(right).where(0).isEqualTo(1)
+   * }}}
+   *
+   * A custom coGroup function can be used
+   * if more control over the result is required. This can either be given as a lambda or a
+   * custom [[CoGroupFunction]]. For example:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val coGrouped = left.coGroup(right).where(0).isEqualTo(1) { (l, r) =>
+   *     // l and r are of type TraversableOnce
+   *     Some((l.min, r.max))
+   *   }
+   * }}}
+   * This can be used to implement a filter directly in the coGroup or to output more than one
+   * value:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val coGrouped = left.coGroup(right).where(0).isEqualTo(1) {
+   *     (l, r, out: Collector[(String, Int)]) =>
+   *       out.collect((l.min, r.max))
+   *       out.collect((l.max, r.min))
+   *     }
+   * }}}
+   */
+  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O] =
+    new UnfinishedCoGroupOperationImpl(this.set, other.set)
+
+  // --------------------------------------------------------------------------------------------
+  //  Cross
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a new DataSet by forming the cartesian product of `this` DataSet and the `other`
+   * DataSet.
+   *
+   * The default cross result is a DataSet with 2-Tuples of the combined values. A custom cross
+   * function can be used if more control over the result is required. This can either be given as
+   * a lambda or a custom [[CrossFunction]]. For example:
+   * {{{
+   *   val left: DataSet[(String, Int, Int)] = ...
+   *   val right: DataSet[(Int, String, Int)] = ...
+   *   val product = left.cross(right) { (l, r) => (l._2, r._3) }
+   * }}}
+   */
+  def cross[O](other: DataSet[O]): CrossDataSet[T, O] =
+    CrossDataSetImpl.createCrossOperator(this.set, other.set)
+
+  /**
+   * Special [[cross]] operation for explicitly telling the system that the right side is assumed
+   * to be a lot smaller than the left side of the cartesian product.
+   */
+  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O] =
+    CrossDataSetImpl.createCrossOperator(this.set, other.set)
+
+  /**
+   * Special [[cross]] operation for explicitly telling the system that the left side is assumed
+   * to be a lot smaller than the right side of the cartesian product.
+   */
+  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O] =
+    CrossDataSetImpl.createCrossOperator(this.set, other.set)
+
+  // --------------------------------------------------------------------------------------------
+  //  Iterations
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a new DataSet by performing bulk iterations using the given step function. The
+   * iterations terminate when `maxIterations` iterations have been performed.
+   *
+   * For example:
+   * {{{
+   *   val input: DataSet[(String, Int)] = ...
+   *   val iterated = input.iterate(5) { previous =>
+   *     val next = previous.map { x => (x._1, x._2 + 1) }
+   *     next
+   *   }
+   * }}}
+   *
+   * This example increases the second field of the tuple by 1 in each iteration, i.e. by 5 in
+   * total.
+   */
+  def iterate(maxIterations: Int)(stepFunction: (DataSet[T]) => DataSet[T]): DataSet[T] = {
+    val iterativeSet =
+      new IterativeDataSet[T](set.getExecutionEnvironment, set.getType, set, maxIterations)
+
+    val resultSet = stepFunction(wrap(iterativeSet))
+    val result = iterativeSet.closeWith(resultSet.set)
+    wrap(result)
+  }
+
+  /**
+   * Creates a new DataSet by performing bulk iterations using the given step function. The first
+   * DataSet the step function returns is the input for the next iteration, the second DataSet is
+   * the termination criterion. The iterations terminate when either the termination criterion
+   * DataSet contains no elements or when `maxIterations` iterations have been performed.
+   *
+   *  For example:
+   * {{{
+   *   val input: DataSet[(String, Int)] = ...
+   *   val iterated = input.iterateWithTermination(5) { previous =>
+   *     val next = previous.map { x => (x._1, x._2 + 1) }
+   *     val term = next.filter { _._2 <  3 }
+   *     (next, term)
+   *   }
+   * }}}
+   *
+   * This example increases the second field of the tuples until no value is smaller than 3
+   * anymore (or until 5 iterations have been performed).
+   */
+  def iterateWithTermination(maxIterations: Int)(
+    stepFunction: (DataSet[T]) => (DataSet[T], DataSet[_])): DataSet[T] = {
+    val iterativeSet =
+      new IterativeDataSet[T](set.getExecutionEnvironment, set.getType, set, maxIterations)
+
+    val (resultSet, terminationCriterion) = stepFunction(wrap(iterativeSet))
+    val result = iterativeSet.closeWith(resultSet.set, terminationCriterion.set)
+    wrap(result)
+  }
+
+  /**
+   * Creates a new DataSet by performing delta (or workset) iterations using the given step
+   * function. At the beginning, `this` DataSet is the solution set and `workset` is the workset.
+   * The iteration step function gets the current solution set and workset and must output the
+   * delta for the solution set and the workset for the next iteration.
+   *
+   * Note: The syntax of delta iterations is very likely going to change soon.
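+   *
+   * For example (an illustrative sketch; field types and values are made up):
+   * {{{
+   *   val solution: DataSet[(Long, Double)] = ...
+   *   val workset: DataSet[(Long, Double)] = ...
+   *   val result = solution.iterateDelta(workset, 10, Array(0)) { (s, ws) =>
+   *     val delta = ws.map { x => (x._1, x._2 + 1) }
+   *     (delta, delta)
+   *   }
+   * }}}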
+   */
+  def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[Int])(
+      stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+    val key = new FieldPositionKeys[T](keyFields, set.getType, false)
+    val iterativeSet = new DeltaIteration[T, R](
+      set.getExecutionEnvironment, set.getType, set, workset.set, key, maxIterations)
+    val (newSolution, newWorkset) = stepFunction(
+      wrap(iterativeSet.getSolutionSet),
+      wrap(iterativeSet.getWorkset))
+    val result = iterativeSet.closeWith(newSolution.set, newWorkset.set)
+    wrap(result)
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Union
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Creates a new DataSet containing the elements from both `this` DataSet and the `other`
+   * DataSet.
+   */
+  def union(other: DataSet[T]): DataSet[T] = wrap(new UnionOperator[T](set, other.set))
+
+  // --------------------------------------------------------------------------------------------
+  //  Result writing
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Writes `this` DataSet to the specified location. This uses [[AnyRef.toString]] on
+   * each element.
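+   *
+   * For example (a minimal sketch; the path is made up):
+   * {{{
+   *   val result: DataSet[String] = ...
+   *   result.writeAsText("hdfs:///output/result")
+   * }}}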
+   */
+  def writeAsText(
+      filePath: String,
+      writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+    val tof: TextOutputFormat[T] = new TextOutputFormat[T](new Path(filePath))
+    tof.setWriteMode(writeMode)
+    output(tof)
+  }
+
+  /**
+   * Writes `this` DataSet to the specified location as a CSV file.
+   *
+   * This only works on Tuple DataSets. For individual tuple fields [[AnyRef.toString]] is used.
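+   *
+   * For example (a minimal sketch; the path and delimiters are made up):
+   * {{{
+   *   val result: DataSet[(String, Int)] = ...
+   *   result.writeAsCsv("hdfs:///output/result", "\n", "|")
+   * }}}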
+   */
+  def writeAsCsv(
+      filePath: String,
+      rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
+      fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
+      writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+    Validate.isTrue(set.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+    val of = new ScalaCsvOutputFormat[Product](new Path(filePath), rowDelimiter, fieldDelimiter)
+    of.setWriteMode(writeMode)
+    output(of.asInstanceOf[OutputFormat[T]])
+  }
+
+  /**
+   * Writes `this` DataSet to the specified location using a custom
+   * [[org.apache.flink.api.common.io.FileOutputFormat]].
+   */
+  def write(
+      outputFormat: FileOutputFormat[T],
+      filePath: String,
+      writeMode: FileSystem.WriteMode = WriteMode.NO_OVERWRITE): DataSink[T] = {
+    Validate.notNull(filePath, "File path must not be null.")
+    Validate.notNull(outputFormat, "Output format must not be null.")
+    outputFormat.setOutputFilePath(new Path(filePath))
+    outputFormat.setWriteMode(writeMode)
+    output(outputFormat)
+  }
+
+  /**
+   * Emits `this` DataSet using a custom [[org.apache.flink.api.common.io.OutputFormat]].
+   */
+  def output(outputFormat: OutputFormat[T]): DataSink[T] = {
+    set.output(outputFormat)
+  }
+
+  /**
+   * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
+   * each element.
+   */
+  def print(): DataSink[T] = {
+    output(new PrintingOutputFormat[T](false))
+  }
+
+  /**
+   * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
+   * each element.
+   */
+  def printToErr(): DataSink[T] = {
+    output(new PrintingOutputFormat[T](true))
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala
deleted file mode 100644
index 35d0dd0..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSink.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import java.net.URI
-
-import org.apache.flink.api.scala.analysis._
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.types.Record
-import org.apache.flink.api.common.operators.base.GenericDataSinkBase
-import org.apache.flink.api.java.record.operators.FileDataSink
-import org.apache.flink.api.common.io.OutputFormat
-
-
-object DataSinkOperator {
-  val DEFAULT_DATASINKOPERATOR_NAME = "<Unnamed Scala Data Sink>"
-
-  def write[In](input: DataSet[In], url: String, format: ScalaOutputFormat[In],
-                name: String = DEFAULT_DATASINKOPERATOR_NAME): ScalaSink[In]
-  = {
-    val uri = getUri(url)
-
-    val ret = uri.getScheme match {
-      case "file" | "hdfs" => new FileDataSink(format.asInstanceOf[FileOutputFormat[Record]], uri.toString,
-        input.contract, name) with ScalaOutputOperator[In] {
-
-        def getUDF = format.getUDF
-        override def persistConfiguration() = format.persistConfiguration(this.getParameters())
-      }
-    }
-    new ScalaSink(ret)
-  }
-
-  private def getUri(url: String) = {
-    val uri = new URI(url)
-    if (uri.getScheme == null)
-      new URI("file://" + url)
-    else
-      uri
-  }
-}
-
-class ScalaSink[In] private[scala] (private[scala] val sink: GenericDataSinkBase[Record])
-
-trait ScalaOutputFormat[In] { this: OutputFormat[_] =>
-  def getUDF: UDF1[In, Nothing]
-  def persistConfiguration(config: Configuration) = {}
-  def configure(config: Configuration)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala
deleted file mode 100644
index e7990a2..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSource.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import java.net.URI
-import collection.JavaConversions._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.functions._
-import org.apache.flink.api.scala.analysis.UDF0
-import org.apache.flink.types._
-import org.apache.flink.types.parser._
-import org.apache.flink.api.java.record.operators.{CollectionDataSource => JavaCollectionDataSource, FileDataSource}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.api.common.io.FileInputFormat
-import org.apache.flink.api.java.record.operators.{CollectionDataSource => JavaCollectionDataSource}
-import org.apache.flink.api.common.io.InputFormat
-import org.apache.flink.api.scala.operators.TextInputFormat
-
-
-
-object DataSource {
-
-  def apply[Out](url: String, format: ScalaInputFormat[Out]): DataSet[Out] with OutputHintable[Out] = {
-    val uri = getUri(url)
-    
-    val ret = uri.getScheme match {
-
-      case "file" | "hdfs" => new FileDataSource(format.asInstanceOf[FileInputFormat[Record]], uri.toString)
-          with ScalaOperator[Out, Record] {
-
-        override def getUDF = format.getUDF
-
-        override def persistConfiguration() = format.persistConfiguration(this.getParameters)
-      }
-
-//      case "ext" => new GenericDataSource[GenericInputFormat[_]](format.asInstanceOf[GenericInputFormat[_]], uri.toString)
-//          with ScalaOperator[Out] {
-//
-//        override def getUDF = format.getUDF
-//        override def persistConfiguration() = format.persistConfiguration(this.getParameters())
-//      }
-    }
-    
-    new DataSet[Out](ret) with OutputHintable[Out] {}
-  }
-
-  private def getUri(url: String) = {
-    val uri = new URI(url)
-    if (uri.getScheme == null)
-      new URI("file://" + url)
-    else
-      uri
-  }
-}
-
-object CollectionDataSource {
-  /*
-  constructor for collection input
-   */
-  def apply[Out: UDT](data: Iterable[Out]):DataSet[Out] with OutputHintable[Out] = {
-    /*
-    reuse the java implementation of collection data by adding scala operator
-    */
-    val js:java.util.Collection[Out] = data
-    val ret = new JavaCollectionDataSource(js)
-    	with ScalaOperator[Out, Record]{
-       
-       val udf = new UDF0(implicitly[UDT[Out]])
-       override def getUDF = udf
-
-    }
-    
-    new DataSet[Out](ret) with OutputHintable[Out] {}
-  }
-  
-  /*
-  constructor for serializable iterator input
-   */
-  def apply[Out: UDT](data: Iterator[Out] with Serializable) = {
-
-    /*
-    reuse the java implementation of collection data by adding scala operator
-     */
-    val ret = new JavaCollectionDataSource(data)
-    	with ScalaOperator[Out, Record]{
-       
-       val udf = new UDF0(implicitly[UDT[Out]])
-       override def getUDF = udf
-
-    }
-    
-    new DataSet[Out](ret) with OutputHintable[Out] {}
-  }
-}
-
-
-
-trait ScalaInputFormat[Out] { this: InputFormat[_, _] =>
-  def getUDF: UDF0[Out]
-  def persistConfiguration(config: Configuration) = {}
-  def configure(config: Configuration)
-}
-
-
-object TextFile {
-  def apply(url: String): DataSet[String] with OutputHintable[String] = DataSource(url, TextInputFormat())
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
new file mode 100644
index 0000000..6f44e68
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import java.util.UUID
+
+import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.java.io._
+import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, BasicTypeInfo}
+import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.apache.flink.core.fs.Path
+
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv}
+import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
+
+import org.apache.flink.api.java.operators.DataSource
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator}
+
+import scala.collection.JavaConverters._
+
+import scala.reflect.ClassTag
+
+/**
+ * The ExecutionEnvironment is the context in which a program is executed. A local environment will
+ * cause execution in the current JVM, while a remote environment will cause execution on a remote
+ * cluster installation.
+ *
+ * The environment provides methods to control the job execution (such as setting the parallelism)
+ * and to interact with the outside world (data access).
+ *
+ * To get an execution environment use the methods on the companion object:
+ *
+ *  - [[ExecutionEnvironment.getExecutionEnvironment]]
+ *  - [[ExecutionEnvironment.createLocalEnvironment]]
+ *  - [[ExecutionEnvironment.createRemoteEnvironment]]
+ *
+ *  Use [[ExecutionEnvironment.getExecutionEnvironment]] to get the correct environment depending
+ *  on where the program is executed. If it is run inside an IDE, a local environment will be
+ *  created. If the program is submitted to a cluster, a remote execution environment will
+ *  be created.
+ */
+class ExecutionEnvironment(javaEnv: JavaEnv) {
+
+  /**
+   * Sets the degree of parallelism (DOP) for operations executed through this environment.
+   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+   * x parallel instances. This value can be overridden by specific operations using
+   * [[DataSet.setParallelism]].
+   */
+  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+  }
+
+  /**
+   * Returns the default degree of parallelism for this execution environment. Note that this
+   * value can be overridden by individual operations using [[DataSet.setParallelism]]
+   */
+  def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+  /**
+   * Gets the UUID by which this environment is identified. The UUID sets the execution context
+   * in the cluster or local environment.
+   */
+  def getId: UUID = {
+    javaEnv.getId
+  }
+
+  /**
+   * Gets the UUID by which this environment is identified, as a string.
+   */
+  def getIdString: String = {
+    javaEnv.getIdString
+  }
+
+  /**
+   * Creates a DataSet of Strings produced by reading the given file line wise.
+   *
+   * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
+   *                 "hdfs://host:port/file/path").
+   * @param charsetName The name of the character set used to read the file. Default is UTF-8.
+   */
+  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String] = {
+    Validate.notNull(filePath, "The file path may not be null.")
+    val format = new TextInputFormat(new Path(filePath))
+    format.setCharsetName(charsetName)
+    val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO)
+    wrap(source)
+  }
+
+  /**
+   * Creates a DataSet by reading the given CSV file. The type parameter must be used to specify
+   * a Tuple type that has the same number of fields as there are fields in the CSV file. If the
+   * number of fields in the CSV file is not the same, the `includedFields` parameter can be used
+   * to only read specific fields.
+   *
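+   * For example (an illustrative sketch; the path is made up):
+   * {{{
+   *   val env = ExecutionEnvironment.getExecutionEnvironment
+   *   val csv = env.readCsvFile[(String, Int, Double)](
+   *     "hdfs:///path/to/file.csv",
+   *     ignoreFirstLine = true)
+   * }}}
+   *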
+   * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
+   *                 "hdfs://host:port/file/path").   * @param lineDelimiter
+   * @param lineDelimiter The string that separates lines, defaults to newline.
+   * @param fieldDelimiter The char that separates individual fields, defaults to ','.
+   * @param ignoreFirstLine Whether the first line in the file should be ignored.
+   * @param lenient Whether the parser should silently ignore malformed lines.
+   * @param includedFields The fields in the file that should be read. Per default all fields
+   *                       are read.
+   */
+  def readCsvFile[T <: Product : ClassTag : TypeInformation](
+      filePath: String,
+      lineDelimiter: String = "\n",
+      fieldDelimiter: Char = ',',
+      ignoreFirstLine: Boolean = false,
+      lenient: Boolean = false,
+      includedFields: Array[Int] = null): DataSet[T] = {
+
+    val typeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
+
+    val inputFormat = new ScalaCsvInputFormat[T](new Path(filePath), typeInfo)
+    inputFormat.setDelimiter(lineDelimiter)
+    inputFormat.setFieldDelimiter(fieldDelimiter)
+    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
+    inputFormat.setLenient(lenient)
+
+    val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
+    for (i <- 0 until typeInfo.getArity) {
+      classes(i) = typeInfo.getTypeAt(i).getTypeClass
+    }
+    if (includedFields != null) {
+      Validate.isTrue(typeInfo.getArity == includedFields.length, "Number of tuple fields and" +
+        " included fields must match.")
+      inputFormat.setFields(includedFields, classes)
+    } else {
+      inputFormat.setFieldTypes(classes)
+    }
+
+    wrap(new DataSource[T](javaEnv, inputFormat, typeInfo))
+  }
+
+  /**
+   * Creates a new DataSource by reading the specified file using the custom
+   * [[org.apache.flink.api.common.io.FileInputFormat]].
+   */
+  def readFile[T : ClassTag : TypeInformation](
+      inputFormat: FileInputFormat[T],
+      filePath: String): DataSet[T] = {
+    Validate.notNull(inputFormat, "InputFormat must not be null.")
+    Validate.notNull(filePath, "File path must not be null.")
+    inputFormat.setFilePath(new Path(filePath))
+    createInput(inputFormat, implicitly[TypeInformation[T]])
+  }
+
+  /**
+   * Generic method to create an input DataSet with an
+   * [[org.apache.flink.api.common.io.InputFormat]].
+   */
+  def createInput[T : ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T] = {
+    if (inputFormat == null) {
+      throw new IllegalArgumentException("InputFormat must not be null.")
+    }
+    createInput(inputFormat, implicitly[TypeInformation[T]])
+  }
+
+  /**
+   * Generic method to create an input DataSet with an
+   * [[org.apache.flink.api.common.io.InputFormat]].
+   */
+  private def createInput[T: ClassTag](
+      inputFormat: InputFormat[T, _],
+      producedType: TypeInformation[T]): DataSet[T] = {
+    if (inputFormat == null) {
+      throw new IllegalArgumentException("InputFormat must not be null.")
+    }
+    Validate.notNull(producedType, "Produced type must not be null")
+    wrap(new DataSource[T](javaEnv, inputFormat, producedType))
+  }
+
+  /**
+   * Creates a DataSet from the given non-empty [[Seq]]. The elements need to be serializable
+   * because the framework may move the elements into the cluster if needed.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromCollection[T: ClassTag : TypeInformation](
+      data: Seq[T]): DataSet[T] = {
+    Validate.notNull(data, "Data must not be null.")
+
+    val typeInfo = implicitly[TypeInformation[T]]
+    CollectionInputFormat.checkCollection(data.asJavaCollection, typeInfo.getTypeClass)
+    val dataSource = new DataSource[T](
+      javaEnv,
+      new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer),
+      typeInfo)
+    wrap(dataSource)
+  }
+
+  /**
+   * Creates a DataSet from the given [[Iterator]]. The iterator must be serializable because the
+   * framework might move it into the cluster if needed.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
+   */
+  def fromCollection[T: ClassTag : TypeInformation] (
+    data: Iterator[T]): DataSet[T] = {
+    Validate.notNull(data, "Data must not be null.")
+
+    val typeInfo = implicitly[TypeInformation[T]]
+    val dataSource = new DataSource[T](
+      javaEnv,
+      new IteratorInputFormat[T](data.asJava),
+      typeInfo)
+    wrap(dataSource)
+  }
+
+  /**
+   * Creates a new data set that contains the given elements. The elements must all be of the
+   * same type and must be serializable.
+   *
+   * Note that this operation will result in a non-parallel data source, i.e. a data source with
+   * a degree of parallelism of one.
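+   *
+   * For example (a minimal sketch):
+   * {{{
+   *   val env = ExecutionEnvironment.getExecutionEnvironment
+   *   val data = env.fromElements(1, 2, 3, 4)
+   * }}}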
+   */
+  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T] = {
+    Validate.notNull(data, "Data must not be null.")
+    val typeInfo = implicitly[TypeInformation[T]]
+    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+  }
+
+  /**
+   * Creates a new data set that contains elements in the iterator. The iterator is splittable,
+   * allowing the framework to create a parallel data source that returns the elements in the
+   * iterator. The iterator must be serializable because the execution environment may ship the
+   * elements into the cluster.
+   */
+  def fromParallelCollection[T: ClassTag : TypeInformation](
+      iterator: SplittableIterator[T]): DataSet[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    wrap(new DataSource[T](javaEnv, new ParallelIteratorInputFormat[T](iterator), typeInfo))
+  }
+
+  /**
+   * Creates a new data set that contains a sequence of numbers. The data set will be created in
+   * parallel, so there is no guarantee about the order of the elements.
+   *
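+   * For example (a minimal sketch):
+   * {{{
+   *   val env = ExecutionEnvironment.getExecutionEnvironment
+   *   val numbers: DataSet[Long] = env.generateSequence(1, 1000)
+   * }}}
+   *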
+   * @param from The number to start at (inclusive).
+   * @param to The number to stop at (inclusive).
+   */
+  def generateSequence(from: Long, to: Long): DataSet[Long] = {
+    val iterator = new NumberSequenceIterator(from, to)
+    val source = new DataSource(
+      javaEnv,
+      new ParallelIteratorInputFormat[java.lang.Long](iterator),
+      BasicTypeInfo.LONG_TYPE_INFO)
+    wrap(source).asInstanceOf[DataSet[Long]]
+  }
+
+  /**
+   * Registers a file at the distributed cache under the given name. The file will be accessible
+   * from any user-defined function in the (distributed) runtime under a local path. Files
+   * may be local files (as long as all relevant workers have access to it),
+   * or files in a distributed file system.
+   * The runtime will copy the files temporarily to a local cache, if needed.
+   *
+   * The [[org.apache.flink.api.common.functions.RuntimeContext]] can be obtained inside UDFs
+   * via
+   * [[org.apache.flink.api.common.functions.RichFunction#getRuntimeContext]] and provides
+   * access via
+   * [[org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache]]
+   *
+   * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
+   *                 "hdfs://host:port/and/path")
+   * @param name The name under which the file is registered.
+   * @param executable Flag indicating whether the file should be executable
+   */
+  def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit = {
+    javaEnv.registerCachedFile(filePath, name, executable)
+  }
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of the program that
+   * have resulted in a "sink" operation. Sink operations are, for example, printing results
+   * ([[DataSet.print]]), writing results (e.g. [[DataSet.writeAsText]] or [[DataSet.write]]), or
+   * other generic data sinks created with [[DataSet.output]].
+   *
+   * The program execution will be logged and displayed with a generated default name.
+   *
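+   * A typical program is therefore sketched as follows (assuming the implicits from
+   * `org.apache.flink.api.scala._` are in scope; the data and transformation are illustrative):
+   * {{{
+   *   val env = ExecutionEnvironment.getExecutionEnvironment
+   *   val data = env.fromElements(1, 2, 3)
+   *   data.map(_ * 2).print()
+   *   env.execute()
+   * }}}
+   *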
+   * @return The result of the job execution, containing elapsed time and accumulators.
+   */
+  def execute(): JobExecutionResult = {
+    javaEnv.execute()
+  }
+
+  /**
+   * Triggers the program execution. The environment will execute all parts of the program that
+   * have resulted in a "sink" operation. Sink operations are, for example, printing results
+   * ([[DataSet.print]]), writing results (e.g. [[DataSet.writeAsText]] or [[DataSet.write]]), or
+   * other generic data sinks created with [[DataSet.output]].
+   *
+   * The program execution will be logged and displayed with the given name.
+   *
+   * @return The result of the job execution, containing elapsed time and accumulators.
+   */
+  def execute(jobName: String): JobExecutionResult = {
+    javaEnv.execute(jobName)
+  }
+
+  /**
+   * Creates the plan with which the system will execute the program, and returns it as a String
+   * using a JSON representation of the execution data flow graph.
+   */
+  def getExecutionPlan() = {
+    javaEnv.getExecutionPlan
+  }
+
+  /**
+   * Creates the program's [[org.apache.flink.api.common.Plan]].
+   * The plan is a description of all data sources, data sinks,
+   * and operations and how they interact, as an isolated unit that can be executed with a
+   * [[org.apache.flink.api.common.PlanExecutor]]. Obtaining a plan and starting it with an
+   * executor is an alternative way to run a program and is only possible if the program consists
+   * solely of distributed operations.
+   */
+  def createProgramPlan(jobName: String = "") = {
+    if (jobName.isEmpty) {
+      javaEnv.createProgramPlan()
+    } else {
+      javaEnv.createProgramPlan(jobName)
+    }
+  }
+}
+
+object ExecutionEnvironment {
+
+  /**
+   * Creates an execution environment that represents the context in which the program is
+   * currently executed. If the program is invoked standalone, this method returns a local
+   * execution environment. If the program is invoked from within the command line client
+   * to be submitted to a cluster, this method returns the execution environment of this cluster.
+   */
+  def getExecutionEnvironment: ExecutionEnvironment = {
+    new ExecutionEnvironment(JavaEnv.getExecutionEnvironment)
+  }
+
+  /**
+   * Creates a local execution environment. The local execution environment will run the program in
+   * a multi-threaded fashion in the same JVM as the environment was created in. The default degree
+   * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
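+   *
+   * For example (a minimal sketch):
+   * {{{
+   *   val env = ExecutionEnvironment.createLocalEnvironment(degreeOfParallelism = 4)
+   * }}}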
+   */
+  def createLocalEnvironment(degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors())
+    : ExecutionEnvironment = {
+    val javaEnv = JavaEnv.createLocalEnvironment()
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    new ExecutionEnvironment(javaEnv)
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program to
+   * a cluster for execution. Note that all file paths used in the program must be accessible from
+   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
+   * parallelism is set explicitly via [[ExecutionEnvironment.setDegreeOfParallelism()]].
+   *
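+   * A minimal sketch (host, port, and jar path are placeholders):
+   * {{{
+   *   val env = ExecutionEnvironment.createRemoteEnvironment(
+   *     "jobmanager-host", 6123, "/path/to/program.jar")
+   * }}}
+   *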
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment = {
+    new ExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
+  }
+
+  /**
+   * Creates a remote execution environment. The remote environment sends (parts of) the program
+   * to a cluster for execution. Note that all file paths used in the program must be accessible
+   * from the cluster. The execution will use the specified degree of parallelism.
+   *
+   * @param host The host name or address of the master (JobManager),
+   *             where the program should be executed.
+   * @param port The port of the master (JobManager), where the program should be executed.
+   * @param degreeOfParallelism The degree of parallelism to use during the execution.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
+   *                 program uses user-defined functions, user-defined input formats, or any
+   *                 libraries, those must be provided in the JAR files.
+   */
+  def createRemoteEnvironment(
+      host: String,
+      port: Int,
+      degreeOfParallelism: Int,
+      jarFiles: String*): ExecutionEnvironment = {
+    val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
+    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    new ExecutionEnvironment(javaEnv)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
new file mode 100644
index 0000000..dfd5cf0
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala.operators.ScalaAggregateOperator
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.Validate
+import org.apache.flink.api.common.functions.{GroupReduceFunction, ReduceFunction}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.operators._
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+/**
+ * A [[DataSet]] to which a grouping key was added. Operations work on groups of elements with the
+ * same key (`aggregate`, `reduce`, and `reduceGroup`).
+ *
+ * A secondary sort order can be added with `sortGroup`, but it only takes effect when one of
+ * the group-at-a-time operations, i.e. `reduceGroup`, is used.
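+ *
+ * A minimal usage sketch (assuming the implicits from `org.apache.flink.api.scala._` are in
+ * scope and `words` is a DataSet[(String, Int)] of word/count pairs):
+ * {{{
+ *   val counts = words
+ *     .groupBy(_._1)
+ *     .reduce((a, b) => (a._1, a._2 + b._2))
+ * }}}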
+ */
+trait GroupedDataSet[T] {
+
+  /**
+   * Adds a secondary sort key to this [[GroupedDataSet]]. This will only have an effect if you
+   * use one of the group-at-a-time operations, i.e. `reduceGroup`.
+   */
+  def sortGroup(field: Int, order: Order): GroupedDataSet[T]
+
+  /**
+   * Creates a new [[DataSet]] by aggregating the specified tuple field using the given aggregation
+   * function. Since this is a keyed DataSet, the aggregation will be performed on groups of
+   * tuples with the same key.
+   *
+   * This only works on Tuple DataSets.
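+   *
+   * For example (a sketch; `sales` is assumed to be a DataSet[(String, Double)] grouped by its
+   * first field):
+   * {{{
+   *   val totals = sales.groupBy(0).aggregate(Aggregations.SUM, 1)  // equivalent to sum(1)
+   * }}}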
+   */
+  def aggregate(agg: Aggregations, field: Int): DataSet[T]
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `SUM`
+   */
+  def sum(field: Int): DataSet[T]
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `MAX`
+   */
+  def max(field: Int): DataSet[T]
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `MIN`
+   */
+  def min(field: Int): DataSet[T]
+
+  /**
+   * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+   * using an associative reduce function.
+   */
+  def reduce(fun: (T, T) => T): DataSet[T]
+
+  /**
+   * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+   * using an associative reduce function.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataSet[T]
+
+  /**
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function must output one element. The
+   * concatenation of those will form the resulting [[DataSet]].
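+   *
+   * For example (a sketch; `grouped` is assumed to be a GroupedDataSet[(String, Int)]):
+   * {{{
+   *   val maxPerKey = grouped.reduceGroup { values => values.maxBy(_._2) }
+   * }}}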
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](fun: (TraversableOnce[T]) => R): DataSet[R]
+
+  /**
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function can output zero or more elements using
+   * the [[Collector]]. The concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](
+      fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R]
+
+  /**
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the [[GroupReduceFunction]]. The function can output zero or more elements. The
+   * concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R]
+}
+
+/**
+ * Private implementation for [[GroupedDataSet]] to keep the implementation details, i.e. the
+ * parameters of the constructor, hidden.
+ */
+private[flink] class GroupedDataSetImpl[T: ClassTag](
+    private val set: JavaDataSet[T],
+    private val keys: Keys[T])
+  extends GroupedDataSet[T] {
+
+  // These are for optional secondary sort. They are only used
+  // when using a group-at-a-time reduce function.
+  private val groupSortKeyPositions = mutable.MutableList[Int]()
+  private val groupSortOrders = mutable.MutableList[Order]()
+
+  /**
+   * Adds a secondary sort key to this [[GroupedDataSet]]. This will only have an effect if you
+   * use one of the group-at-a-time operations, i.e. `reduceGroup`.
+   */
+  def sortGroup(field: Int, order: Order): GroupedDataSet[T] = {
+    if (!set.getType.isTupleType) {
+      throw new InvalidProgramException("Specifying order keys via field positions is only valid " +
+        "for tuple data types")
+    }
+    if (field >= set.getType.getArity) {
+      throw new IllegalArgumentException("Order key out of tuple bounds.")
+    }
+    groupSortKeyPositions += field
+    groupSortOrders += order
+    this
+  }
+
+  /**
+   * Creates a [[SortedGrouping]] if any secondary sort fields were specified. Otherwise, it just
+   * creates an [[UnsortedGrouping]].
+   */
+  private def maybeCreateSortedGrouping(): Grouping[T] = {
+    if (groupSortKeyPositions.length > 0) {
+      val grouping = new SortedGrouping[T](set, keys, groupSortKeyPositions(0), groupSortOrders(0))
+      // now manually add the rest of the keys
+      for (i <- 1 until groupSortKeyPositions.length) {
+        grouping.sortGroup(groupSortKeyPositions(i), groupSortOrders(i))
+      }
+      grouping
+    } else {
+      new UnsortedGrouping[T](set, keys)
+    }
+  }
+
+  /** Convenience method for creating the [[UnsortedGrouping]]. */
+  private def createUnsortedGrouping(): Grouping[T] = new UnsortedGrouping[T](set, keys)
+
+  /**
+   * Creates a new [[DataSet]] by aggregating the specified tuple field using the given aggregation
+   * function. Since this is a keyed DataSet, the aggregation will be performed on groups of
+   * tuples with the same key.
+   *
+   * This only works on Tuple DataSets.
+   */
+  def aggregate(agg: Aggregations, field: Int): DataSet[T] = set match {
+    case aggregation: ScalaAggregateOperator[T] =>
+      aggregation.and(agg, field)
+      wrap(aggregation)
+
+    case _ => wrap(new ScalaAggregateOperator[T](createUnsortedGrouping(), agg, field))
+  }
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `SUM`
+   */
+  def sum(field: Int): DataSet[T] = {
+    aggregate(Aggregations.SUM, field)
+  }
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `MAX`
+   */
+  def max(field: Int): DataSet[T] = {
+    aggregate(Aggregations.MAX, field)
+  }
+
+  /**
+   * Syntactic sugar for [[aggregate]] with `MIN`
+   */
+  def min(field: Int): DataSet[T] = {
+    aggregate(Aggregations.MIN, field)
+  }
+
+  /**
+   * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+   * using an associative reduce function.
+   */
+  def reduce(fun: (T, T) => T): DataSet[T] = {
+    Validate.notNull(fun, "Reduce function must not be null.")
+    val reducer = new ReduceFunction[T] {
+      def reduce(v1: T, v2: T) = {
+        fun(v1, v2)
+      }
+    }
+    wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key)
+   * using an associative reduce function.
+   */
+  def reduce(reducer: ReduceFunction[T]): DataSet[T] = {
+    Validate.notNull(reducer, "Reduce function must not be null.")
+    wrap(new ReduceOperator[T](createUnsortedGrouping(), reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function must output one element. The
+   * concatenation of those will form the resulting [[DataSet]].
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](fun: (TraversableOnce[T]) => R): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        out.collect(fun(in.iterator().asScala))
+      }
+    }
+    wrap(
+      new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the group reduce function. The function can output zero or more elements using
+   * the [[Collector]]. The concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](
+      fun: (TraversableOnce[T], Collector[R]) => Unit): DataSet[R] = {
+    Validate.notNull(fun, "Group reduce function must not be null.")
+    val reducer = new GroupReduceFunction[T, R] {
+      def reduce(in: java.lang.Iterable[T], out: Collector[R]) {
+        fun(in.iterator().asScala, out)
+      }
+    }
+    wrap(
+      new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer))
+  }
+
+  /**
+   * Creates a new [[DataSet]] by passing for each group (elements with the same key) the list
+   * of elements to the [[GroupReduceFunction]]. The function can output zero or more elements. The
+   * concatenation of the emitted values will form the resulting [[DataSet]].
+   */
+  def reduceGroup[R: TypeInformation: ClassTag](reducer: GroupReduceFunction[T, R]): DataSet[R] = {
+    Validate.notNull(reducer, "GroupReduce function must not be null.")
+    wrap(
+      new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
+        implicitly[TypeInformation[R]], reducer))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala
deleted file mode 100644
index 1a62d31..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaOperator.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import java.lang.annotation.Annotation
-
-import org.apache.flink.api.scala.analysis.UDF
-import org.apache.flink.api.scala.analysis.UDF0
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.UDF2
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.api.common.operators.AbstractUdfOperator
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.types.Record
-import org.apache.flink.types.{Nothing => JavaNothing}
-
-trait ScalaOperator[T, UT] {
-  this: Operator[UT] =>
-  def getUDF: UDF[T]
-  def getKeys: Seq[FieldSelector] = Seq()
-  def persistConfiguration(): Unit = {}
-  
-  var persistHints: () => Unit = { () => }
-  
-  def persistConfiguration(optimizerNode: Option[OptimizerNode]): Unit = {
-
-    ScalaOperator.this match {
-      
-      case contract: AbstractUdfOperator[_, _] => {
-        for ((key, inputNum) <- getKeys.zipWithIndex) {
-          
-          val source = key.selectedFields.toSerializerIndexArray
-          val target = optimizerNode map { _.getRemappedKeys(inputNum) } getOrElse {
-            contract.getKeyColumns(inputNum) }
-
-          assert(source.length == target.length, "Attempt to write " + source.length +
-            " key indexes to an array of size " + target.length)
-          System.arraycopy(source, 0, target, 0, source.length)
-        }
-      }
-      
-      case _ if getKeys.size > 0 => throw new UnsupportedOperationException("Attempted to set " +
-        "keys on a contract that doesn't support them")
-      
-      case _ =>
-    }
-
-    persistHints()
-    persistConfiguration()
-  }
-  
-  protected def annotations: Seq[Annotation] = Seq()
-
-  def getUserCodeAnnotation[A <: Annotation](annotationClass: Class[A]): A = {
-    val res = annotations find { _.annotationType().equals(annotationClass) } map {
-      _.asInstanceOf[A] } getOrElse null.asInstanceOf[A]
-//    println("returning ANOOT: " + res + " FOR: " + annotationClass.toString)
-//    res match {
-//      case r : FunctionAnnotation.ConstantFieldsFirst => println("CONSTANT FIELDS FIRST: " +
-//        r.value().mkString(","))
-//      case r : FunctionAnnotation.ConstantFieldsSecond => println("CONSTANT FIELDS SECOND: " +
-//        r.value().mkString(","))
-//      case _ =>
-//    }
-    res
-  }
-}
-
-trait NoOpScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
-}
-
-trait HigherOrderScalaOperator[T] extends ScalaOperator[T, Record] { this: Operator[Record] =>
-  override def getUDF: UDF0[T]
-}
-
-trait BulkIterationScalaOperator[T] extends HigherOrderScalaOperator[T] { this: Operator[Record] =>
-}
-
-trait DeltaIterationScalaOperator[T] extends HigherOrderScalaOperator[T] {
-  this: Operator[Record] =>
-  val key: FieldSelector
-}
-
-trait ScalaOutputOperator[In] extends ScalaOperator[Nothing, JavaNothing] {
-  this: Operator[JavaNothing] =>
-  override def getUDF: UDF1[In, Nothing]
-}
-
-trait OneInputScalaOperator[In, Out] extends ScalaOperator[Out, Record] { this: Operator[Record] =>
-  override def getUDF: UDF1[In, Out]
-}
-
-trait TwoInputScalaOperator[In1, In2, Out] extends ScalaOperator[Out, Record] {
-  this: Operator[Record] =>
-  override def getUDF: UDF2[In1, In2, Out]
-}
-
-trait UnionScalaOperator[In] extends TwoInputScalaOperator[In, In, In] {
-  this: Union[Record] =>
-  override def getUDF: UDF2[In, In, In]
-}
-
-trait OneInputKeyedScalaOperator[In, Out] extends OneInputScalaOperator[In, Out] {
-  this: Operator[Record] =>
-  val key: FieldSelector
-  override def getKeys = Seq(key)
-}
-
-trait TwoInputKeyedScalaOperator[LeftIn, RightIn, Out]
-  extends TwoInputScalaOperator[LeftIn, RightIn, Out] {
-
-  this: Operator[Record] =>
-  val leftKey: FieldSelector
-  val rightKey: FieldSelector
-  override def getKeys = Seq(leftKey, rightKey)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala
deleted file mode 100644
index 2ddd437..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import scala.collection.JavaConversions.asJavaCollection
-
-import java.util.Calendar
-
-import org.apache.flink.api.common.Plan
-import org.apache.flink.compiler.plan.OptimizedPlan
-import org.apache.flink.compiler.postpass.RecordModelPostPass
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.types.Record
-
-import org.apache.flink.api.scala.analysis.GlobalSchemaGenerator
-import org.apache.flink.api.scala.analysis.postPass.GlobalSchemaOptimizer
-
-
-class ScalaPlan(scalaSinks: Seq[ScalaSink[_]], scalaJobName: String = "Flink Scala Job at " + Calendar.getInstance()
-  .getTime()) extends Plan(asJavaCollection(ScalaPlan.setAnnotations(scalaSinks) map { _.sink }), scalaJobName) {
-  val pactSinks = scalaSinks map { _.sink.asInstanceOf[Operator[Record] with ScalaOperator[_, _]] }
-  new GlobalSchemaGenerator().initGlobalSchema(pactSinks)
-  override def getPostPassClassName() = "org.apache.flink.api.scala.ScalaPostPass";
-
-}
-
-object ScalaPlan{
-  def setAnnotations(sinks: Seq[ScalaSink[_]]): Seq[ScalaSink[_]] = {
-    AnnotationUtil.setAnnotations(sinks)
-  }
-}
-
-case class Args(argsMap: Map[String, String], defaultParallelism: Int, schemaHints: Boolean, schemaCompaction: Boolean) {
-  def apply(key: String): String = argsMap.getOrElse(key, key)
-  def apply(key: String, default: => String) = argsMap.getOrElse(key, default)
-}
-
-object Args {
-
-  def parse(args: Seq[String]): Args = {
-
-    var argsMap = Map[String, String]()
-    var defaultParallelism = 1
-    var schemaHints = true
-    var schemaCompaction = true
-
-    val ParamName = "-(.+)".r
-
-    def parse(args: Seq[String]): Unit = args match {
-      case Seq("-subtasks", value, rest @ _*)     => { defaultParallelism = value.toInt; parse(rest) }
-      case Seq("-nohints", rest @ _*)             => { schemaHints = false; parse(rest) }
-      case Seq("-nocompact", rest @ _*)           => { schemaCompaction = false; parse(rest) }
-      case Seq(ParamName(name), value, rest @ _*) => { argsMap = argsMap.updated(name, value); parse(rest) }
-      case Seq()                                  =>
-    }
-
-    parse(args)
-    Args(argsMap, defaultParallelism, schemaHints, schemaCompaction)
-  }
-}
-
-//abstract class ScalaProgram extends Program {
-//  def getScalaPlan(args: Args): ScalaPlan
-//  
-//  override def getPlan(args: String*): Plan = {
-//    val scalaArgs = Args.parse(args.toSeq)
-//    
-//    getScalaPlan(scalaArgs)
-//  }
-//}
-
-
-class ScalaPostPass extends RecordModelPostPass with GlobalSchemaOptimizer {
-  override def postPass(plan: OptimizedPlan): Unit = {
-    optimizeSchema(plan, false)
-    super.postPass(plan)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala
deleted file mode 100644
index e08cd0f..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/Extractors.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import org.apache.flink.compiler.dag._
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.api.common.operators.DualInputOperator
-import org.apache.flink.api.common.operators.SingleInputOperator
-
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration, GenericDataSinkBase, GenericDataSourceBase}
-
-import org.apache.flink.api.java.record.operators.CrossOperator
-import org.apache.flink.api.java.record.operators.CoGroupOperator
-import org.apache.flink.api.java.record.operators.JoinOperator
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.api.java.record.operators.ReduceOperator
-
-import org.apache.flink.types.Record
-
-import org.apache.flink.api.scala._
-
-
-object Extractors {
-
-  object DataSinkNode {
-    def unapply(node: Operator[_]): Option[(UDF1[_, _], Operator[Record])] = node match {
-      case contract: GenericDataSinkBase[_] with ScalaOutputOperator[_] => {
-        Some((contract.getUDF.asInstanceOf[UDF1[_, _]], node.asInstanceOf[GenericDataSinkBase[_]].getInput().asInstanceOf[Operator[Record]]))
-      }
-      case _                               => None
-    }
-  }
-
-  object DataSourceNode {
-    def unapply(node: Operator[_]): Option[(UDF0[_])] = node match {
-      case contract: GenericDataSourceBase[_, _] with ScalaOperator[_, _] => Some(contract.getUDF.asInstanceOf[UDF0[_]])
-      case _                                 => None
-    }
-  }
-
-  object CoGroupNode {
-    def unapply(node: Operator[_]): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, Operator[Record], Operator[Record])] = node match {
-      case contract: CoGroupOperator with TwoInputKeyedScalaOperator[_, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
-      case _                                       => None
-    }
-  }
-
-  object CrossNode {
-    def unapply(node: Operator[_]): Option[(UDF2[_, _, _], Operator[Record], Operator[Record])] = node match {
-      case contract: CrossOperator with TwoInputScalaOperator[_, _, _] => Some((contract.getUDF, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
-      case _                                  => None
-    }
-  }
-
-  object JoinNode {
-    def unapply(node: Operator[_]): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, Operator[Record], Operator[Record])] = node match {
-      case contract: JoinOperator with TwoInputKeyedScalaOperator[ _, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
-      case _                                    => None
-    }
-  }
-
-  object MapNode {
-    def unapply(node: Operator[_]): Option[(UDF1[_, _], Operator[Record])] = node match {
-      case contract: MapOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
-      case _                             => None
-    }
-  }
-  
-  object UnionNode {
-    def unapply(node: Operator[_]): Option[(UDF2[_, _, _], Operator[Record], Operator[Record])] = node match {
-      case contract: Union[_] with UnionScalaOperator[_] => Some((contract.getUDF, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
-      case _                             => None
-    }
-  }
-
-  object ReduceNode {
-    def unapply(node: Operator[_]): Option[(UDF1[_, _], FieldSelector, Operator[Record])] = node match {
-      case contract: ReduceOperator with OneInputKeyedScalaOperator[_, _] => Some((contract.getUDF, contract.key, contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
-      case contract: ReduceOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, new FieldSelector(contract.getUDF.inputUDT, Nil), contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
-      case _                                   => None
-    }
-  }
- object DeltaIterationNode {
-    def unapply(node: Operator[_]): Option[(UDF0[_], FieldSelector, Operator[Record], Operator[Record])] = node match {
-        case contract: DeltaIteration[_, _] with DeltaIterationScalaOperator[_] => Some((contract.getUDF, contract.key, contract.asInstanceOf[DualInputOperator[_, _, _, _]].getFirstInput().asInstanceOf[Operator[Record]], contract.asInstanceOf[DualInputOperator[_, _, _, _]].getSecondInput().asInstanceOf[Operator[Record]]))
-        case _                                  => None
-      }
-  }
-  
-  object BulkIterationNode {
-    def unapply(node: Operator[_]): Option[(UDF0[_], Operator[Record])] = node match {
-      case contract: BulkIteration[_] with BulkIterationScalaOperator[_] => Some((contract.getUDF, contract.asInstanceOf[SingleInputOperator[_, _, _]].getInput().asInstanceOf[Operator[Record]]))
-      case _ => None
-    }
-  } 
-}