Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:45 UTC
[03/60] Rewrite the Scala API as (somewhat) thin Layer on Java API
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala
deleted file mode 100644
index a92d37a..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/JoinOperator.scala
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-import scala.reflect.macros.Context
-
-import java.util.{ Iterator => JIterator }
-
-import org.apache.flink.types.Record
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.record.operators.JoinOperator
-import org.apache.flink.api.java.record.functions.{JoinFunction => JJoinFunction}
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper
-import org.apache.flink.configuration.Configuration
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
-import org.apache.flink.api.scala.functions.{JoinFunctionBase, JoinFunction, FlatJoinFunction}
-import org.apache.flink.api.scala.analysis.FieldSelector
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.TwoInputHintable
-
-class JoinDataSet[LeftIn, RightIn](val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
- def where[Key](keyFun: LeftIn => Key) = macro JoinMacros.whereImpl[LeftIn, RightIn, Key]
-}
-
-class JoinDataSetWithWhere[LeftIn, RightIn, Key](val leftKey: List[Int], val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
- def isEqualTo[Key](keyFun: RightIn => Key) = macro JoinMacros.isEqualToImpl[LeftIn, RightIn, Key]
-}
-
-class JoinDataSetWithWhereAndEqual[LeftIn, RightIn](val leftKey: List[Int], val rightKey: List[Int], val leftInput: DataSet[LeftIn], val rightInput: DataSet[RightIn]) {
- def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro JoinMacros.map[LeftIn, RightIn, Out]
- def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out] = macro JoinMacros.flatMap[LeftIn, RightIn, Out]
- def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)] = macro JoinMacros.filter[LeftIn, RightIn]
-}
-
-class NoKeyMatchBuilder(s: JJoinFunction) extends JoinOperator.Builder(new UserCodeObjectWrapper(s))
-
-object JoinMacros {
-
- def whereImpl[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSet[LeftIn, RightIn] })(keyFun: c.Expr[LeftIn => Key]): c.Expr[JoinDataSetWithWhere[LeftIn, RightIn, Key]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val keySelection = slave.getSelector(keyFun)
-
- val helper = reify {
- val helper = c.prefix.splice
- new JoinDataSetWithWhere[LeftIn, RightIn, Key](keySelection.splice, helper.leftInput, helper.rightInput)
- }
-
- return helper
- }
-
- def isEqualToImpl[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhere[LeftIn, RightIn, Key] })(keyFun: c.Expr[RightIn => Key]): c.Expr[JoinDataSetWithWhereAndEqual[LeftIn, RightIn]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val keySelection = slave.getSelector(keyFun)
-
- val helper = reify {
- val helper = c.prefix.splice
- new JoinDataSetWithWhereAndEqual[LeftIn, RightIn](helper.leftKey, keySelection.splice, helper.leftInput, helper.rightInput)
- }
-
- return helper
- }
-
- def map[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhereAndEqual[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Out]): c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
- val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
- val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
- val stub: c.Expr[JoinFunctionBase[LeftIn, RightIn, Out]] = if (fun.actualType <:< weakTypeOf[JoinFunction[LeftIn, RightIn, Out]])
- reify { fun.splice.asInstanceOf[JoinFunctionBase[LeftIn, RightIn, Out]] }
- else reify {
- implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
- implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
- implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
- new JoinFunctionBase[LeftIn, RightIn, Out] {
- override def join(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
- val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
- val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
- val output = fun.splice.apply(left, right)
-
- leftRecord.setNumFields(outputLength)
- for (field <- leftDiscard)
- leftRecord.setNull(field)
-
- leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
- leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
- serializer.serialize(output, leftRecord)
- out.collect(leftRecord)
- }
- }
- }
- val contract = reify {
- val helper: JoinDataSetWithWhereAndEqual[LeftIn, RightIn] = c.prefix.splice
- val leftInput = helper.leftInput.contract
- val rightInput = helper.rightInput.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val leftKeySelector = new FieldSelector(generatedStub.leftInputUDT, helper.leftKey)
- val rightKeySelector = new FieldSelector(generatedStub.rightInputUDT, helper.rightKey)
-
- val builder = new NoKeyMatchBuilder(generatedStub).input1(leftInput).input2(rightInput)
-
- val leftKeyPositions = leftKeySelector.selectedFields.toIndexArray
- val rightKeyPositions = rightKeySelector.selectedFields.toIndexArray
-
- val keyTypes = generatedStub.leftInputUDT.getKeySet(leftKeyPositions)
- // global indexes haven't been computed yet...
- 0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), leftKeyPositions(i), rightKeyPositions(i)) }
-
-
-
- val ret = new JoinOperator(builder) with TwoInputKeyedScalaOperator[LeftIn, RightIn, Out] {
- override val leftKey: FieldSelector = leftKeySelector
- override val rightKey: FieldSelector = rightKeySelector
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFieldsFirst(
- Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
- Annotations.getConstantFieldsSecond(
- Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
- }
- new DataSet[Out](ret) with TwoInputHintable[LeftIn, RightIn, Out] {}
- }
-
- val result = c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-
- return result
- }
-
- def flatMap[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhereAndEqual[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Iterator[Out]]): c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
- val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
- val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
- val stub: c.Expr[JoinFunctionBase[LeftIn, RightIn, Out]] = if (fun.actualType <:< weakTypeOf[JoinFunction[LeftIn, RightIn, Out]])
- reify { fun.splice.asInstanceOf[JoinFunctionBase[LeftIn, RightIn, Out]] }
- else reify {
- implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
- implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
- implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
- new JoinFunctionBase[LeftIn, RightIn, Out] {
- override def join(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
- val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
- val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
- val output = fun.splice.apply(left, right)
-
- if (output.nonEmpty) {
-
- leftRecord.setNumFields(outputLength)
-
- for (field <- leftDiscard)
- leftRecord.setNull(field)
-
- leftRecord.copyFrom(rightRecord, rightForwardFrom, rightForwardTo)
- leftRecord.copyFrom(leftRecord, leftForwardFrom, leftForwardTo)
-
- for (item <- output) {
- serializer.serialize(item, leftRecord)
- out.collect(leftRecord)
- }
- }
- }
- }
- }
- val contract = reify {
- val helper: JoinDataSetWithWhereAndEqual[LeftIn, RightIn] = c.prefix.splice
- val leftInput = helper.leftInput.contract
- val rightInput = helper.rightInput.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val leftKeySelector = new FieldSelector(generatedStub.leftInputUDT, helper.leftKey)
- val rightKeySelector = new FieldSelector(generatedStub.rightInputUDT, helper.rightKey)
- val builder = new NoKeyMatchBuilder(generatedStub).input1(leftInput).input2(rightInput)
-
- val leftKeyPositions = leftKeySelector.selectedFields.toIndexArray
- val rightKeyPositions = rightKeySelector.selectedFields.toIndexArray
- val keyTypes = generatedStub.leftInputUDT.getKeySet(leftKeyPositions)
- // global indexes haven't been computed yet...
- 0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), leftKeyPositions(i), rightKeyPositions(i)) }
-
-
- val ret = new JoinOperator(builder) with TwoInputKeyedScalaOperator[LeftIn, RightIn, Out] {
- override val leftKey: FieldSelector = leftKeySelector
- override val rightKey: FieldSelector = rightKeySelector
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFieldsFirst(
- Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
- Annotations.getConstantFieldsSecond(
- Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
- }
- new DataSet[Out](ret) with TwoInputHintable[LeftIn, RightIn, Out] {}
- }
-
- val result = c.Expr[DataSet[Out] with TwoInputHintable[LeftIn, RightIn, Out]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-
- return result
- }
-
- def filter[LeftIn: c.WeakTypeTag, RightIn: c.WeakTypeTag](c: Context { type PrefixType = JoinDataSetWithWhereAndEqual[LeftIn, RightIn] })(fun: c.Expr[(LeftIn, RightIn) => Boolean]): c.Expr[DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val (udtLeftIn, createUdtLeftIn) = slave.mkUdtClass[LeftIn]
- val (udtRightIn, createUdtRightIn) = slave.mkUdtClass[RightIn]
- val (udtOut, createUdtOut) = slave.mkUdtClass[(LeftIn, RightIn)]
-
- val stub: c.Expr[JoinFunctionBase[LeftIn, RightIn, (LeftIn, RightIn)]] = if (fun.actualType <:< weakTypeOf[JoinFunction[LeftIn, RightIn, (LeftIn, RightIn)]])
- reify { fun.splice.asInstanceOf[JoinFunctionBase[LeftIn, RightIn, (LeftIn, RightIn)]] }
- else reify {
- implicit val leftInputUDT: UDT[LeftIn] = c.Expr[UDT[LeftIn]](createUdtLeftIn).splice
- implicit val rightInputUDT: UDT[RightIn] = c.Expr[UDT[RightIn]](createUdtRightIn).splice
- implicit val outputUDT: UDT[(LeftIn, RightIn)] = c.Expr[UDT[(LeftIn, RightIn)]](createUdtOut).splice
- new JoinFunctionBase[LeftIn, RightIn, (LeftIn, RightIn)] {
- override def join(leftRecord: Record, rightRecord: Record, out: Collector[Record]) = {
- val left = leftDeserializer.deserializeRecyclingOn(leftRecord)
- val right = rightDeserializer.deserializeRecyclingOn(rightRecord)
- if (fun.splice.apply(left, right)) {
- val output = (left, right)
- leftRecord.setNumFields(outputLength)
- serializer.serialize(output, leftRecord)
- out.collect(leftRecord)
- }
- }
- }
- }
- val contract = reify {
- val helper: JoinDataSetWithWhereAndEqual[LeftIn, RightIn] = c.prefix.splice
- val leftInput = helper.leftInput.contract
- val rightInput = helper.rightInput.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val leftKeySelector = new FieldSelector(generatedStub.leftInputUDT, helper.leftKey)
- val rightKeySelector = new FieldSelector(generatedStub.rightInputUDT, helper.rightKey)
- val builder = new NoKeyMatchBuilder(generatedStub).input1(leftInput).input2(rightInput)
-
- val leftKeyPositions = leftKeySelector.selectedFields.toIndexArray
- val rightKeyPositions = rightKeySelector.selectedFields.toIndexArray
- val keyTypes = generatedStub.leftInputUDT.getKeySet(leftKeyPositions)
- // global indexes haven't been computed yet...
- 0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), leftKeyPositions(i), rightKeyPositions(i)) }
-
-
- val ret = new JoinOperator(builder) with TwoInputKeyedScalaOperator[LeftIn, RightIn, (LeftIn, RightIn)] {
- override val leftKey: FieldSelector = leftKeySelector
- override val rightKey: FieldSelector = rightKeySelector
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFieldsFirst(
- Util.filterNonForwards(getUDF.getLeftForwardIndexArrayFrom, getUDF.getLeftForwardIndexArrayTo)),
- Annotations.getConstantFieldsSecond(
- Util.filterNonForwards(getUDF.getRightForwardIndexArrayFrom, getUDF.getRightForwardIndexArrayTo)))
- }
- new DataSet[(LeftIn, RightIn)](ret) with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)] {}
- }
-
- val result = c.Expr[DataSet[(LeftIn, RightIn)] with TwoInputHintable[LeftIn, RightIn, (LeftIn, RightIn)]](Block(List(udtLeftIn, udtRightIn, udtOut), contract.tree))
-
- return result
- }
-
-}
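For reference, a minimal usage sketch of the removed join DSL, based only on the signatures deleted above. The `join` entry point returning a JoinDataSet is an assumption (it is not part of this hunk), and the inputs are hypothetical:

    // Hypothetical inputs; ??? stands in for real data sources.
    val persons: DataSet[(Int, String)] = ???
    val cities:  DataSet[(Int, String)] = ???

    // where/isEqualTo pick the key fields via the macro-generated field
    // selectors; map runs once per matching pair (JoinMacros.map above).
    val joined: DataSet[(String, String)] =
      persons.join(cities)
        .where { p => p._1 }
        .isEqualTo { c => c._1 }
        .map { (p, c) => (p._2, c._2) }
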
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala
deleted file mode 100644
index 824eb9d..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/MapOperator.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-
-import scala.reflect.macros.Context
-
-import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.OneInputHintable
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.functions.{MapFunction, MapFunctionBase}
-import org.apache.flink.api.scala.functions.{FlatMapFunction, FilterFunction}
-
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.types.Record
-import org.apache.flink.util.Collector
-
-
-object MapMacros {
-
- def map[In: c.WeakTypeTag, Out: c.WeakTypeTag]
- (c: Context { type PrefixType = DataSet[In] })
- (fun: c.Expr[In => Out]): c.Expr[DataSet[Out]
- with OneInputHintable[In, Out]] = {
-
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
-// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
- val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
- val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
-
- val stub: c.Expr[MapFunctionBase[In, Out]] =
- if (fun.actualType <:< weakTypeOf[MapFunction[In, Out]])
- reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
- else
- reify {
- implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
- implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
- new MapFunctionBase[In, Out] {
- // val userFun = ClosureCleaner.clean(fun.splice)
- // val userFun = fun.splice
- override def map(record: Record, out: Collector[Record]) = {
- val input = deserializer.deserializeRecyclingOn(record)
- val output = fun.splice.apply(input)
-
- record.setNumFields(outputLength)
-
- for (field <- discard)
- record.setNull(field)
-
- serializer.serialize(output, record)
- out.collect(record)
- }
- }
- }
-
- val contract = reify {
- val input = c.prefix.splice.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val builder = MapOperator.builder(generatedStub).input(input)
-
- val contract = new MapOperator(builder) with OneInputScalaOperator[In, Out] {
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFields(
- Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
- }
- val stream = new DataSet[Out](contract) with OneInputHintable[In, Out] {}
- contract.persistHints = { () => stream.applyHints(contract) }
- stream
- }
-
- val result = c.Expr[DataSet[Out]
- with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
-
- result
- }
-
- def flatMap[In: c.WeakTypeTag, Out: c.WeakTypeTag]
- (c: Context { type PrefixType = DataSet[In] })
- (fun: c.Expr[In => Iterator[Out]]): c.Expr[DataSet[Out]
- with OneInputHintable[In, Out]] = {
-
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
-// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
- val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
- val (udtOut, createUdtOut) = slave.mkUdtClass[Out]()
-
- val stub: c.Expr[MapFunctionBase[In, Out]] =
- if (fun.actualType <:< weakTypeOf[FlatMapFunction[In, Out]])
- reify { fun.splice.asInstanceOf[MapFunctionBase[In, Out]] }
- else reify {
- implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
- implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
- new MapFunctionBase[In, Out] {
- override def map(record: Record, out: Collector[Record]) = {
- val input = deserializer.deserializeRecyclingOn(record)
- val output = fun.splice.apply(input)
-
- if (output.nonEmpty) {
-
- record.setNumFields(outputLength)
-
- for (field <- discard)
- record.setNull(field)
-
- for (item <- output) {
-
- serializer.serialize(item, record)
- out.collect(record)
- }
- }
- }
- }
- }
- val contract = reify {
- val input = c.prefix.splice.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val builder = MapOperator.builder(generatedStub).input(input)
-
- val contract = new MapOperator(builder) with OneInputScalaOperator[In, Out] {
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFields(
- Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
- }
- val stream = new DataSet[Out](contract) with OneInputHintable[In, Out] {}
- contract.persistHints = { () => stream.applyHints(contract) }
- stream
- }
-
- val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut),
- contract.tree))
-
- result
- }
-
- def filter[In: c.WeakTypeTag]
- (c: Context { type PrefixType = DataSet[In] })
- (fun: c.Expr[In => Boolean]): c.Expr[DataSet[In]
-
- with OneInputHintable[In, In]] = {
-
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- // val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
- val (udtIn, createUdtIn) = slave.mkUdtClass[In]()
-
- val stub: c.Expr[MapFunctionBase[In, In]] =
- if (fun.actualType <:< weakTypeOf[FilterFunction[In, In]])
- reify { fun.splice.asInstanceOf[MapFunctionBase[In, In]] }
- else
- reify {
- implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
- new MapFunctionBase[In, In] {
- override def map(record: Record, out: Collector[Record]) = {
- val input = deserializer.deserializeRecyclingOn(record)
- if (fun.splice.apply(input)) {
- out.collect(record)
- }
- }
- }
- }
- val contract = reify {
- val input = c.prefix.splice.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val builder = MapOperator.builder(generatedStub).input(input)
-
- val contract = new MapOperator(builder) with OneInputScalaOperator[In, In] {
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFields(
- Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
- }
- val stream = new DataSet[In](contract) with OneInputHintable[In, In] {}
- contract.persistHints = { () =>
- stream.applyHints(contract)
- 0 until generatedStub.udf.getOutputLength foreach { i => stream.markCopied(i, i) }
- }
- stream
- }
-
- val result = c.Expr[DataSet[In]
- with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
-
- result
- }
-}
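Similarly, a sketch of how the deleted map macros were invoked. Their PrefixType is DataSet[In], so map/flatMap/filter are assumed to have been declared on DataSet as macro defs delegating to MapMacros; the input is hypothetical:

    // Hypothetical input.
    val lines: DataSet[String] = ???

    val pairs    = lines map { l => (l, l.length) }             // MapMacros.map
    val words    = lines flatMap { l => l.split(' ').iterator } // MapMacros.flatMap
    val nonEmpty = lines filter { l => l.nonEmpty }             // MapMacros.filter
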
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
deleted file mode 100644
index f25b5a3..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/ReduceOperator.scala
+++ /dev/null
@@ -1,363 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-import scala.language.reflectiveCalls
-import scala.reflect.macros.Context
-
-import java.util.{ Iterator => JIterator }
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.OneInputHintable
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis._
-import org.apache.flink.api.scala.codegen.{MacroContextHolder, Util}
-import org.apache.flink.api.scala.functions.{ReduceFunction, ReduceFunctionBase, CombinableGroupReduceFunction, GroupReduceFunction}
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.types.Record
-import org.apache.flink.types.IntValue
-import org.apache.flink.api.java.record.operators.ReduceOperator
-import org.apache.flink.api.java.record.functions.{ReduceFunction => JReduceFunction}
-
-
-class KeyedDataSet[In](val keySelection: List[Int], val input: DataSet[In]) {
- def reduceGroup[Out](fun: Iterator[In] => Out): DataSet[Out] with OneInputHintable[In, Out] = macro ReduceMacros.reduceGroup[In, Out]
- def combinableReduceGroup(fun: Iterator[In] => In): DataSet[In] with OneInputHintable[In, In] = macro ReduceMacros.combinableReduceGroup[In]
-
- def reduce(fun: (In, In) => In): DataSet[In] with OneInputHintable[In, In] = macro ReduceMacros.reduce[In]
-
- def count() : DataSet[(In, Int)] with OneInputHintable[In, (In, Int)] = macro ReduceMacros.count[In]
-}
-
-object ReduceMacros {
-
- def groupBy[In: c.WeakTypeTag, Key: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })
- (keyFun: c.Expr[In => Key]): c.Expr[KeyedDataSet[In]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val keySelection = slave.getSelector(keyFun)
-
- val helper = reify {
- new KeyedDataSet[In](keySelection.splice, c.prefix.splice)
- }
-
- return helper
- }
-
- def reduce[In: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })
- (fun: c.Expr[(In, In) => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
- import c.universe._
-
- reduceImpl(c)(c.prefix, fun)
- }
-
- def globalReduce[In: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[(In, In) => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
- import c.universe._
-
- reduceImpl(c)(reify { new KeyedDataSet[In](List[Int](), c.prefix.splice) }, fun)
- }
-
- def reduceGroup[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })
- (fun: c.Expr[Iterator[In] => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
- import c.universe._
-
- reduceGroupImpl(c)(c.prefix, fun)
- }
-
- def globalReduceGroup[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })(fun: c.Expr[Iterator[In] => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
- import c.universe._
-
- reduceGroupImpl(c)(reify { new KeyedDataSet[In](List[Int](), c.prefix.splice) }, fun)
- }
-
- def combinableReduceGroup[In: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })
- (fun: c.Expr[Iterator[In] => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
- import c.universe._
-
- combinableReduceGroupImpl(c)(c.prefix, fun)
- }
-
- def combinableGlobalReduceGroup[In: c.WeakTypeTag](c: Context { type PrefixType = DataSet[In] })
- (fun: c.Expr[Iterator[In] => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
- import c.universe._
-
- combinableReduceGroupImpl(c)(reify { new KeyedDataSet[In](List[Int](), c.prefix.splice) }, fun)
- }
-
- def reduceImpl[In: c.WeakTypeTag](c: Context)
- (groupedInput: c.Expr[KeyedDataSet[In]], fun: c.Expr[(In, In) => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
-// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
- val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-
- val stub: c.Expr[ReduceFunctionBase[In, In]] = if (fun.actualType <:< weakTypeOf[ReduceFunction[In]])
- reify { fun.splice.asInstanceOf[ReduceFunctionBase[In, In]] }
- else reify {
- implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-
- new ReduceFunctionBase[In, In] {
- override def combine(records: JIterator[Record], out: Collector[Record]) = {
- reduce(records, out)
- }
-
- override def reduce(records: JIterator[Record], out: Collector[Record]) = {
- if (records.hasNext) {
- val firstRecord = reduceIterator.initialize(records)
- reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)
-
- val output = reduceIterator.reduce(fun.splice)
-
- reduceSerializer.serialize(output, reduceRecord)
- out.collect(reduceRecord)
- }
- }
- }
-
- }
- val contract = reify {
- val helper = groupedInput.splice
- val input = helper.input.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val keySelection = helper.keySelection
- val keySelector = new FieldSelector(generatedStub.inputUDT, keySelection)
-
- val builder = ReduceOperator.builder(generatedStub).input(input)
-
- val keyPositions = keySelector.selectedFields.toIndexArray
- val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
- // global indexes haven't been computed yet...
- 0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-
- val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, In] {
- override val key: FieldSelector = keySelector
- override def getUDF = generatedStub.udf
- override def annotations = Annotations.getCombinable() +: Seq(
- Annotations.getConstantFields(
- Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
- }
- new DataSet[In](ret) with OneInputHintable[In, In] {}
- }
-
- val result = c.Expr[DataSet[In] with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
-
- return result
- }
-
- def reduceGroupImpl[In: c.WeakTypeTag, Out: c.WeakTypeTag](c: Context)
- (groupedInput: c.Expr[KeyedDataSet[In]], fun: c.Expr[Iterator[In] => Out]): c.Expr[DataSet[Out] with OneInputHintable[In, Out]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
-// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
- val (udtIn, createUdtIn) = slave.mkUdtClass[In]
- val (udtOut, createUdtOut) = slave.mkUdtClass[Out]
-
- val stub: c.Expr[ReduceFunctionBase[In, Out]] = if (fun.actualType <:< weakTypeOf[GroupReduceFunction[In, Out]])
- reify { fun.splice.asInstanceOf[ReduceFunctionBase[In, Out]] }
- else reify {
- implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
- implicit val outputUDT: UDT[Out] = c.Expr[UDT[Out]](createUdtOut).splice
-
- new ReduceFunctionBase[In, Out] {
- override def reduce(recordsIterable: JIterator[Record], out: Collector[Record]) = {
- val records: JIterator[Record] = recordsIterable
-
- if (records.hasNext) {
- val firstRecord = reduceIterator.initialize(records)
- reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)
-
- val output = fun.splice.apply(reduceIterator)
-
- reduceSerializer.serialize(output, reduceRecord)
- out.collect(reduceRecord)
- }
- }
- }
- }
- val contract = reify {
- val helper = groupedInput.splice
- val input = helper.input.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val keySelection = helper.keySelection
- val keySelector = new FieldSelector(generatedStub.inputUDT, keySelection)
- val builder = ReduceOperator.builder(generatedStub).input(input)
-
- val keyPositions = keySelector.selectedFields.toIndexArray
- val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
- // global indexes haven't been computed yet...
- 0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-
- val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, Out] {
- override val key: FieldSelector = keySelector
- override def getUDF = generatedStub.udf
- override def annotations = Seq(
- Annotations.getConstantFields(
- Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
- }
- new DataSet[Out](ret) with OneInputHintable[In, Out] {}
- }
-
- val result = c.Expr[DataSet[Out] with OneInputHintable[In, Out]](Block(List(udtIn, udtOut), contract.tree))
-
- return result
- }
-
- def combinableReduceGroupImpl[In: c.WeakTypeTag](c: Context)
- (groupedInput: c.Expr[KeyedDataSet[In]], fun: c.Expr[Iterator[In] => In]): c.Expr[DataSet[In] with OneInputHintable[In, In]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
-// val (paramName, udfBody) = slave.extractOneInputUdf(fun.tree)
-
- val (udtIn, createUdtIn) = slave.mkUdtClass[In]
-
- val stub: c.Expr[ReduceFunctionBase[In, In]] = if (fun.actualType <:< weakTypeOf[CombinableGroupReduceFunction[In, In]])
- reify { fun.splice.asInstanceOf[ReduceFunctionBase[In, In]] }
- else reify {
- implicit val inputUDT: UDT[In] = c.Expr[UDT[In]](createUdtIn).splice
-
- new ReduceFunctionBase[In, In] {
- override def combine(records: JIterator[Record], out: Collector[Record]) = {
- reduce(records, out)
- }
-
- override def reduce(records: JIterator[Record], out: Collector[Record]) = {
- val firstRecord = reduceIterator.initialize(records)
- reduceRecord.copyFrom(firstRecord, reduceForwardFrom, reduceForwardTo)
-
- val output = fun.splice.apply(reduceIterator)
-
- reduceSerializer.serialize(output, reduceRecord)
- out.collect(reduceRecord)
- }
- }
- }
- val contract = reify {
- val helper = groupedInput.splice
- val input = helper.input.contract
- val generatedStub = ClosureCleaner.clean(stub.splice)
- val keySelection = helper.keySelection
- val keySelector = new FieldSelector(generatedStub.inputUDT, keySelection)
- val builder = ReduceOperator.builder(generatedStub).input(input)
-
- val keyPositions = keySelector.selectedFields.toIndexArray
- val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
- // global indexes haven't been computed yet...
- 0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-
- val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, In] {
- override val key: FieldSelector = keySelector
- override def getUDF = generatedStub.udf
- override def annotations = Annotations.getCombinable() +: Seq(
- Annotations.getConstantFields(
- Util.filterNonForwards(getUDF.getForwardIndexArrayFrom, getUDF.getForwardIndexArrayTo)))
- }
- new DataSet[In](ret) with OneInputHintable[In, In] {}
- }
-
- val result = c.Expr[DataSet[In] with OneInputHintable[In, In]](Block(List(udtIn), contract.tree))
-
- return result
- }
-
- def count[In: c.WeakTypeTag](c: Context { type PrefixType = KeyedDataSet[In] })() : c.Expr[DataSet[(In, Int)] with OneInputHintable[In, (In, Int)]] = {
- import c.universe._
-
- val slave = MacroContextHolder.newMacroHelper(c)
-
- val (udtIn, createUdtIn) = slave.mkUdtClass[In]
- val (udtOut, createUdtOut) = slave.mkUdtClass[(In, Int)]
-
- val contract = reify {
- val helper: KeyedDataSet[In] = c.prefix.splice
- val keySelection = helper.keySelection
-
- val generatedStub = new JReduceFunction with Serializable {
- val inputUDT = c.Expr[UDT[In]](createUdtIn).splice
- val outputUDT = c.Expr[UDT[(In, Int)]](createUdtOut).splice
- val keySelector = new FieldSelector(inputUDT, keySelection)
- val udf: UDF1[In, (In, Int)] = new UDF1(inputUDT, outputUDT)
-
- private val reduceRecord = new Record()
- private val pactInt = new IntValue()
-
- private var countPosition: Int = 0;
-
- override def open(config: Configuration) = {
- super.open(config)
- this.countPosition = udf.getOutputLength - 1;
- }
-
- override def reduce(records: JIterator[Record], result: Collector[Record]) : Unit = {
-
- var record : Record = null
- var counter: Int = 0
- while (records.hasNext()) {
- record = records.next()
- val count = if (record.getNumFields() <= countPosition || record.isNull(countPosition)) 1 else record.getField(countPosition, pactInt).getValue()
- counter = counter + count
- }
-
- pactInt.setValue(counter)
- record.setField(countPosition, pactInt)
- result.collect(record)
- }
-
- override def combine(records: JIterator[Record], result: Collector[Record]) : Unit = {
- reduce(records, result)
- }
-
- }
-
- val builder = ReduceOperator.builder(generatedStub).input(helper.input.contract)
-
- val keyPositions = generatedStub.keySelector.selectedFields.toIndexArray
- val keyTypes = generatedStub.inputUDT.getKeySet(keyPositions)
- // global indexes haven't been computed yet...
- 0 until keyTypes.size foreach { i => builder.keyField(keyTypes(i), keyPositions(i)) }
-
- val ret = new ReduceOperator(builder) with OneInputKeyedScalaOperator[In, (In, Int)] {
- override val key: FieldSelector = generatedStub.keySelector
- override def getUDF = generatedStub.udf
- override def annotations = Annotations.getCombinable() +: Seq(Annotations.getConstantFieldsExcept(Array[Int]()))
- }
- new DataSet[(In, Int)](ret) with OneInputHintable[In, (In, Int)] {}
- }
-
- val result = c.Expr[DataSet[(In, Int)] with OneInputHintable[In, (In, Int)]](Block(List(udtIn, udtOut), contract.tree))
-
- return result
- }
-}
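A corresponding sketch for the deleted reduce macros, again assuming they were exposed on DataSet and KeyedDataSet as their PrefixTypes indicate, with hypothetical input:

    // Hypothetical input.
    val sales: DataSet[(String, Int)] = ???

    val byProduct = sales groupBy { s => s._1 }  // ReduceMacros.groupBy -> KeyedDataSet
    val totals    = byProduct reduce { (a, b) => (a._1, a._2 + b._2) }
    val best      = byProduct reduceGroup { it => it.maxBy(_._2) }
    val counted   = byProduct.count()            // DataSet[((String, Int), Int)]
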
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala
deleted file mode 100644
index 5926eef..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/UnionOperator.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.operators
-
-import language.experimental.macros
-
-import org.apache.flink.api.scala.UnionScalaOperator
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.api.scala.analysis.UDF2
-
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.types.Record
-
-object UnionOperator {
-
- def impl[In](firstInput: DataSet[In], secondInput: DataSet[In]): DataSet[In] = {
- val union = new Union[Record](firstInput.contract, secondInput.contract)
- with UnionScalaOperator[In] {
-
- private val inputUDT = firstInput.contract.getUDF.outputUDT
- private val udf: UDF2[In, In, In] = new UDF2(inputUDT, inputUDT, inputUDT)
-
- override def getUDF = udf
- }
- new DataSet(union)
- }
-}
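UnionOperator.impl is a plain method rather than a macro, so a direct call is straightforward; whether the public API exposed it as an infix union on DataSet is not visible in this hunk:

    // Hypothetical inputs of the same element type.
    val first:  DataSet[String] = ???
    val second: DataSet[String] = ???

    val all: DataSet[String] = UnionOperator.impl(first, second)
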
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala
deleted file mode 100644
index 5a20940..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/operators/package.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import scala.language.implicitConversions
-
-import scala.collection.TraversableOnce
-
-package object operators {
-
- implicit def traversableToIterator[T](i: TraversableOnce[T]): Iterator[T] = i.toIterator
- implicit def optionToIterator[T](opt: Option[T]): Iterator[T] = opt.iterator
- implicit def arrayToIterator[T](arr: Array[T]): Iterator[T] = arr.iterator
-}
\ No newline at end of file
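These implicits let a flatMap-style UDF return an Option, Array, or any TraversableOnce where an Iterator[Out] is expected. A minimal illustration:

    import org.apache.flink.api.scala.operators._

    // optionToIterator bridges Option[Int] to the Iterator[Int] shape
    // that the flatMap macros expect:
    val opt: Option[Int] = Some(1)
    val it: Iterator[Int] = opt  // expands to optionToIterator(opt)
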
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
new file mode 100644
index 0000000..405158f
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api
+
+import _root_.scala.reflect.ClassTag
+import language.experimental.macros
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.api.scala.typeutils.TypeUtils
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+package object scala {
+ // We have this here so that we always have generated TypeInformation
+ // instances when using the Scala API
+ implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
+
+ // We need to wrap the Java DataSet because we need the Scala operations
+ private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]) = new DataSet[R](set)
+}
\ No newline at end of file
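With this package object in scope, any TypeInformation context bound is satisfied by expanding the macro at the call site. A small sketch (requiresTypeInfo is a hypothetical helper, not part of the API):

    import org.apache.flink.api.scala._
    import org.apache.flink.types.TypeInformation

    def requiresTypeInfo[T: TypeInformation](): Unit = ()

    // createTypeInformation is materialized here via TypeUtils.createTypeInfo:
    requiresTypeInfo[(Int, String)]()
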
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
new file mode 100644
index 0000000..3d21f05
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
+import org.apache.flink.core.memory.MemorySegment
+import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException}
+;
+
+/**
+ * Comparator for Scala Tuples. Field access works differently than for
+ * our Java Tuples, so we have to treat them differently.
+ */
+class ScalaTupleComparator[T <: Product](
+ keys: Array[Int],
+ scalaComparators: Array[TypeComparator[_]],
+ scalaSerializers: Array[TypeSerializer[_]] )
+ extends TupleComparatorBase[T](keys, scalaComparators, scalaSerializers) {
+
+ private val extractedKeys = new Array[AnyRef](keys.length)
+
+ // We cannot invoke the clone constructor from Scala, so we duplicate manually
+ def duplicate: TypeComparator[T] = {
+ // ensure that the serializers are available
+ instantiateDeserializationUtils()
+ val result = new ScalaTupleComparator[T](keyPositions, comparators, serializers)
+ result.privateDuplicate(this)
+ result
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Comparator Methods
+ // --------------------------------------------------------------------------------------------
+
+ def hash(value: T): Int = {
+ val comparator = comparators(0).asInstanceOf[TypeComparator[Any]]
+ var code: Int = comparator.hash(value.productElement(keyPositions(0)))
+ for (i <- 1 until keyPositions.length) {
+ try {
+ code *= TupleComparatorBase.HASH_SALT(i & 0x1F)
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ code += comparator.hash(value.productElement(keyPositions(i)))
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ }
+ code
+ }
+
+ def setReference(toCompare: T) {
+ for (i <- 0 until keyPositions.length) {
+ try {
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ comparator.setReference(toCompare.productElement(keyPositions(i)))
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ }
+ }
+
+ def equalToReference(candidate: T): Boolean = {
+ for (i <- 0 until keyPositions.length) {
+ try {
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ if (!comparator.equalToReference(candidate.productElement(keyPositions(i)))) {
+ return false
+ }
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ }
+ true
+ }
+
+ def compare(first: T, second: T): Int = {
+ for (i <- 0 until keyPositions.length) {
+ try {
+ val keyPos: Int = keyPositions(i)
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ val cmp: Int = comparator.compare(
+ first.productElement(keyPos),
+ second.productElement(keyPos))
+ if (cmp != 0) {
+ return cmp
+ }
+ } catch {
+ case npex: NullPointerException =>
+ throw new NullKeyFieldException(keyPositions(i))
+ case iobex: IndexOutOfBoundsException =>
+ throw new KeyFieldOutOfBoundsException(keyPositions(i))
+ }
+ }
+ 0
+ }
+
+ def putNormalizedKey(value: T, target: MemorySegment, offsetParam: Int, numBytesParam: Int) {
+ var numBytes = numBytesParam
+ var offset = offsetParam
+ var i: Int = 0
+ try {
+ while (i < numLeadingNormalizableKeys && numBytes > 0) {
+ {
+ var len: Int = normalizedKeyLengths(i)
+ len = if (numBytes >= len) len else numBytes
+ val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
+ comparator.putNormalizedKey(value.productElement(keyPositions(i)), target, offset, len)
+ numBytes -= len
+ offset += len
+ }
+ i += 1
+ }
+ } catch {
+ case npex: NullPointerException => throw new NullKeyFieldException(keyPositions(i))
+ }
+ }
+
+ def extractKeys(value: T) = {
+ for (i <- 0 until keyPositions.length ) {
+ extractedKeys(i) = value.productElement(keyPositions(i)).asInstanceOf[AnyRef]
+ }
+ extractedKeys
+ }
+}
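The hash method above combines per-field hashes with the salted multiply-add scheme from the Java TupleComparatorBase. Isolated from the comparator machinery, the combining step looks like this (a standalone sketch, with the salt table passed in as a function):

    // Standalone sketch of the combining scheme used in hash() above.
    def combineHashes(fieldHashes: Array[Int], salt: Int => Int): Int = {
      var code = fieldHashes(0)
      for (i <- 1 until fieldHashes.length) {
        code *= salt(i & 0x1f)  // per-position salt, cf. TupleComparatorBase.HASH_SALT
        code += fieldHashes(i)
      }
      code
    }
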
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
new file mode 100644
index 0000000..a63bb35
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+;
+
+/**
+ * Serializer for Scala Tuples. Creation and access work differently than for
+ * our Java Tuples, so we have to treat them differently.
+ */
+abstract class ScalaTupleSerializer[T <: Product](
+ tupleClass: Class[T],
+ scalaFieldSerializers: Array[TypeSerializer[_]])
+ extends TupleSerializerBase[T](tupleClass, scalaFieldSerializers) {
+
+ def createInstance: T = {
+ val fields: Array[AnyRef] = new Array(arity)
+ for (i <- 0 until arity) {
+ fields(i) = fieldSerializers(i).createInstance()
+ }
+ createInstance(fields)
+ }
+
+ def copy(from: T, reuse: T): T = {
+ val fields: Array[AnyRef] = new Array(arity)
+ for (i <- 0 until arity) {
+ fields(i) = from.productElement(i).asInstanceOf[AnyRef]
+ }
+ createInstance(fields)
+ }
+
+ def serialize(value: T, target: DataOutputView) {
+ for (i <- 0 until arity) {
+ val serializer = fieldSerializers(i).asInstanceOf[TypeSerializer[Any]]
+ serializer.serialize(value.productElement(i), target)
+ }
+ }
+
+ def deserialize(reuse: T, source: DataInputView): T = {
+ val fields: Array[AnyRef] = new Array(arity)
+ for (i <- 0 until arity) {
+ val field = reuse.productElement(i).asInstanceOf[AnyRef]
+ fields(i) = fieldSerializers(i).deserialize(field, source)
+ }
+ createInstance(fields)
+ }
+}
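ScalaTupleSerializer stays abstract because TupleSerializerBase leaves the field-array factory createInstance(fields) to the concrete arity. A hypothetical Tuple2 instantiation, for illustration only:

    import org.apache.flink.api.common.typeutils.TypeSerializer

    // Hypothetical concrete serializer for (A, B); only the factory
    // method taking the deserialized fields remains to be implemented:
    class Tuple2CaseSerializer[A, B](
        clazz: Class[(A, B)],
        fieldSers: Array[TypeSerializer[_]])
      extends ScalaTupleSerializer[(A, B)](clazz, fieldSers) {

      def createInstance(fields: Array[AnyRef]): (A, B) =
        (fields(0).asInstanceOf[A], fields(1).asInstanceOf[B])
    }
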
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
new file mode 100644
index 0000000..069e03b
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, AtomicType, TupleTypeInfoBase}
+import org.apache.flink.types.TypeInformation
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+
+/**
+ * TypeInformation for Scala Tuples. Creation and access work differently than for
+ * our Java Tuples, so we have to treat them differently.
+ */
+abstract class ScalaTupleTypeInfo[T <: Product](
+ tupleClass: Class[T],
+ fieldTypes: Seq[TypeInformation[_]])
+ extends TupleTypeInfoBase[T](tupleClass, fieldTypes: _*) {
+
+ def createComparator(logicalKeyFields: Array[Int], orders: Array[Boolean]): TypeComparator[T] = {
+ // sanity checks
+ if (logicalKeyFields == null || orders == null
+ || logicalKeyFields.length != orders.length || logicalKeyFields.length > types.length) {
+ throw new IllegalArgumentException
+ }
+
+ // For now, no special handling of the leading key field as in the Java TupleComparator
+
+ // --- general case ---
+ var maxKey: Int = -1
+
+ for (key <- logicalKeyFields) {
+ maxKey = Math.max(key, maxKey)
+ }
+
+ if (maxKey >= types.length) {
+ throw new IllegalArgumentException("The key position " + maxKey + " is out of range for " +
+ "Tuple" + types.length)
+ }
+
+ // create the comparators for the individual fields
+ val fieldComparators: Array[TypeComparator[_]] = new Array(logicalKeyFields.length)
+
+ for (i <- 0 until logicalKeyFields.length) {
+ val keyPos = logicalKeyFields(i)
+ if (types(keyPos).isKeyType && types(keyPos).isInstanceOf[AtomicType[_]]) {
+ fieldComparators(i) = types(keyPos).asInstanceOf[AtomicType[_]].createComparator(orders(i))
+ } else {
+ throw new IllegalArgumentException(
+ "The field at position " + i + " (" + types(keyPos) + ") is not an atomic key type.")
+ }
+ }
+
+ // create the serializers for the prefix up to highest key position
+ val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](maxKey + 1)
+
+ for (i <- 0 to maxKey) {
+ fieldSerializers(i) = types(i).createSerializer
+ }
+
+ new ScalaTupleComparator[T](logicalKeyFields, fieldComparators, fieldSerializers)
+ }
+
+ override def toString = "Scala " + super.toString
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
new file mode 100644
index 0000000..5901e48
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TypeUtils.scala
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import scala.reflect.macros.Context
+import org.apache.flink.api.scala.codegen.MacroContextHolder
+import org.apache.flink.types.TypeInformation
+
+private[flink] object TypeUtils {
+
+ def createTypeInfo[T: c.WeakTypeTag](c: Context): c.Expr[TypeInformation[T]] = {
+ val slave = MacroContextHolder.newMacroHelper(c)
+ slave.mkTypeInfo[T]
+ }
+}
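
The macro above is presumably surfaced to user code through an implicit macro
def; a minimal sketch of that wiring (the exact enclosing package object in
this commit is an assumption):

    import language.experimental.macros
    import org.apache.flink.types.TypeInformation

    // Lets the compiler materialize a TypeInformation[T] wherever one is
    // required implicitly, by expanding TypeUtils.createTypeInfo at the
    // call site.
    implicit def createTypeInformation[T]: TypeInformation[T] =
      macro TypeUtils.createTypeInfo[T]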
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
new file mode 100644
index 0000000..f8cbb62
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.operators.Keys
+import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
+import org.apache.flink.types.TypeInformation
+
+/**
+ * This is for dealing with operations that require keys and use a fluent interface (join and
+ * coGroup for now). For each operation we need a subclass that implements `finish` to
+ * create the actual operation using the provided keys.
+ *
+ * This way, we have a central point where all the key handling happens, and we don't need to
+ * change the specific operations if the supported key types change.
+ *
+ * We use the type parameter `R` to specify the type of the resulting operation. For join
+ * this would be `JoinDataSet[T, O]` and for coGroup it would be `CoGroupDataSet[T, O]`. This
+ * way the user gets the correct type for the finished operation.
+ *
+ * @tparam T Type of the left input [[DataSet]].
+ * @tparam O Type of the right input [[DataSet]].
+ * @tparam R The type of the resulting Operation.
+ */
+private[flink] abstract class UnfinishedKeyPairOperation[T, O, R](
+ private[flink] val leftSet: JavaDataSet[T],
+ private[flink] val rightSet: JavaDataSet[O]) {
+
+ private[flink] def finish(leftKey: Keys[T], rightKey: Keys[O]): R
+
+ /**
+ * Specify the key fields for the left side of the key-based operation. This returns
+ * a [[HalfUnfinishedKeyPairOperation]] on which `equalTo` must be called to specify the
+ * key for the right side. The result after specifying the right side key is the finished
+ * operation.
+ *
+ * This only works on a Tuple [[DataSet]].
+ */
+ def where(leftKeys: Int*) = {
+ val leftKey = new FieldPositionKeys[T](leftKeys.toArray, leftSet.getType)
+ new HalfUnfinishedKeyPairOperation[T, O, R](this, leftKey)
+ }
+
+ /**
+ * Specify the key selector function for the left side of the key-based operation. This returns
+ * a [[HalfUnfinishedKeyPairOperation]] on which `equalTo` must be called to specify the
+ * key for the right side. The result after specifying the right side key is the finished
+ * operation.
+ */
+ def where[K: TypeInformation](fun: (T) => K) = {
+ val keyType = implicitly[TypeInformation[K]]
+ val keyExtractor = new KeySelector[T, K] {
+ def getKey(in: T) = fun(in)
+ }
+ val leftKey = new Keys.SelectorFunctionKeys[T, K](keyExtractor, leftSet.getType, keyType)
+ new HalfUnfinishedKeyPairOperation[T, O, R](this, leftKey)
+ }
+}
+
+private[flink] class HalfUnfinishedKeyPairOperation[T, O, R](
+ unfinished: UnfinishedKeyPairOperation[T, O, R], leftKey: Keys[T]) {
+
+ /**
+ * Specify the key fields for the right side of the key-based operation. This returns
+ * the finished operation.
+ *
+ * This only works on a Tuple [[DataSet]].
+ */
+ def equalTo(rightKeys: Int*): R = {
+ val rightKey = new FieldPositionKeys[O](rightKeys.toArray, unfinished.rightSet.getType)
+ if (!leftKey.areCompatibale(rightKey)) {
+ throw new InvalidProgramException("The types of the key fields do not match. Left: " +
+ leftKey + " Right: " + rightKey)
+ }
+ unfinished.finish(leftKey, rightKey)
+ }
+
+ /**
+ * Specify the key selector function for the right side of the key-based operation. This returns
+ * the finished operation.
+ */
+ def equalTo[K: TypeInformation](fun: (O) => K) = {
+ val keyType = implicitly[TypeInformation[K]]
+ val keyExtractor = new KeySelector[O, K] {
+ def getKey(in: O) = fun(in)
+ }
+ val rightKey =
+ new Keys.SelectorFunctionKeys[O, K](keyExtractor, unfinished.rightSet.getType, keyType)
+ if (!leftKey.areCompatibale(rightKey)) {
+ throw new InvalidProgramException("The types of the key fields do not match. Left: " +
+ leftKey + " Right: " + rightKey)
+ }
+ unfinished.finish(leftKey, rightKey)
+ }
+}
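
To make the two-step flow concrete, a hedged sketch of the resulting fluent
chain from the user's side (assuming the rewritten Scala API mirrors the Java
ExecutionEnvironment/fromElements entry points; only `where`/`equalTo` come
from this file):

    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val persons = env.fromElements((1, "alice"), (2, "bob"))  // (id, name)
    val orders  = env.fromElements((1, 10.0), (2, 20.0))      // (personId, amount)

    // where(...) yields a HalfUnfinishedKeyPairOperation; equalTo(...) checks
    // key compatibility and calls finish() on the concrete operation.
    val byPosition = persons.join(orders).where(0).equalTo(0)

    // The key-selector overloads follow the same two-step pattern.
    val bySelector = persons.join(orders).where(_._1).equalTo(_._1)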
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala
deleted file mode 100644
index 19feb3a..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/CollectionDataSourceTest.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.java.record.operators.{CollectionDataSource => JCollectionDataSource}
-import org.apache.flink.types.{DoubleValue, Record}
-import org.scalatest.junit.AssertionsForJUnit
-import org.junit.Assert._
-import org.junit.Test
-
-
-class CollectionDataSourceTest extends AssertionsForJUnit {
- @Test def testScalaCollectionInput() {
- val expected = List(1.0, 2.0, 3.0)
- val datasource = org.apache.flink.api.scala.CollectionDataSource(expected)
-
- val javaCDS = datasource.contract.asInstanceOf[JCollectionDataSource]
-
- val inputFormat = javaCDS.getFormatWrapper.getUserCodeObject()
- val splits = inputFormat.createInputSplits(1)
- inputFormat.open(splits(0))
-
- val record = new Record()
- var result = List[Double]()
-
- while(!inputFormat.reachedEnd()){
- inputFormat.nextRecord(record)
- assertTrue(record.getNumFields == 1)
- val value = record.getField[DoubleValue](0, classOf[DoubleValue])
- result = value.getValue :: result
- }
-
- assertEquals(expected, result.reverse)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index b4927b4..d5b0d24 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -1,192 +1,192 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.junit.Test
-import org.apache.flink.api.common.InvalidProgramException
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.operators._
-import org.scalatest.junit.AssertionsForJUnit
-
-// Verify that the sanity checking in delta iterations works. We just
-// have a dummy job that is not meant to be executed. Only verify that
-// the join/coGroup inside the iteration is checked.
-class DeltaIterationSanityCheckTest extends Serializable {
-
- @Test
- def testCorrectJoinWithSolution1 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test
- def testCorrectJoinWithSolution2 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectJoinWithSolution1 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectJoinWithSolution2 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectJoinWithSolution3 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test
- def testCorrectCoGroupWithSolution1 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test
- def testCorrectCoGroupWithSolution2 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectCoGroupWithSolution1 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectCoGroupWithSolution2 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectCoGroupWithSolution3 {
- val solutionInput = CollectionDataSource(Array((1, "1")))
- val worksetInput = CollectionDataSource(Array((2, "2")))
-
- def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
- val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
- (result, ws)
- }
- val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
-
- val output = iteration.write("/dummy", CsvOutputFormat())
-
- val plan = new ScalaPlan(Seq(output))
- }
-}
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.api.scala
+//
+//import org.junit.Test
+//import org.apache.flink.api.common.InvalidProgramException
+//
+//import org.apache.flink.api.scala._
+//import org.apache.flink.api.scala.operators._
+//import org.scalatest.junit.AssertionsForJUnit
+//
+//// Verify that the sanity checking in delta iterations works. We just
+//// have a dummy job that is not meant to be executed. Only verify that
+//// the join/coGroup inside the iteration is checked.
+//class DeltaIterationSanityCheckTest extends Serializable {
+//
+// @Test
+// def testCorrectJoinWithSolution1 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test
+// def testCorrectJoinWithSolution2 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test(expected = classOf[InvalidProgramException])
+// def testIncorrectJoinWithSolution1 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test(expected = classOf[InvalidProgramException])
+// def testIncorrectJoinWithSolution2 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test(expected = classOf[InvalidProgramException])
+// def testIncorrectJoinWithSolution3 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test
+// def testCorrectCoGroupWithSolution1 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test
+// def testCorrectCoGroupWithSolution2 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test(expected = classOf[InvalidProgramException])
+// def testIncorrectCoGroupWithSolution1 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test(expected = classOf[InvalidProgramException])
+// def testIncorrectCoGroupWithSolution2 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//
+// @Test(expected = classOf[InvalidProgramException])
+// def testIncorrectCoGroupWithSolution3 {
+// val solutionInput = CollectionDataSource(Array((1, "1")))
+// val worksetInput = CollectionDataSource(Array((2, "2")))
+//
+// def step(s: DataSetOLD[(Int, String)], ws: DataSetOLD[(Int, String)]) = {
+// val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+// (result, ws)
+// }
+// val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+//
+// val output = iteration.write("/dummy", CsvOutputFormat())
+//
+// val plan = new ScalaPlan(Seq(output))
+// }
+//}