Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:45 UTC

[03/60] Rewrite the Scala API as (somewhat) thin Layer on Java API

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala
deleted file mode 100644
index a92d37a..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-import scala.reflect.macros.Context
-
-import java.util.{ Iterator => JIterator }
-
-import org.apache.flink.types.Record
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.record.operators.JoinOperator
-import org.apache.flink.api.java.record.functions.{JoinFunction => JJoinFunction}
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper
-import org.apache.flink.configuration.Configuration
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
-import org.apache.flink.api.scala.functions.{JoinFunctionBase, JoinFunction, FlatJoinFunction}
-import org.apache.flink.api.scala.analysis.FieldSelector
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.TwoInputHintable
-
-class JoinDataSet[LeftIn, RightIn](val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
-  def where[Key](keyFun: LeftIn => Key) = macro JoinMacros.whereImpl[LeftIn, RightIn, Key]
-}
-
-class JoinDataSetWithWhere[LeftIn, RightIn, Key](val leftKey: List[Int], val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
-  def isEqualTo[Key](keyFun: RightIn => Key) = macro JoinMacros.isEqualToImpl[LeftIn, RightIn, Key]
-}
-
-class JoinDataSetWithWhereAndEqual[LeftIn, RightIn](val leftKey: List[Int], val rightKey: List[Int], val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
-  def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro JoinMacros.map[LeftIn, RightIn, Out]
-  def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro JoinMacros.flatMap[LeftIn, RightIn, Out]
-  def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)] = macro JoinMacros.filter[LeftIn, RightIn]
-}
-
-class NoKeyMatchBuilder(s: JJoinFunction) extends JoinOperator.Builder(new UserCodeObjectWrapper(s))
-
-object JoinMacros {
-  
-  def whereImpl[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSet[LeftIn, RightIn] })(keyFun: c.Expr[LeftIn => Key]): c.Expr[JoinDataSetWithWhere[LeftIn, RightIn, Key]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val keySelection = slave.getSelector(keyFun)
-
-    val helper = reify {
-      val helper = c.prefix.splice
-      new JoinDataSetWithWhere[LeftIn, RightIn, Key](keySelection.splice, helper.leftInput, helper.rightInput)
-    }
-
-    return helper
-  }
-  
-  def isEqualToImpl[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhere[LeftIn, RightIn, Key] })(keyFun: c.Expr[RightIn => Key]): c.Expr[JoinDataSetWithWhereAndEqual[LeftIn, RightIn]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val keySelection = slave.getSelector(keyFun)
-
-    val helper = reify {
-      val helper = c.prefix.splice
-      new JoinDataSetWithWhereAndEqual[LeftIn, RightIn](helper.leftKey, keySelection.splice, helper.leftInput, helper.rightInput)
-    }
-
-    return helper
-  }
-
-  def map[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhereAndEqual[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Out]): c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
-    val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
-    val stub: c.Expr[JoinFunctionBase[LeftIn, RightIn, Out]] = if (fun.actualType <:< weakTypeOf[JoinFunction[LeftIn, RightIn, Out]])
-      reify { fun.splice.asInstanceOf[JoinFunctionBase[LeftIn, RightIn, Out]] }
-    else reify {
-      implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
-      implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
-      implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-      new JoinFunctionBase[LeftIn, RightIn, Out] {
-        override def join(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
-          val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
-          val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
-          val output = fun.splice.apply(left, right)
-
-          leftRecord.setNumFields(outputLength)
-          for (field <- leftDiscard)
-            leftRecord.setNull(field)
-
-          leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
-          leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
-          serializer.serialize(output, leftRecord)
-          out.collect(leftRecord)
-        }
-      }
-    }
-    val contract = reify {
-      val helper: JoinDataSetWithWhereAndEqual[LeftIn, RightIn] = c.prefix.splice
-      val leftInput = helper.leftInput.contract
-      val rightInput = helper.rightInput.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val leftKeySelector = new FieldSelector(generatedStub.leftInputUDT, helper.leftKey)
-      val rightKeySelector = new FieldSelector(generatedStub.rightInputUDT, helper.rightKey)
-
-      val builder = new NoKeyMatchBuilder(generatedStub).input1(leftInput).input2(rightInput)
-
-      val leftKeyPositions = leftKeySelector.selectedFields.toIndexArray
-      val rightKeyPositions = rightKeySelector.selectedFields.toIndexArray
-
-      val keyTypes = generatedStub.leftInputUDT.getKeySet(leftKeyPositions)
-      // global indexes haven't been computed yet...
-      0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), leftKeyPositions(i), rightKeyPositions(i)) }
-
-      
-      
-      val ret = new JoinOperator(builder) with TwoInputKeyedScalaOperator[LeftIn, RightIn, Out] {
-        override val leftKey: FieldSelector = leftKeySelector
-        override val rightKey: FieldSelector = rightKeySelector
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFieldsFirst(
-            Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
-          Annotations.getConstantFieldsSecond(
-            Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
-      }
-      new DataSet[Out](ret) with TwoInputHintable[LeftIn, RightIn, Out] {}
-    }
-    
-    val result = c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-    
-    return result
-  }
-  
-  def flatMap[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhereAndEqual[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Iterator[Out]]): c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
-    val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
-    val stub: c.Expr[JoinFunctionBase[LeftIn, RightIn, Out]] = if (fun.actualType <:< weakTypeOf[JoinFunction[LeftIn, RightIn, Out]])
-      reify { fun.splice.asInstanceOf[JoinFunctionBase[LeftIn, RightIn, Out]] }
-    else reify {
-      implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
-      implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
-      implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-      new JoinFunctionBase[LeftIn, RightIn, Out] {
-        override def join(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
-          val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
-          val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
-          val output = fun.splice.apply(left, right)
-
-          if (output.nonEmpty) {
-
-            leftRecord.setNumFields(outputLength)
-
-            for (field <- leftDiscard)
-              leftRecord.setNull(field)
-
-            leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
-            leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
-            for (item <- output) {
-              serializer.serialize(item, leftRecord)
-              out.collect(leftRecord)
-            }
-          }
-        }
-      }
-    }
-    val contract = reify {
-      val helper: JoinDataSetWithWhereAndEqual[LeftIn, RightIn] = c.prefix.splice
-      val leftInput = helper.leftInput.contract
-      val rightInput = helper.rightInput.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val leftKeySelector = new FieldSelector(generatedStub.leftInputUDT, helper.leftKey)
-      val rightKeySelector = new FieldSelector(generatedStub.rightInputUDT, helper.rightKey)
-      val builder = new NoKeyMatchBuilder(generatedStub).input1(leftInput).input2(rightInput)
-
-      val leftKeyPositions = leftKeySelector.selectedFields.toIndexArray
-      val rightKeyPositions = rightKeySelector.selectedFields.toIndexArray
-      val keyTypes = generatedStub.leftInputUDT.getKeySet(leftKeyPositions)
-      // global indexes haven't been computed yet...
-      0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), leftKeyPositions(i), rightKeyPositions(i)) }
-      
-      
-      val ret = new JoinOperator(builder) with TwoInputKeyedScalaOperator[LeftIn, RightIn, Out] {
-        override val leftKey: FieldSelector = leftKeySelector
-        override val rightKey: FieldSelector = rightKeySelector
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFieldsFirst(
-            Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
-          Annotations.getConstantFieldsSecond(
-            Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
-      }
-      new DataSet[Out](ret) with TwoInputHintable[LeftIn, RightIn, Out] {}
-    }
-
-    val result = c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-    
-    return result
-  }
-  
-  def filter[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhereAndEqual[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Boolean]): c.Expr[DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
-    val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[(LeftIn, RightIn)]
-
-    val stub: c.Expr[JoinFunctionBase[LeftIn, RightIn, (LeftIn, RightIn)]] = if (fun.actualType <:< weakTypeOf[JoinFunction[LeftIn, RightIn, (LeftIn, RightIn)]])
-      reify { fun.splice.asInstanceOf[JoinFunctionBase[LeftIn, RightIn, (LeftIn, RightIn)]] }
-    else reify {
-      implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
-      implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
-      implicit val outputUDT: UDT[(LeftIn, RightIn)] = c.Expr[UDT[(LeftIn, RightIn)]](createUdtOut).splice
-      new JoinFunctionBase[LeftIn, RightIn, (LeftIn, RightIn)] {
-        override def join(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
-          val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
-          val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
-          if (fun.splice.apply(left, right)) {
-            val output = (left, right)
-            leftRecord.setNumFields(outputLength)
-            serializer.serialize(output, leftRecord)
-            out.collect(leftRecord)
-          }
-        }
-      }
-    }
-    val contract = reify {
-      val helper: JoinDataSetWithWhereAndEqual[LeftIn, RightIn] = c.prefix.splice
-      val leftInput = helper.leftInput.contract
-      val rightInput = helper.rightInput.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val leftKeySelector = new FieldSelector(generatedStub.leftInputUDT, helper.leftKey)
-      val rightKeySelector = new FieldSelector(generatedStub.rightInputUDT, helper.rightKey)
-      val builder = new NoKeyMatchBuilder(generatedStub).input1(leftInput).input2(rightInput)
-
-      val leftKeyPositions = leftKeySelector.selectedFields.toIndexArray
-      val rightKeyPositions = rightKeySelector.selectedFields.toIndexArray
-      val keyTypes = generatedStub.leftInputUDT.getKeySet(leftKeyPositions)
-      // global indexes haven't been computed yet...
-      0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), leftKeyPositions(i), rightKeyPositions(i)) }
-      
-      
-      val ret = new JoinOperator(builder) with TwoInputKeyedScalaOperator[LeftIn, RightIn, (LeftIn, RightIn)] {
-        override val leftKey: FieldSelector = leftKeySelector
-        override val rightKey: FieldSelector = rightKeySelector
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFieldsFirst(
-            Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
-          Annotations.getConstantFieldsSecond(
-            Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
-      }
-      new DataSet[(LeftIn, RightIn)](ret) with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)] {}
-    }
-
-    val result = c.Expr[DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-    
-    return result
-  }
-  
-}
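
For context on the removed builder classes above, here is a standalone sketch (plain Scala collections, no Flink, no macros; all names are hypothetical) of the fluent chain they expressed: where(...) fixes the left key, isEqualTo(...) the right key, and map(...) produces the joined result.

object JoinChainSketch {
  final class Joinable[L, R](left: Seq[L], right: Seq[R]) {
    def where[K](leftKey: L => K) = new WithWhere(left, right, leftKey)
  }
  final class WithWhere[L, R, K](left: Seq[L], right: Seq[R], leftKey: L => K) {
    def isEqualTo(rightKey: R => K) = new WithWhereAndEqual(left, right, leftKey, rightKey)
  }
  final class WithWhereAndEqual[L, R, K](left: Seq[L], right: Seq[R],
                                         leftKey: L => K, rightKey: R => K) {
    // a naive nested-loop equi-join, standing in for the generated JoinOperator
    def map[O](f: (L, R) => O): Seq[O] =
      for (l <- left; r <- right; if leftKey(l) == rightKey(r)) yield f(l, r)
  }

  def main(args: Array[String]): Unit = {
    val users  = Seq((1, "alice"), (2, "bob"))
    val visits = Seq((1, "/home"), (1, "/docs"), (2, "/home"))
    val joined = new Joinable(users, visits)
      .where(_._1).isEqualTo(_._1)
      .map((u, v) => (u._2, v._2))
    println(joined) // List((alice,/home), (alice,/docs), (bob,/home))
  }
}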

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala
deleted file mode 100644
index 824eb9d..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-
-import scala.reflect.macros.Context
-
-import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.OneInputHintable
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.functions.{MapFunction, MapFunctionBase}
-import org.apache.flink.api.scala.functions.{FlatMapFunction, FilterFunction}
-
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.types.Record
-import org.apache.flink.util.Collector
-
-
-object MapMacros {
-
-  def map[In: c.WeakTypeTag, Out: c.WeakTypeTag]
-      (c: Context { type PrefixType = DataSet[In] })
-      (fun: c.Expr[In => Out]): c.Expr[DataSet[Out]
-    with OneInputHintable[In, Out]] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-//    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
-
-    val stub: c.Expr[MapFunctionBase[In, Out]] =
-      if (fun.actualType <:< weakTypeOf[MapFunction[In, Out]])
-        reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
-      else
-        reify {
-          implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-          implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-          new MapFunctionBase[In, Out] {
-            // val userFun = ClosureCleaner.clean(fun.splice)
-            // val userFun = fun.splice
-            override def map(record: Record, out: Collector[Record]) = {
-              val input = deserializer.deserializeRecyclingOn(record)
-              val output = fun.splice.apply(input)
-
-              record.setNumFields(outputLength)
-
-              for (field <- discard)
-                record.setNull(field)
-
-              serializer.serialize(output, record)
-              out.collect(record)
-            }
-          }
-        }
-
-    val contract = reify {
-      val input = c.prefix.splice.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val builder = MapOperator.builder(generatedStub).input(input)
-      
-      val contract = new MapOperator(builder) with OneInputScalaOperator[In, Out] {
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFields(
-            Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
-      }
-      val stream = new DataSet[Out](contract) with OneInputHintable[In, Out] {}
-      contract.persistHints = { () => stream.applyHints(contract) }
-      stream
-    }
-
-    val result = c.Expr[DataSet[Out]
-      with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
-
-    result
-  }
-  
-  def flatMap[In: c.WeakTypeTag, Out: c.WeakTypeTag]
-      (c: Context { type PrefixType = DataSet[In] })
-      (fun: c.Expr[In => Iterator[Out]]): c.Expr[DataSet[Out]
-    with OneInputHintable[In, Out]] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-//    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
-
-    val stub: c.Expr[MapFunctionBase[In, Out]] =
-      if (fun.actualType <:< weakTypeOf[FlatMapFunction[In, Out]])
-        reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
-      else reify {
-        implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-        implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-        new MapFunctionBase[In, Out] {
-          override def map(record: Record, out: Collector[Record]) = {
-            val input = deserializer.deserializeRecyclingOn(record)
-            val output = fun.splice.apply(input)
-
-            if (output.nonEmpty) {
-
-              record.setNumFields(outputLength)
-
-              for (field <- discard)
-                record.setNull(field)
-
-              for (item <- output) {
-
-                serializer.serialize(item, record)
-                out.collect(record)
-              }
-            }
-          }
-        }
-      }
-    val contract = reify {
-      val input = c.prefix.splice.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val builder = MapOperator.builder(generatedStub).input(input)
-      
-      val contract = new MapOperator(builder) with OneInputScalaOperator[In, Out] {
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFields(
-            Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
-      }
-      val stream = new DataSet[Out](contract) with OneInputHintable[In, Out] {}
-      contract.persistHints = { () => stream.applyHints(contract) }
-      stream
-    }
-
-    val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut),
-      contract.tree))
-
-    result
-  }
-  
-  def filter[In: c.WeakTypeTag]
-      (c: Context { type PrefixType = DataSet[In] })
-      (fun: c.Expr[In => Boolean]): c.Expr[DataSet[In]
-
-    with OneInputHintable[In, In]] = {
-
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    //    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
-
-    val stub: c.Expr[MapFunctionBase[In, In]] =
-      if (fun.actualType <:< weakTypeOf[FilterFunction[In, In]])
-        reify { fun.splice.asInstanceOf[MapFunctionBase[In, In]] }
-      else
-        reify {
-          implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-          new MapFunctionBase[In, In] {
-            override def map(record: Record, out: Collector[Record]) = {
-              val input = deserializer.deserializeRecyclingOn(record)
-              if (fun.splice.apply(input)) {
-        	      out.collect(record)
-              }
-            }
-          }
-        }
-    val contract = reify {
-      val input = c.prefix.splice.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val builder = MapOperator.builder(generatedStub).input(input)
-      
-      val contract = new MapOperator(builder) with OneInputScalaOperator[In, In] {
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFields(
-            Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
-      }
-      val stream = new DataSet[In](contract) with OneInputHintable[In, In] {}
-      contract.persistHints = { () =>
-        stream.applyHints(contract)
-        0 until generatedStub.udf.getOutputLength foreach { i => stream.markCopied(i, i) }
-      }
-      stream
-    }
-
-    val result = c.Expr[DataSet[In]
-      with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
-
-    result
-  }
-}
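
The map, flatMap and filter macros above all lower the typed user closure to the same record-level MapFunctionBase: deserialize the input, apply the closure, then serialize the result back into the reused record. A standalone sketch of that wrapping idea (plain Scala, no Flink; Record, wrapMap and the other names are hypothetical stand-ins):

object MapLoweringSketch {
  type Record = Map[String, Any]
  trait RecordMapper { def map(record: Record, collect: Record => Unit): Unit }

  // wrap a typed user function into a record-level mapper, as the macros do
  def wrapMap[In, Out](deserialize: Record => In, serialize: (Out, Record) => Record)
                      (userFun: In => Out): RecordMapper =
    new RecordMapper {
      def map(record: Record, collect: Record => Unit): Unit = {
        val input  = deserialize(record)    // like deserializer.deserializeRecyclingOn(record)
        val output = userFun(input)         // like fun.splice.apply(input)
        collect(serialize(output, record))  // like serializer.serialize(...); out.collect(record)
      }
    }

  def main(args: Array[String]): Unit = {
    val doubler = wrapMap[Int, Int](
      (r: Record) => r("v").asInstanceOf[Int],
      (o: Int, r: Record) => r.updated("v", o))(_ * 2)
    doubler.map(Map("v" -> 21), r => println(r)) // Map(v -> 42)
  }
}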

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
deleted file mode 100644
index f25b5a3..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
+++ /dev/null
@@ -1,363 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-import scala.language.reflectiveCalls
-import scala.reflect.macros.Context
-
-import java.util.{ Iterator => JIterator }
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.OneInputHintable
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
-import org.apache.flink.api.scala.functions.{ReduceFunction, ReduceFunctionBase, CombinableGroupReduceFunction, GroupReduceFunction}
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.types.Record
-import org.apache.flink.types.IntValue
-import org.apache.flink.api.java.record.operators.ReduceOperator
-import org.apache.flink.api.java.record.functions.{ReduceFunction => JReduceFunction}
-
-
-class KeyedDataSet[In](val keySelection: List[Int], val input: DataSet[In]) {
-  def reduceGroup[Out](fun: Iterator[In] => Out): DataSet[Out] with OneInputHintable[In, Out] = macro ReduceMacros.reduceGroup[In, Out]
-  def combinableReduceGroup(fun: Iterator[In] => In): DataSet[In] with OneInputHintable[In, In] = macro ReduceMacros.combinableReduceGroup[In]
-  
-  def reduce(fun: (In, In) => In): DataSet[In] with OneInputHintable[In, In] = macro ReduceMacros.reduce[In]
-  
-  def count() : DataSet[(In, Int)] with OneInputHintable[In, (In, Int)] = macro ReduceMacros.count[In]
-}
-
-object ReduceMacros {
-  
-  def groupBy[In: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })
-                                                    (keyFun: c.Expr[In => Key]): c.Expr[KeyedDataSet[In]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-    val keySelection = slave.getSelector(keyFun)
-
-    val helper = reify {
-    	new KeyedDataSet[In](keySelection.splice, c.prefix.splice)
-    }
-
-    return helper
-  }
-
-  def reduce[In: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })
-                               (fun: c.Expr[(In, In) => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
-    import c.universe._
-
-    reduceImpl(c)(c.prefix, fun)
-  }
-
-  def globalReduce[In: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[(In, In) => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
-    import c.universe._
-
-    reduceImpl(c)(reify { new KeyedDataSet[In](List[Int](), c.prefix.splice) }, fun)
-  }
-
-  def reduceGroup[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })
-                                                        (fun: c.Expr[Iterator[In] => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
-    import c.universe._
-
-    reduceGroupImpl(c)(c.prefix, fun)
-  }
-
-  def globalReduceGroup[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[Iterator[In] => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
-    import c.universe._
-
-    reduceGroupImpl(c)(reify { new KeyedDataSet[In](List[Int](), c.prefix.splice) }, fun)
-  }
-
-  def combinableReduceGroup[In: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })
-                                              (fun: c.Expr[Iterator[In] => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
-    import c.universe._
-
-    combinableReduceGroupImpl(c)(c.prefix, fun)
-  }
-
-  def combinableGlobalReduceGroup[In: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })
-                                              (fun: c.Expr[Iterator[In] => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
-    import c.universe._
-
-    combinableReduceGroupImpl(c)(reify { new KeyedDataSet[In](List[Int](), c.prefix.splice) }, fun)
-  }
-
-  def reduceImpl[In: c.WeakTypeTag](c: Context)
-                               (groupedInput: c.Expr[KeyedDataSet[In]], fun: c.Expr[(In, In) => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-//    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-
-    val stub: c.Expr[ReduceFunctionBase[In, In]] = if (fun.actualType <:< weakTypeOf[ReduceFunction[In]])
-      reify { fun.splice.asInstanceOf[ReduceFunctionBase[In, In]] }
-    else reify {
-      implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-
-      new ReduceFunctionBase[In, In] {
-        override def combine(records: JIterator[Record], out: Collector[Record]) = {
-          reduce(records, out)
-        }
-
-        override def reduce(records: JIterator[Record], out: Collector[Record]) = {
-          if (records.hasNext) {
-            val firstRecord = reduceIterator.initialize(records)
-            reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)
-
-            val output = reduceIterator.reduce(fun.splice)
-
-            reduceSerializer.serialize(output, reduceRecord)
-            out.collect(reduceRecord)
-          }
-        }
-      }
-
-    }
-    val contract = reify {
-      val helper = groupedInput.splice
-      val input = helper.input.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val keySelection = helper.keySelection
-      val keySelector = new FieldSelector(generatedStub.inputUDT, keySelection)
-
-      val builder = ReduceOperator.builder(generatedStub).input(input)
-
-      val keyPositions = keySelector.selectedFields.toIndexArray
-      val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
-      // global indexes haven't been computed yet...
-      0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-      
-      val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, In] {
-        override val key: FieldSelector = keySelector
-        override def getUDF = generatedStub.udf
-        override def annotations = Annotations.getCombinable() +: Seq(
-          Annotations.getConstantFields(
-            Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
-      }
-      new DataSet[In](ret) with OneInputHintable[In, In] {}
-    }
-
-    val result = c.Expr[DataSet[In] with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
-    
-    return result
-  }
-
-  def reduceGroupImpl[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context)
-                                                            (groupedInput: c.Expr[KeyedDataSet[In]], fun: c.Expr[Iterator[In] => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-//    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
-    val stub: c.Expr[ReduceFunctionBase[In, Out]] = if (fun.actualType <:< weakTypeOf[GroupReduceFunction[In, Out]])
-      reify { fun.splice.asInstanceOf[ReduceFunctionBase[In, Out]] }
-    else reify {
-      implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-      implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-
-      new ReduceFunctionBase[In, Out] {
-        override def reduce(recordsIterable: JIterator[Record], out: Collector[Record]) = {
-          val records: JIterator[Record] = recordsIterable
-          
-          if (records.hasNext) {
-            val firstRecord = reduceIterator.initialize(records)
-            reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)
-
-            val output = fun.splice.apply(reduceIterator)
-
-            reduceSerializer.serialize(output, reduceRecord)
-            out.collect(reduceRecord)
-          }
-        }
-      }
-    }
-    val contract = reify {
-      val helper = groupedInput.splice
-      val input = helper.input.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val keySelection = helper.keySelection
-      val keySelector = new FieldSelector(generatedStub.inputUDT, keySelection)
-      val builder = ReduceOperator.builder(generatedStub).input(input)
-
-      val keyPositions = keySelector.selectedFields.toIndexArray
-      val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
-      // global indexes haven't been computed yet...
-      0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-      
-      val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, Out] {
-        override val key: FieldSelector = keySelector
-        override def getUDF = generatedStub.udf
-        override def annotations = Seq(
-          Annotations.getConstantFields(
-            Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
-      }
-      new DataSet[Out](ret) with OneInputHintable[In, Out] {}
-    }
-
-    val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
-    
-    return result
-  }
-  
-  def combinableReduceGroupImpl[In: c.WeakTypeTag](c: Context)
-                                              (groupedInput: c.Expr[KeyedDataSet[In]], fun: c.Expr[Iterator[In] => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-    
-//    val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-
-    val stub: c.Expr[ReduceFunctionBase[In, In]] = if (fun.actualType <:< weakTypeOf[CombinableGroupReduceFunction[In, In]])
-      reify { fun.splice.asInstanceOf[ReduceFunctionBase[In, In]] }
-    else reify {
-      implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-
-      new ReduceFunctionBase[In, In] {
-        override def combine(records: JIterator[Record], out: Collector[Record]) = {
-          reduce(records, out)
-        }
-
-        override def reduce(records: JIterator[Record], out: Collector[Record]) = {
-          val firstRecord = reduceIterator.initialize(records)
-          reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)
-
-          val output = fun.splice.apply(reduceIterator)
-
-          reduceSerializer.serialize(output, reduceRecord)
-          out.collect(reduceRecord)
-        }
-      }
-    }
-    val contract = reify {
-      val helper = groupedInput.splice
-      val input = helper.input.contract
-      val generatedStub = ClosureCleaner.clean(stub.splice)
-      val keySelection = helper.keySelection
-      val keySelector = new FieldSelector(generatedStub.inputUDT, keySelection)
-      val builder = ReduceOperator.builder(generatedStub).input(input)
-
-      val keyPositions = keySelector.selectedFields.toIndexArray
-      val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
-      // global indexes haven't been computed yet...
-      0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-      
-      val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, In] {
-        override val key: FieldSelector = keySelector
-        override def getUDF = generatedStub.udf
-        override def annotations = Annotations.getCombinable() +: Seq(
-          Annotations.getConstantFields(
-            Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
-      }
-      new DataSet[In](ret) with OneInputHintable[In, In] {}
-    }
-
-    val result = c.Expr[DataSet[In] with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
-    
-    return result
-  }
-
-  def count[In: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })() : c.Expr[DataSet[(In, Int)] with OneInputHintable[In, (In, Int)]] = {
-    import c.universe._
-
-    val slave = MacroContextHolder.newMacroHelper(c)
-
-    val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-    val (udtOut, createUdtOut) = slave.mkUdtClass[(In, Int)]
-    
-    val contract = reify {
-      val helper: KeyedDataSet[In] = c.prefix.splice
-      val keySelection = helper.keySelection
-
-      val generatedStub = new JReduceFunction with Serializable {
-        val inputUDT = c.Expr[UDT[In]](createUdtIn).splice
-        val outputUDT = c.Expr[UDT[(In, Int)]](createUdtOut).splice
-        val keySelector = new FieldSelector(inputUDT, keySelection)
-        val udf: UDF1[In, (In, Int)] = new UDF1(inputUDT, outputUDT)
-        
-        private val reduceRecord = new Record()
-        private val pactInt = new IntValue()
-
-        private var countPosition: Int = 0;
-
-        override def open(config: Configuration) = {
-          super.open(config)
-          this.countPosition = udf.getOutputLength - 1;
-        }
-        
-        override def reduce(records: JIterator[Record], result: Collector[Record]) : Unit = {
-          
-          var record : Record = null
-          var counter: Int = 0
-          while (records.hasNext()) {
-            record = records.next()
-            val count = if (record.getNumFields() <= countPosition || record.isNull(countPosition)) 1 else record.getField(countPosition, pactInt).getValue()
-            counter = counter + count
-          }
-          
-          pactInt.setValue(counter)
-          record.setField(countPosition, pactInt)
-          result.collect(record)
-        }
-        
-        override def combine(records: JIterator[Record], result: Collector[Record]) : Unit = {
-          reduce(records, result)
-        }
-
-      }
-      
-      val builder = ReduceOperator.builder(generatedStub).input(helper.input.contract)
-
-      val keyPositions = generatedStub.keySelector.selectedFields.toIndexArray
-      val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
-      // global indexes haven't been computed yet...
-      0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-      
-      val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, (In, Int)] {
-        override val key: FieldSelector = generatedStub.keySelector
-        override def getUDF = generatedStub.udf
-        override def annotations = Annotations.getCombinable() +: Seq(Annotations.getConstantFieldsExcept(Array[Int]()))
-      }
-      new DataSet[(In, Int)](ret) with OneInputHintable[In, (In, Int)] {}
-    }
-
-    val result = c.Expr[DataSet[(In, Int)] with OneInputHintable[In, (In, Int)]](Block(List(udtIn, udtOut), contract.tree))
-    
-    return result
-  }
-}
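
The count() macro above is the one non-obvious reducer: each record carries an optional count field, a missing field counts as one, and reduce and combine both sum the counts so that partial aggregates compose. A standalone sketch of that logic (plain Scala, no Flink; Rec and countReduce are hypothetical stand-ins for the Record handling):

object CountSketch {
  case class Rec(key: String, count: Option[Int])

  def countReduce(records: Iterator[Rec]): Rec = {
    var last: Rec = null
    var counter = 0
    while (records.hasNext) {
      last = records.next()
      counter += last.count.getOrElse(1) // like: 1 if the count position is absent or null
    }
    last.copy(count = Some(counter))     // like: setField(countPosition, pactInt)
  }

  def main(args: Array[String]): Unit = {
    val partial  = countReduce(Iterator(Rec("a", None), Rec("a", None)))          // Rec(a,Some(2))
    val combined = countReduce(Iterator(partial, Rec("a", None), Rec("a", None))) // Rec(a,Some(4))
    println(combined)
  }
}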

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala
deleted file mode 100644
index 5926eef..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-
-import org.apache.flink.api.scala.UnionScalaOperator
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.analysis.UDF2
-
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.types.Record
-
-object UnionOperator {
-
-  def impl[In](firstInput: DataSet[In], secondInput: DataSet[In]): DataSet[In] = {
-    val union = new Union[Record](firstInput.contract, secondInput.contract)
-      with UnionScalaOperator[In] {
-
-      private val inputUDT = firstInput.contract.getUDF.outputUDT
-      private val udf: UDF2[In, In, In] = new UDF2(inputUDT, inputUDT, inputUDT)
-
-      override def getUDF = udf
-    }
-    new DataSet(union)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala
deleted file mode 100644
index 5a20940..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import scala.language.implicitConversions
-
-import scala.collection.TraversableOnce
-
-package object operators {
-
-  implicit def traversableToIterator[T](i: TraversableOnce[T]): Iterator[T] = i.toIterator
-  implicit def optionToIterator[T](opt: Option[T]): Iterator[T] = opt.iterator
-  implicit def arrayToIterator[T](arr: Array[T]): Iterator[T] = arr.iterator
-}
\ No newline at end of file
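
These three implicit views exist so that a UDF whose declared result type is Iterator[Out] can be written with an Option, an Array or any collection instead. A standalone usage sketch (era-appropriate Scala, no Flink; applyUdf is a hypothetical stand-in for a flatMap-style operator):

import scala.language.implicitConversions

object IteratorViewsSketch {
  implicit def traversableToIterator[T](i: TraversableOnce[T]): Iterator[T] = i.toIterator
  implicit def optionToIterator[T](opt: Option[T]): Iterator[T] = opt.iterator
  implicit def arrayToIterator[T](arr: Array[T]): Iterator[T] = arr.iterator

  // expects In => Iterator[Out], like the flatMap UDFs in the Scala API
  def applyUdf[In, Out](values: Seq[In])(udf: In => Iterator[Out]): Seq[Out] =
    values.flatMap(v => udf(v).toSeq)

  def main(args: Array[String]): Unit = {
    // the UDF returns an Option; the implicit view adapts it to the required Iterator
    println(applyUdf[String, Int](Seq("1", "x", "2"))(s => scala.util.Try(s.toInt).toOption)) // List(1, 2)
    // the UDF returns an Array, adapted the same way
    println(applyUdf[String, String](Seq("a b", "c"))(s => s.split(" ")))                     // List(a, b, c)
  }
}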

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
new file mode 100644
index 0000000..405158f
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api
+
+import _root_.scala.reflect.ClassTag
+import language.experimental.macros
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.api.scala.typeutils.TypeUtils
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+package object scala {
+  // We have this here so that we always have generated TypeInformationS when
+  // using the Scala API
+  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
+
+  // We need to wrap Java DataSet because we need the scala operations
+  private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]) = new DataSet[R](set)
+}
\ No newline at end of file
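
This package object is the heart of the "thin layer" approach from the commit message: one always-available implicit supplies the TypeInformation and a private wrap turns the underlying Java DataSet into the Scala wrapper. A standalone analogue of the pattern (plain Scala, no Flink, no macros; every name below is a hypothetical stand-in, and a ClassTag replaces the macro-generated type information):

import scala.reflect.ClassTag

object ThinLayerSketch {
  final case class TypeDescriptor[T](clazz: Class[_])  // stand-in for TypeInformation[T]
  final class JavaSet[T](val elems: Seq[T])            // stand-in for the Java DataSet
  final class ScalaSet[T](jset: JavaSet[T]) {          // the Scala-facing wrapper
    def map[R: TypeDescriptor](f: T => R): ScalaSet[R] = wrap(new JavaSet(jset.elems.map(f)))
    def collect(): Seq[T] = jset.elems
  }

  // always in scope for users, like createTypeInformation (derived here from a ClassTag)
  implicit def descriptorFor[T](implicit ct: ClassTag[T]): TypeDescriptor[T] =
    TypeDescriptor[T](ct.runtimeClass)

  // like the private[flink] wrap(...): operations hand back the Scala wrapper
  private def wrap[R](jset: JavaSet[R]): ScalaSet[R] = new ScalaSet[R](jset)

  def main(args: Array[String]): Unit = {
    val out = new ScalaSet(new JavaSet(Seq(1, 2, 3))).map(_.toString)
    println(out.collect()) // List(1, 2, 3) as strings
  }
}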

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
new file mode 100644
index 0000000..3d21f05
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
+import org.apache.flink.core.memory.MemorySegment
+import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException}
+;
+
+/**
+ * Comparator for Scala Tuples. Access is different from
+ * our Java Tuples so we have to treat them differently.
+ */
+class ScalaTupleComparator[T <: Product](
+    keys: Array[Int],
+    scalaComparators: Array[TypeComparator[_]],
+    scalaSerializers: Array[TypeSerializer[_]] )
+  extends TupleComparatorBase[T](keys, scalaComparators, scalaSerializers) {
+
+  private val extractedKeys = new Array[AnyRef](keys.length)
+
+  // We cannot use the Clone Constructor from Scala so we have to do it manually
+  def duplicate: TypeComparator[T] = {
+    // ensure that the serializers are available
+    instantiateDeserializationUtils()
+    val result = new ScalaTupleComparator[T](keyPositions, comparators, serializers)
+    result.privateDuplicate(this)
+    result
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Comparator Methods
+  // --------------------------------------------------------------------------------------------
+
+  def hash(value: T): Int = {
+    val comparator = comparators(0).asInstanceOf[TypeComparator[Any]]
+    var code: Int = comparator.hash(value.productElement(keyPositions(0)))
+    for (i <- 1 until keyPositions.length) {
+      try {
+        code *= TupleComparatorBase.HASH_SALT(i & 0x1F)
+        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+        code += comparator.hash(value.productElement(keyPositions(i)))
+      } catch {
+        case npex: NullPointerException =>
+          throw new NullKeyFieldException(keyPositions(i))
+        case iobex: IndexOutOfBoundsException =>
+          throw new KeyFieldOutOfBoundsException(keyPositions(i))
+      }
+    }
+    code
+  }
+
+  def setReference(toCompare: T) {
+    for (i <- 0 until keyPositions.length) {
+      try {
+        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+        comparator.setReference(toCompare.productElement(keyPositions(i)))
+      } catch {
+        case npex: NullPointerException =>
+          throw new NullKeyFieldException(keyPositions(i))
+        case iobex: IndexOutOfBoundsException =>
+          throw new KeyFieldOutOfBoundsException(keyPositions(i))
+      }
+    }
+  }
+
+  def equalToReference(candidate: T): Boolean = {
+    for (i <- 0 until keyPositions.length) {
+      try {
+        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+        if (!comparator.equalToReference(candidate.productElement(keyPositions(i)))) {
+          return false
+        }
+      } catch {
+        case npex: NullPointerException =>
+          throw new NullKeyFieldException(keyPositions(i))
+        case iobex: IndexOutOfBoundsException =>
+          throw new KeyFieldOutOfBoundsException(keyPositions(i))
+      }
+    }
+    true
+  }
+
+  def compare(first: T, second: T): Int = {
+    for (i <- 0 until keyPositions.length) {
+      try {
+        val keyPos: Int = keyPositions(i)
+        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+        val cmp: Int = comparator.compare(
+          first.productElement(keyPos),
+          second.productElement(keyPos))
+        if (cmp != 0) {
+          return cmp
+        }
+      } catch {
+        case npex: NullPointerException =>
+          throw new NullKeyFieldException(keyPositions(i))
+        case iobex: IndexOutOfBoundsException =>
+          throw new KeyFieldOutOfBoundsException(keyPositions(i))
+      }
+    }
+    0
+  }
+
+  def putNormalizedKey(value: T, target: MemorySegment, offsetParam: Int, numBytesParam: Int) {
+    var numBytes = numBytesParam
+    var offset = offsetParam
+    var i: Int = 0
+    try {
+      while (i < numLeadingNormalizableKeys && numBytes > 0) {
+        {
+          var len: Int = normalizedKeyLengths(i)
+          len = if (numBytes >= len) len else numBytes
+          val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+          comparator.putNormalizedKey(value.productElement(keyPositions(i)), target, offset, len)
+          numBytes -= len
+          offset += len
+        }
+        i += 1
+      }
+    } catch {
+      case npex: NullPointerException => throw new NullKeyFieldException(keyPositions(i))
+    }
+  }
+
+  def extractKeys(value: T) = {
+    for (i <- 0 until keyPositions.length ) {
+      extractedKeys(i) = value.productElement(keyPositions(i)).asInstanceOf[AnyRef]
+    }
+    extractedKeys
+  }
+}
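
The comparator above reads key fields positionally through Product.productElement, which is what distinguishes it from the Java tuple comparator. A standalone sketch of that core idea (plain Scala, no Flink; hashKeys, compareKeys and HashSalt are hypothetical stand-ins):

object ProductKeySketch {
  private val HashSalt = Array(73, 79, 83, 89, 97, 101, 103, 107) // stand-in for HASH_SALT

  def hashKeys(value: Product, keyPositions: Array[Int]): Int = {
    var code = value.productElement(keyPositions(0)).hashCode()
    for (i <- 1 until keyPositions.length) {
      code *= HashSalt(i & 0x7)
      code += value.productElement(keyPositions(i)).hashCode()
    }
    code
  }

  def compareKeys(first: Product, second: Product, keyPositions: Array[Int]): Int = {
    for (pos <- keyPositions) {
      // assumes the key fields are Comparable, as atomic key types are
      val cmp = first.productElement(pos).asInstanceOf[Comparable[Any]]
        .compareTo(second.productElement(pos))
      if (cmp != 0) return cmp
    }
    0
  }

  def main(args: Array[String]): Unit = {
    val a = ("alice", 3, 1.5)
    val b = ("alice", 7, 1.5)
    println(hashKeys(a, Array(0, 2)) == hashKeys(b, Array(0, 2))) // true: same key fields
    println(compareKeys(a, b, Array(1)))                          // negative: 3 < 7 on field 1
  }
}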

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
new file mode 100644
index 0000000..a63bb35
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+;
+
+/**
+ * Serializer for Scala Tuples. Creation and access is different from
+ * our Java Tuples so we have to treat them differently.
+ */
+abstract class ScalaTupleSerializer[T <: Product](
+    tupleClass: Class[T],
+    scalaFieldSerializers: Array[TypeSerializer[_]])
+  extends TupleSerializerBase[T](tupleClass, scalaFieldSerializers) {
+
+  def createInstance: T = {
+    val fields: Array[AnyRef] = new Array(arity)
+    for (i <- 0 until arity) {
+      fields(i) = fieldSerializers(i).createInstance()
+    }
+    createInstance(fields)
+  }
+
+  def copy(from: T, reuse: T): T = {
+    val fields: Array[AnyRef] = new Array(arity)
+    for (i <- 0 until arity) {
+      fields(i) = from.productElement(i).asInstanceOf[AnyRef]
+    }
+    createInstance(fields)
+  }
+
+  def serialize(value: T, target: DataOutputView) {
+    for (i <- 0 until arity) {
+      val serializer = fieldSerializers(i).asInstanceOf[TypeSerializer[Any]]
+      serializer.serialize(value.productElement(i), target)
+    }
+  }
+
+  def deserialize(reuse: T, source: DataInputView): T = {
+    val fields: Array[AnyRef] = new Array(arity)
+    for (i <- 0 until arity) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      fields(i) = fieldSerializers(i).deserialize(field, source)
+    }
+    createInstance(fields)
+  }
+}
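
Creation is the key difference handled by this serializer: Scala tuples are immutable, so fields are read positionally via productElement and a fresh tuple is rebuilt from the collected field array through createInstance(fields) rather than being mutated in place. A standalone sketch of that rebuild step (plain Scala, no Flink; rebuild stands in for the generated createInstance(fields)):

object ProductRebuildSketch {
  // stand-in for the createInstance(fields) that generated subclasses provide
  def rebuild(fields: Array[AnyRef]): (String, Int) =
    (fields(0).asInstanceOf[String], fields(1).asInstanceOf[Int])

  // like copy(from, reuse): collect the fields, then build a new immutable tuple
  def copyTuple(from: Product): (String, Int) = {
    val fields = new Array[AnyRef](from.productArity)
    for (i <- 0 until from.productArity) {
      fields(i) = from.productElement(i).asInstanceOf[AnyRef]
    }
    rebuild(fields)
  }

  def main(args: Array[String]): Unit = {
    println(copyTuple(("count", 42))) // (count,42) rebuilt from its field array
  }
}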

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
new file mode 100644
index 0000000..069e03b
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, AtomicType, TupleTypeInfoBase}
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+
+/**
+ * TypeInformation for Scala Tuples. Creation and access are different from
+ * our Java Tuples, so we have to treat them differently.
+ */
+abstract class ScalaTupleTypeInfo[T <: Product](
+    tupleClass: Class[T],
+    fieldTypes: Seq[TypeInformation[_]])
+  extends TupleTypeInfoBase[T](tupleClass, fieldTypes: _*) {
+
+  def createComparator(logicalKeyFields: Array[Int], orders: Array[Boolean]): TypeComparator[T] = {
+    // sanity checks
+    if (logicalKeyFields == null || orders == null
+      || logicalKeyFields.length != orders.length || logicalKeyFields.length > types.length) {
+      throw new IllegalArgumentException
+    }
+
+    // No special handling of leading Key field as in JavaTupleComparator for now
+
+    // --- general case ---
+    var maxKey: Int = -1
+
+    for (key <- logicalKeyFields) {
+      maxKey = Math.max(key, maxKey)
+    }
+
+    if (maxKey >= types.length) {
+      throw new IllegalArgumentException("The key position " + maxKey + " is out of range for " +
+        "Tuple" + types.length)
+    }
+
+    // create the comparators for the individual fields
+    val fieldComparators: Array[TypeComparator[_]] = new Array(logicalKeyFields.length)
+
+    for (i <- 0 until logicalKeyFields.length) {
+      val keyPos = logicalKeyFields(i)
+      if (types(keyPos).isKeyType && types(keyPos).isInstanceOf[AtomicType[_]]) {
+        fieldComparators(i) = types(keyPos).asInstanceOf[AtomicType[_]].createComparator(orders(i))
+      } else {
+        throw new IllegalArgumentException(
+          "The field at position " + i + " (" + types(keyPos) + ") is no atomic key type.")
+      }
+    }
+
+    // create the serializers for the prefix up to highest key position
+    val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](maxKey + 1)
+
+    for (i <- 0 to maxKey) {
+      fieldSerializers(i) = types(i).createSerializer
+    }
+
+    new ScalaTupleComparator[T](logicalKeyFields, fieldComparators, fieldSerializers)
+  }
+
+  override def toString = "Scala " + super.toString
+}
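Here, too, the concrete subclass is expected to come from the macro: createComparator only needs the field TypeInformations, while createSerializer is the part that has to know the tuple type. A hand-written sketch for (Int, String), assuming createSerializer is the remaining abstract member and taking the field TypeInformations as a constructor argument (names are illustrative, not the generated code):

  import org.apache.flink.api.common.typeutils.TypeSerializer
  import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
  import org.apache.flink.types.TypeInformation

  // Hypothetical stand-in for the macro-generated type info for (Int, String).
  class IntStringTupleTypeInfo(fieldTypes: Seq[TypeInformation[_]])
    extends ScalaTupleTypeInfo[(Int, String)](classOf[(Int, String)], fieldTypes) {

    // Wire createSerializer to a ScalaTupleSerializer that knows how to
    // rebuild the Scala tuple from an untyped field array.
    def createSerializer: TypeSerializer[(Int, String)] = {
      val fieldSers: Array[TypeSerializer[_]] = new Array(types.length)
      for (i <- 0 until types.length) {
        fieldSers(i) = types(i).createSerializer
      }
      new ScalaTupleSerializer[(Int, String)](classOf[(Int, String)], fieldSers) {
        def createInstance(fields: Array[AnyRef]) =
          (fields(0).asInstanceOf[Int], fields(1).asInstanceOf[String])
      }
    }
  }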

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
new file mode 100644
index 0000000..5901e48
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import scala.reflect.macros.Context
+import org.apache.flink.api.scala.codegen.MacroContextHolder
+import org.apache.flink.types.TypeInformation
+
+private[flink] object TypeUtils {
+
+  def createTypeInfo[T: c.WeakTypeTag](c: Context): c.Expr[TypeInformation[T]] = {
+    val slave = MacroContextHolder.newMacroHelper(c)
+    slave.mkTypeInfo[T]
+  }
+}
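TypeUtils.createTypeInfo is only the macro implementation; it still needs a macro def as its public entry point, presumably in the Scala API's package object. A minimal stand-alone version could look like the following sketch (the object and method names are illustrative, not the actual API surface):

  import scala.language.experimental.macros
  import org.apache.flink.types.TypeInformation
  import org.apache.flink.api.scala.typeutils.TypeUtils

  object Types {
    // Expanded at each call site: the macro inspects T and emits the matching
    // TypeInformation (e.g. a ScalaTupleTypeInfo for tuple types).
    implicit def createTypeInformation[T]: TypeInformation[T] =
      macro TypeUtils.createTypeInfo[T]
  }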

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
new file mode 100644
index 0000000..f8cbb62
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.operators.Keys
+import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
+import org.apache.flink.types.TypeInformation
+
+/**
+ * This is for dealing with operations that require keys and use a fluent interface (join and
+ * coGroup for now). For each operation we need a subclass that implements `finish` to
+ * create the actual operation using the provided keys.
+ *
+ * This way, we have a central point where all the key-providing happens and don't need to change
+ * the specific operations if the supported key types change.
+ *
+ * We use the type parameter `R` to specify the type of the resulting operation. For join
+ * this would be `JoinDataSet[T, O]` and for coGroup it would be `CoGroupDataSet[T, O]`. This
+ * way the user gets the correct type for the finished operation.
+ *
+ * @tparam T Type of the left input [[DataSet]].
+ * @tparam O Type of the right input [[DataSet]].
+ * @tparam R The type of the resulting Operation.
+ */
+private[flink] abstract class UnfinishedKeyPairOperation[T, O, R](
+    private[flink] val leftSet: JavaDataSet[T],
+    private[flink] val rightSet: JavaDataSet[O]) {
+
+  private[flink] def finish(leftKey: Keys[T], rightKey: Keys[O]): R
+
+  /**
+   * Specify the key fields for the left side of the key-based operation. This returns
+   * a [[HalfUnfinishedKeyPairOperation]] on which `equalTo` must be called to specify the
+   * key for the right side. The result after specifying the right side key is the finished
+   * operation.
+   *
+   * This only works on a Tuple [[DataSet]].
+   */
+  def where(leftKeys: Int*) = {
+    val leftKey = new FieldPositionKeys[T](leftKeys.toArray, leftSet.getType)
+    new HalfUnfinishedKeyPairOperation[T, O, R](this, leftKey)
+  }
+
+  /**
+   * Specify the key selector function for the left side of the key-based operation. This
+   * returns a [[HalfUnfinishedKeyPairOperation]] on which `equalTo` must be called to specify
+   * the key for the right side. The result after specifying the right side key is the
+   * finished operation.
+   */
+  def where[K: TypeInformation](fun: (T) => K) = {
+    val keyType = implicitly[TypeInformation[K]]
+    val keyExtractor = new KeySelector[T, K] {
+      def getKey(in: T) = fun(in)
+    }
+    val leftKey = new Keys.SelectorFunctionKeys[T, K](keyExtractor, leftSet.getType, keyType)
+    new HalfUnfinishedKeyPairOperation[T, O, R](this, leftKey)
+  }
+}
+
+private[flink] class HalfUnfinishedKeyPairOperation[T, O, R](
+    unfinished: UnfinishedKeyPairOperation[T, O, R], leftKey: Keys[T]) {
+
+  /**
+   * Specify the key fields for the right side of the key-based operation. This returns
+   * the finished operation.
+   *
+   * This only works on a Tuple [[DataSet]].
+   */
+  def equalTo(rightKeys: Int*): R = {
+    val rightKey = new FieldPositionKeys[O](rightKeys.toArray, unfinished.rightSet.getType)
+    if (!leftKey.areCompatibale(rightKey)) {
+      throw new InvalidProgramException("The types of the key fields do not match. Left: " +
+        leftKey + " Right: " + rightKey)
+    }
+    unfinished.finish(leftKey, rightKey)
+  }
+
+  /**
+   * Specify the key selector function for the right side of the key-based operation. This returns
+   * the finished operation.
+   */
+  def equalTo[K: TypeInformation](fun: (O) => K) = {
+    val keyType = implicitly[TypeInformation[K]]
+    val keyExtractor = new KeySelector[O, K] {
+      def getKey(in: O) = fun(in)
+    }
+    val rightKey =
+      new Keys.SelectorFunctionKeys[O, K](keyExtractor, unfinished.rightSet.getType, keyType)
+    if (!leftKey.areCompatibale(rightKey)) {
+      throw new InvalidProgramException("The types of the key fields do not match. Left: " +
+        leftKey + " Right: " + rightKey)
+    }
+    unfinished.finish(leftKey, rightKey)
+  }
+}
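Put together, a join between two tuple DataSets would go through this machinery roughly as follows. This is a sketch under the assumption that the rewritten Scala ExecutionEnvironment mirrors the Java one (getExecutionEnvironment, fromElements) and that DataSet.join hands back an UnfinishedKeyPairOperation subclass:

  import org.apache.flink.api.scala._

  object JoinSketch {
    def main(args: Array[String]) {
      val env = ExecutionEnvironment.getExecutionEnvironment

      val persons = env.fromElements((1, "alice"), (2, "bob"))
      val orders  = env.fromElements((1, 100.0), (1, 25.0), (2, 42.0))

      // Field-position keys: only valid on tuple DataSets (where(Int*)).
      val byPosition = persons.join(orders).where(0).equalTo(0)

      // Key-selector functions: the key TypeInformation is picked up
      // implicitly, e.g. via the macro-based createTypeInformation.
      val bySelector = persons.join(orders).where(_._1).equalTo(_._1)
    }
  }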

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala
deleted file mode 100644
index 19feb3a..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.java.record.operators.{CollectionDataSource => JCollectionDataSource}
-import org.apache.flink.types.{DoubleValue, Record}
-import org.scalatest.junit.AssertionsForJUnit
-import org.junit.Assert._
-import org.junit.Test
-
-
-class CollectionDataSourceTest extends AssertionsForJUnit {
-  @Test def testScalaCollectionInput() {
-    val expected = List(1.0, 2.0, 3.0)
-    val datasource = org.apache.flink.api.scala.CollectionDataSource(expected)
-
-    val javaCDS = datasource.contract.asInstanceOf[JCollectionDataSource]
-
-    val inputFormat = javaCDS.getFormatWrapper.getUserCodeObject()
-    val splits = inputFormat.createInputSplits(1)
-    inputFormat.open(splits(0))
-
-    val record = new Record()
-    var result = List[Double]()
-
-    while(!inputFormat.reachedEnd()){
-      inputFormat.nextRecord(record)
-      assertTrue(record.getNumFields == 1)
-      val value = record.getField[DoubleValue](0, classOf[DoubleValue])
-      result = value.getValue :: result
-    }
-
-    assertEquals(expected, result.reverse)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index b4927b4..d5b0d24 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -1,192 +1,192 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.junit.Test
-import org.apache.flink.api.common.InvalidProgramException
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.operators._
-import org.scalatest.junit.AssertionsForJUnit
-
-// Verify that the sanity checking in delta iterations works. We just
-// have a dummy job that is not meant to be executed. Only verify that
-// the join/coGroup inside the iteration is checked.
-class DeltaIterationSanityCheckTest extends Serializable {
-
-  @Test
-  def testCorrectJoinWithSolution1 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test
-  def testCorrectJoinWithSolution2 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution1 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution2 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution3 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-   }
-
-  @Test
-  def testCorrectCoGroupWithSolution1 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test
-  def testCorrectCoGroupWithSolution2 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution1 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution2 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution3 {
-    val solutionInput = CollectionDataSource(Array((1, "1")))
-    val worksetInput = CollectionDataSource(Array((2, "2")))
-
-    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
-      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
-      (result, ws)
-    }
-    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-
-    val output = iteration.write("/dummy", CsvOutputFormat())
-
-    val plan = new ScalaPlan(Seq(output))
-  }
-}
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.api.scala
+//
+//import org.junit.Test
+//import org.apache.flink.api.common.InvalidProgramException
+//
+//import org.apache.flink.api.scala._
+//import org.apache.flink.api.scala.operators._
+//import org.scalatest.junit.AssertionsForJUnit
+//
+//// Verify that the sanity checking in delta iterations works. We just
+//// have a dummy job that is not meant to be executed. Only verify that
+//// the join/coGroup inside the iteration is checked.
+//class DeltaIterationSanityCheckTest extends Serializable {
+//
+//  @Test
+//  def testCorrectJoinWithSolution1 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test
+//  def testCorrectJoinWithSolution2 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test(expected = classOf[InvalidProgramException])
+//  def testIncorrectJoinWithSolution1 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test(expected = classOf[InvalidProgramException])
+//  def testIncorrectJoinWithSolution2 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test(expected = classOf[InvalidProgramException])
+//  def testIncorrectJoinWithSolution3 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//   }
+//
+//  @Test
+//  def testCorrectCoGroupWithSolution1 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test
+//  def testCorrectCoGroupWithSolution2 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test(expected = classOf[InvalidProgramException])
+//  def testIncorrectCoGroupWithSolution1 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test(expected = classOf[InvalidProgramException])
+//  def testIncorrectCoGroupWithSolution2 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//
+//  @Test(expected = classOf[InvalidProgramException])
+//  def testIncorrectCoGroupWithSolution3 {
+//    val solutionInput = CollectionDataSource(Array((1, "1")))
+//    val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+//    def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+//      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+//      (result, ws)
+//    }
+//    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+//
+//    val output = iteration.write("/dummy", CsvOutputFormat())
+//
+//    val plan = new ScalaPlan(Seq(output))
+//  }
+//}