Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:49 UTC
[07/60] Rewrite the Scala API as (somewhat) thin Layer on Java API
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala
deleted file mode 100644
index d201585..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import FieldSet.toSeq
-
-/**
- * Instances of this class are typically created by the field selector macros fieldSelectorImpl
- * and keySelectorImpl in {@link FieldSelectorMacros}.
- *
- * In addition to the language restrictions applied to the lambda expression, the selected fields
- * must also be top-level. Nested fields (such as a list element or an inner instance of a
- * recursive type) are not allowed.
- *
- * @param selection The selected fields
- */
-class FieldSelector(udt: UDT[_], selection: List[Int]) extends Serializable {
-
- val inputFields = FieldSet.newInputSet(udt.numFields)
- val selectionIndices = udt.getSelectionIndices(selection)
- val selectedFields = inputFields.select(selectionIndices)
-
- for (field <- inputFields.diff(selectedFields))
- field.isUsed = false
-
- def copy() = new FieldSelector(udt, selection)
-}
\ No newline at end of file
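
[The deleted FieldSelector above prunes every input field that is not part of
the key selection. A minimal self-contained sketch of that pruning logic,
using simplified stand-in types rather than the real UDT/FieldSet classes
(SimpleField and SimpleSelector are hypothetical names for illustration):]

    // Stand-in for the deleted InputField: position plus a mutable used flag.
    case class SimpleField(localPos: Int, var isUsed: Boolean = true)

    class SimpleSelector(numFields: Int, selection: List[Int]) {
      val inputFields: Seq[SimpleField] = (0 until numFields).map(SimpleField(_))
      val selectedFields: Seq[SimpleField] = selection.map(pos => inputFields(pos))

      // Everything outside the selection is marked unused, mirroring the
      // loop over inputFields.diff(selectedFields) in the deleted class.
      for (field <- inputFields.diff(selectedFields))
        field.isUsed = false
    }

    object SimpleSelectorDemo extends App {
      val sel = new SimpleSelector(4, List(1, 3))
      sel.inputFields.foreach(f => println(s"${f.localPos} -> ${f.isUsed}"))
      // prints: 0 -> false, 1 -> true, 2 -> false, 3 -> true
    }
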
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala
deleted file mode 100644
index 80cd455..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-abstract sealed class Field extends Serializable {
- val localPos: Int
- val globalPos: GlobalPos
- var isUsed: Boolean = true
-}
-
-case class InputField(val localPos: Int, val globalPos: GlobalPos = new GlobalPos) extends Field
-case class OutputField(val localPos: Int, val globalPos: GlobalPos = new GlobalPos) extends Field
-
-class FieldSet[+FieldType <: Field] private (private val fields: Seq[FieldType]) extends Serializable {
-
- private var globalized: Boolean = false
- def isGlobalized = globalized
-
- def setGlobalized(): Unit = {
- assert(!globalized, "Field set has already been globalized")
- globalized = true
- }
-
- def apply(localPos: Int): FieldType = {
- fields.find(_.localPos == localPos).get
- }
-
- def select(selection: Seq[Int]): FieldSet[FieldType] = {
- val outer = this
- new FieldSet[FieldType](selection map apply) {
- override def setGlobalized() = outer.setGlobalized()
- override def isGlobalized = outer.isGlobalized
- }
- }
-
- def toSerializerIndexArray: Array[Int] = fields map {
- case field if field.isUsed => field.localPos
-// case field if field.isUsed => field.globalPos.getValue
- case _ => -1
- } toArray
-
- def toIndexSet: Set[Int] = fields.map(_.localPos).toSet
-// def toIndexSet: Set[Int] = fields.filter(_.isUsed).map(_.globalPos.getValue).toSet
-
- def toIndexArray: Array[Int] = fields.map { _.localPos }.toArray
-
-// def mapToArray[T: ClassTag](fun: FieldType => T): Array[T] = {
-// (fields map fun) toArray
-// }
-}
-
-object FieldSet {
-
- def newInputSet(numFields: Int): FieldSet[InputField] = new FieldSet((0 until numFields) map { InputField(_) })
- def newOutputSet(numFields: Int): FieldSet[OutputField] = new FieldSet((0 until numFields) map { OutputField(_) })
-
- def newInputSet[T](udt: UDT[T]): FieldSet[InputField] = newInputSet(udt.numFields)
- def newOutputSet[T](udt: UDT[T]): FieldSet[OutputField] = newOutputSet(udt.numFields)
-
- implicit def toSeq[FieldType <: Field](fieldSet: FieldSet[FieldType]): Seq[FieldType] = fieldSet.fields
-}
-
-class GlobalPos extends Serializable {
-
- private var pos: Either[Int, GlobalPos] = null
-
- def getValue: Int = pos match {
- case null => -1
- case Left(index) => index
- case Right(target) => target.getValue
- }
-
- def getIndex: Option[Int] = pos match {
- case null | Right(_) => None
- case Left(index) => Some(index)
- }
-
- def getReference: Option[GlobalPos] = pos match {
- case null | Left(_) => None
- case Right(target) => Some(target)
- }
-
- def resolve: GlobalPos = pos match {
- case null => this
- case Left(_) => this
- case Right(target) => target.resolve
- }
-
- def isUnknown = pos == null
- def isIndex = (pos != null) && pos.isLeft
- def isReference = (pos != null) && pos.isRight
-
- def setIndex(index: Int) = {
- assert(pos == null || pos.isLeft, "Cannot convert a position reference to an index")
- pos = Left(index)
- }
-
- def setReference(target: GlobalPos) = {
-// assert(pos == null, "Cannot overwrite a known position with a reference")
- pos = Right(target)
- }
-}
-
-object GlobalPos {
-
- object Unknown {
- def unapply(globalPos: GlobalPos): Boolean = globalPos.isUnknown
- }
-
- object Index {
- def unapply(globalPos: GlobalPos): Option[Int] = globalPos.getIndex
- }
-
- object Reference {
- def unapply(globalPos: GlobalPos): Option[GlobalPos] = globalPos.getReference
- }
-}
\ No newline at end of file
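
[The GlobalPos class above is essentially a one-field union-find node: a
position is either a concrete index, a reference to another GlobalPos, or
unknown, and getValue chases the reference chain. A trimmed-down,
self-contained illustration of that chain resolution (a hypothetical demo,
not part of the commit):]

    object GlobalPosChainDemo extends App {
      // Reduced GlobalPos: only the getValue chain-chasing behavior.
      class Pos {
        var pos: Either[Int, Pos] = null
        def getValue: Int = pos match {
          case null          => -1          // position still unknown
          case Left(index)   => index       // concrete index
          case Right(target) => target.getValue // follow the reference
        }
      }

      val a = new Pos; val b = new Pos; val c = new Pos
      a.pos = Left(7)    // a holds a concrete index
      b.pos = Right(a)   // b is an alias of a
      c.pos = Right(b)   // chain: c -> b -> a

      println(c.getValue)          // 7: the whole chain resolves to a's index
      println((new Pos).getValue)  // -1: unknown position
    }
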
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala
deleted file mode 100644
index 609d41a..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import java.util.{List => JList}
-
-import scala.Some
-
-import org.apache.flink.api.scala._
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.record.operators._
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration, GenericDataSourceBase, MapOperatorBase}
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.types.Record
-
-
-class GlobalSchemaGenerator {
-
- def initGlobalSchema(sinks: Seq[Operator[Record] with ScalaOperator[_, _]]): Unit = {
- // don't do anything, we don't need global positions if we don't do reordering of operators
- // FieldSet.toSerializerIndexArray returns local positions and ignores global positions
- // sinks.foldLeft(0) { (freePos, contract) => globalizeContract(contract, Seq(), Map(), None, freePos) }
- }
-
- /**
- * Computes disjoint write sets for a contract and its inputs.
- *
- * @param contract The contract to globalize
- * @param parentInputs Input fields which should be bound to the contract's outputs
- * @param proxies Provides contracts for iteration placeholders
- * @param fixedOutputs Specifies required positions for the contract's output fields, or None to allocate new positions
- * @param freePos The current first available position in the global schema
- * @return The new first available position in the global schema
- */
- private def globalizeContract(contract: Operator[Record], parentInputs: Seq[FieldSet[InputField]], proxies: Map[Operator[Record], Operator[Record] with ScalaOperator[_, _]], fixedOutputs: Option[FieldSet[Field]], freePos: Int): Int = {
-
- val contract4s = proxies.getOrElse(contract, contract.asInstanceOf[Operator[Record] with ScalaOperator[_, _]])
-
- parentInputs.foreach(contract4s.getUDF.attachOutputsToInputs)
-
- contract4s.getUDF.outputFields.isGlobalized match {
-
- case true => freePos
-
- case false => {
-
- val freePos1 = globalizeContract(contract4s, proxies, fixedOutputs, freePos)
-
- contract4s.persistConfiguration(None)
-
- freePos1
- }
- }
- }
-
- private def globalizeContract(contract: Operator[_] with ScalaOperator[_, _], proxies: Map[Operator[Record], Operator[Record] with ScalaOperator[_, _]], fixedOutputs: Option[FieldSet[Field]], freePos: Int): Int = {
-
- contract match {
-
- case contract : FileDataSink with ScalaOutputOperator[_] => {
- contract.getUDF.outputFields.setGlobalized()
- globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.asInstanceOf[UDF1[_,_]].inputFields), proxies, None, freePos)
- }
-
- case contract: GenericDataSourceBase[_, _] with ScalaOperator[_, _] => {
- contract.getUDF.setOutputGlobalIndexes(freePos, fixedOutputs)
- }
-
- case contract : BulkIteration[_] with BulkIterationScalaOperator[_] => {
- val s0 = contract.getInput().asInstanceOf[Operator[Record]]
-
- val s0contract = proxies.getOrElse(s0, s0.asInstanceOf[Operator[Record] with ScalaOperator[_, Record]])
- val newProxies = proxies + (contract.getPartialSolution().asInstanceOf[Operator[Record]] -> s0contract)
-
- val freePos1 = globalizeContract(s0, Seq(), proxies, fixedOutputs, freePos)
- val freePos2 = globalizeContract(contract.getNextPartialSolution().asInstanceOf[Operator[Record]], Seq(), newProxies, Some(s0contract.getUDF.outputFields), freePos1)
- val freePos3 = Option(contract.getTerminationCriterion().asInstanceOf[Operator[Record]]) map { globalizeContract(_, Seq(), newProxies, None, freePos2) } getOrElse freePos2
-
- contract.getUDF.assignOutputGlobalIndexes(s0contract.getUDF.outputFields)
-
- freePos3
- }
-
- case contract : DeltaIteration[_, _] with DeltaIterationScalaOperator[_] => {
-// case contract @ WorksetIterate4sContract(s0, ws0, deltaS, newWS, placeholderS, placeholderWS) => {
- val s0 = contract.getInitialSolutionSet().asInstanceOf[Operator[Record]]
- val ws0 = contract.getInitialWorkset().asInstanceOf[Operator[Record]]
- val deltaS = contract.getSolutionSetDelta.asInstanceOf[Operator[Record]]
- val newWS = contract.getNextWorkset.asInstanceOf[Operator[Record]]
-
- val s0contract = proxies.getOrElse(s0, s0.asInstanceOf[Operator[Record] with ScalaOperator[_, _]])
- val ws0contract = proxies.getOrElse(ws0, ws0.asInstanceOf[Operator[Record] with ScalaOperator[_, _]])
- val newProxies = proxies + (contract.getSolutionSetDelta.asInstanceOf[Operator[Record]] -> s0contract) + (contract.getNextWorkset.asInstanceOf[Operator[Record]] -> ws0contract)
-
- val freePos1 = globalizeContract(s0, Seq(contract.key.inputFields), proxies, fixedOutputs, freePos)
- val freePos2 = globalizeContract(ws0, Seq(), proxies, None, freePos1)
- val freePos3 = globalizeContract(deltaS, Seq(), newProxies, Some(s0contract.getUDF.outputFields), freePos2)
- val freePos4 = globalizeContract(newWS, Seq(), newProxies, Some(ws0contract.getUDF.outputFields), freePos3)
-
- contract.getUDF.assignOutputGlobalIndexes(s0contract.getUDF.outputFields)
-
- freePos4
- }
-
- case contract : CoGroupOperator with TwoInputKeyedScalaOperator[_, _, _] => {
-
- val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields, contract.leftKey.inputFields), proxies, None, freePos)
- val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields, contract.rightKey.inputFields), proxies, None, freePos1)
-
- contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
- }
-
- case contract: CrossOperator with TwoInputScalaOperator[_, _, _] => {
-
- val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields), proxies, None, freePos)
- val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields), proxies, None, freePos1)
-
- contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
- }
-
- case contract : JoinOperator with TwoInputKeyedScalaOperator[_, _, _] => {
-
- val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields, contract.leftKey.inputFields), proxies, None, freePos)
- val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields, contract.rightKey.inputFields), proxies, None, freePos1)
-
- contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
- }
-
- case contract : MapOperatorBase[_, _, _] with OneInputScalaOperator[_, _] => {
-
- val freePos1 = globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.inputFields), proxies, None, freePos)
-
- contract.getUDF.setOutputGlobalIndexes(freePos1, fixedOutputs)
- }
-
- case contract : ReduceOperator with OneInputKeyedScalaOperator[_, _] => {
-
- val freePos1 = globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.inputFields, contract.key.inputFields), proxies, None, freePos)
-
- contract.getUDF.setOutputGlobalIndexes(freePos1, fixedOutputs)
- }
-
- // for key-less (global) reducers
- case contract : ReduceOperator with OneInputScalaOperator[_, _] => {
-
- val freePos1 = globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.inputFields), proxies, None, freePos)
-
- contract.getUDF.setOutputGlobalIndexes(freePos1, fixedOutputs)
- }
-
- case contract : Union[_] with UnionScalaOperator[_] => {
-
- val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields), proxies, fixedOutputs, freePos)
- val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields), proxies, fixedOutputs, freePos1)
-
- contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
- }
- }
- }
-}
\ No newline at end of file
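
[The globalizeContract methods above thread a "first free position"
accumulator through a post-order traversal of the operator DAG: each operator
first globalizes its inputs, then allocates its own output positions starting
where the inputs left off. A hedged sketch of that threading pattern on a toy
node type (Node, Source, Op, and globalize are illustrative names, not the
real API):]

    import scala.collection.mutable

    object GlobalizeDemo extends App {
      sealed trait Node { def inputs: Seq[Node]; def numOutputs: Int }
      case class Source(numOutputs: Int) extends Node { val inputs = Seq.empty[Node] }
      case class Op(inputs: Seq[Node], numOutputs: Int) extends Node

      // Post-order: inputs claim their global positions first, then this
      // node allocates outputs at the first position its inputs left free.
      def globalize(node: Node, freePos: Int, assigned: mutable.Map[Node, Range]): Int =
        if (assigned.contains(node)) freePos  // shared input: already globalized
        else {
          val afterInputs =
            node.inputs.foldLeft(freePos)((pos, in) => globalize(in, pos, assigned))
          assigned(node) = afterInputs until (afterInputs + node.numOutputs)
          afterInputs + node.numOutputs       // new first free position
        }

      val src = Source(2)
      val map = Op(Seq(src), 1)
      val assigned = mutable.Map[Node, Range]()
      globalize(map, 0, assigned)
      println(assigned)  // source gets positions 0..1, the map output gets 2
    }
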
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala
deleted file mode 100644
index 2d758ce..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import org.apache.commons.logging.{LogFactory, Log}
-
-import scala.collection.JavaConversions.collectionAsScalaIterable
-import scala.Array.canBuildFrom
-
-import org.apache.flink.api.common.Plan
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.DualInputOperator
-import org.apache.flink.api.common.operators.SingleInputOperator
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration}
-import org.apache.flink.api.java.record.operators.GenericDataSink
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration}
-import org.apache.flink.api.common.operators.base.{DeltaIterationBase => DeltaIteration}
-
-import Extractors.DataSourceNode
-import Extractors.DataSinkNode
-import Extractors.DeltaIterationNode
-import Extractors.JoinNode
-import Extractors.MapNode
-import Extractors.ReduceNode
-import Extractors.UnionNode
-
-object GlobalSchemaPrinter {
-
- import Extractors._
-
- private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaGenerator])
-
- def printSchema(plan: Plan): Unit = {
-
- LOG.debug("### " + plan.getJobName + " ###")
- plan.getDataSinks.foldLeft(Set[Operator[_]]())(printSchema)
- LOG.debug("####" + ("#" * plan.getJobName.length) + "####")
- }
-
- private def printSchema(visited: Set[Operator[_]], node: Operator[_]): Set[Operator[_]] = {
-
- visited.contains(node) match {
-
- case true => visited
-
- case false => {
-
- val children = node match {
- case bi: BulkIteration[_] => List(bi.getInput()) :+ bi.getNextPartialSolution()
- case wi: DeltaIteration[_, _] => List(wi.getInitialSolutionSet()) :+ wi.getInitialWorkset() :+ wi.getSolutionSetDelta() :+ wi.getNextWorkset()
- case si : SingleInputOperator[_, _, _] => List(si.getInput())
- case di : DualInputOperator[_, _, _, _] => List(di.getFirstInput()) :+ di.getSecondInput()
- case gds : GenericDataSink => List(gds.getInput())
- case _ => List()
- }
- val newVisited = children.foldLeft(visited + node)(printSchema)
-
- node match {
-
- case _ : BulkIteration.PartialSolutionPlaceHolder[_] =>
- case _ : DeltaIteration.SolutionSetPlaceHolder[_] =>
- case _ : DeltaIteration.WorksetPlaceHolder[_] =>
-
- case DataSinkNode(udf, input) => {
- printInfo(node, "Sink",
- Seq(),
- Seq(("", udf.inputFields)),
- Seq(("", udf.getForwardIndexArrayFrom)),
- Seq(("", udf.getDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case DataSourceNode(udf) => {
- printInfo(node, "Source",
- Seq(),
- Seq(),
- Seq(),
- Seq(),
- udf.outputFields
- )
- }
-
- case CoGroupNode(udf, leftKey, rightKey, leftInput, rightInput) => {
- printInfo(node, "CoGroup",
- Seq(("L", leftKey), ("R", rightKey)),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case CrossNode(udf, leftInput, rightInput) => {
- printInfo(node, "Cross",
- Seq(),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case JoinNode(udf, leftKey, rightKey, leftInput, rightInput) => {
- printInfo(node, "Join",
- Seq(("L", leftKey), ("R", rightKey)),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case MapNode(udf, input) => {
- printInfo(node, "Map",
- Seq(),
- Seq(("", udf.inputFields)),
- Seq(("", udf.getForwardIndexArrayFrom)),
- Seq(("", udf.getDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case UnionNode(udf, leftInput, rightInput) => {
- printInfo(node, "Union",
- Seq(),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case ReduceNode(udf, key, input) => {
-
-// val contract = node.asInstanceOf[Reduce4sContract[_, _, _]]
-// contract.userCombineCode map { _ =>
-// printInfo(node, "Combine",
-// Seq(("", key)),
-// Seq(("", udf.inputFields)),
-// Seq(("", contract.combineForwardSet.toArray)),
-// Seq(("", contract.combineDiscardSet.toArray)),
-// udf.inputFields
-// )
-// }
-
- printInfo(node, "Reduce",
- Seq(("", key)),
- Seq(("", udf.inputFields)),
- Seq(("", udf.getForwardIndexArrayFrom)),
- Seq(("", udf.getDiscardIndexArray)),
- udf.outputFields
- )
- }
- case DeltaIterationNode(udf, key, input1, input2) => {
-
- printInfo(node, "WorksetIterate",
- Seq(("", key)),
- Seq(),
- Seq(),
- Seq(),
- udf.outputFields)
- }
-
- case BulkIterationNode(udf, input1) => {
-
- printInfo(node, "BulkIterate",
- Seq(),
- Seq(),
- Seq(),
- Seq(),
- udf.outputFields)
- }
- }
-
- newVisited
- }
- }
- }
-
- private def printInfo(node: Operator[_], kind: String, keys: Seq[(String, FieldSelector)], reads: Seq[(String, FieldSet[_])], forwards: Seq[(String, Array[Int])], discards: Seq[(String, Array[Int])], writes: FieldSet[_]): Unit = {
-
- def indexesToStrings(pre: String, indexes: Array[Int]) = indexes map {
- case -1 => "_"
- case i => pre + i
- }
-
- val formatString = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
-
- val name = node.getName
-
- val sKeys = keys flatMap { case (pre, value) => value.selectedFields.toSerializerIndexArray.map(pre + _) } mkString ", "
- val sReads = reads flatMap { case (pre, value) => indexesToStrings(pre, value.toSerializerIndexArray) } mkString ", "
- val sForwards = forwards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
- val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
- val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", "
-
- LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
- }
-}
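
[The printInfo method above renders one debug line per operator in the shape
"name (kind): K{keys}: R[reads] => F[forwards] - D[discards] + W[writes]".
A tiny demo of that format string with made-up values, just to show what the
logged schema lines look like:]

    object SchemaLineDemo extends App {
      val formatString = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
      // All values below are hypothetical; they only show the line shape.
      println(formatString.format(
        "WordCount Reduce", "Reduce", "0", "0, 1", "0", "", "0, 1"))
      // => WordCount Reduce (Reduce): K{0}: R[0, 1] => F[0] - D[] + W[0, 1]
    }
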
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala
deleted file mode 100644
index 2e9c203..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import scala.collection.mutable
-import scala.util.Either.MergeableEither
-
-
-abstract class UDF[R] extends Serializable {
-
- val outputUDT: UDT[R]
- val outputFields = FieldSet.newOutputSet(outputUDT)
-
- def getOutputSerializer = outputUDT.getSerializer(outputFields.toSerializerIndexArray)
-
- def getOutputLength = {
- val indexes = outputFields.toIndexSet
- if (indexes.isEmpty) {
- 0
- } else {
- indexes.max + 1
- }
- }
-
- def allocateOutputGlobalIndexes(startPos: Int): Int = {
-
- outputFields.setGlobalized()
-
- outputFields.map(_.globalPos).foldLeft(startPos) {
- case (i, gPos @ GlobalPos.Unknown()) => gPos.setIndex(i); i + 1
- case (i, _) => i
- }
- startPos
- }
-
- def assignOutputGlobalIndexes(sameAs: FieldSet[Field]): Unit = {
-
- outputFields.setGlobalized()
-
- outputFields.foreach {
- case OutputField(localPos, globalPos) => globalPos.setReference(sameAs(localPos).globalPos)
- }
- }
-
- def setOutputGlobalIndexes(startPos: Int, sameAs: Option[FieldSet[Field]]): Int = sameAs match {
- case None => allocateOutputGlobalIndexes(startPos)
- case Some(sameAs) => assignOutputGlobalIndexes(sameAs); startPos
- }
-
- def attachOutputsToInputs(inputFields: FieldSet[InputField]): Unit = {
-
- inputFields.setGlobalized()
-
- inputFields.foreach {
- case InputField(localPos, globalPos) => globalPos.setReference(outputFields(localPos).globalPos)
- }
- }
-
- protected def markFieldCopied(inputGlobalPos: GlobalPos, outputLocalPos: Int): Unit = {
- val outputField = outputFields(outputLocalPos)
- outputField.globalPos.setReference(inputGlobalPos)
- outputField.isUsed = false
- }
-}
-
-class UDF0[R](val outputUDT: UDT[R]) extends UDF[R]
-
-class UDF1[T, R](val inputUDT: UDT[T], val outputUDT: UDT[R]) extends UDF[R] {
-
- val inputFields = FieldSet.newInputSet(inputUDT)
- val forwardSet = mutable.Set[(InputField, OutputField)]()
- val discardSet = mutable.Set[GlobalPos]()
-
- def getInputDeserializer = inputUDT.getSerializer(inputFields.toSerializerIndexArray)
- def getForwardIndexSetFrom = forwardSet.map(_._1.localPos)
- def getForwardIndexSetTo = forwardSet.map(_._2.localPos)
- def getForwardIndexArrayFrom = getForwardIndexSetFrom.toArray
- def getForwardIndexArrayTo = getForwardIndexSetTo.toArray
- def getDiscardIndexArray = discardSet.map(_.getValue).toArray
-
- override def getOutputLength = {
- val forwardMax = if (forwardSet.isEmpty) -1 else forwardSet.map(_._2.localPos).max
- math.max(super.getOutputLength, forwardMax + 1)
- }
-
- def markInputFieldUnread(localPos: Int): Unit = {
- inputFields(localPos).isUsed = false
- }
-
- def markFieldCopied(inputLocalPos: Int, outputLocalPos: Int): Unit = {
- val inputField = inputFields(inputLocalPos)
- val inputGlobalPos = inputField.globalPos
- forwardSet.add((inputField, outputFields(outputLocalPos)))
- markFieldCopied(inputGlobalPos, outputLocalPos)
- }
-}
-
-class UDF2[T1, T2, R](val leftInputUDT: UDT[T1], val rightInputUDT: UDT[T2], val outputUDT: UDT[R]) extends UDF[R] {
-
- val leftInputFields = FieldSet.newInputSet(leftInputUDT)
- val leftForwardSet = mutable.Set[(InputField, OutputField)]()
- val leftDiscardSet = mutable.Set[GlobalPos]()
-
- val rightInputFields = FieldSet.newInputSet(rightInputUDT)
- val rightForwardSet = mutable.Set[(InputField, OutputField)]()
- val rightDiscardSet = mutable.Set[GlobalPos]()
-
- def getLeftInputDeserializer = leftInputUDT.getSerializer(leftInputFields.toSerializerIndexArray)
- def getLeftForwardIndexSetFrom = leftForwardSet.map(_._1.localPos)
- def getLeftForwardIndexSetTo = leftForwardSet.map(_._2.localPos)
- def getLeftForwardIndexArrayFrom = getLeftForwardIndexSetFrom.toArray
- def getLeftForwardIndexArrayTo = getLeftForwardIndexSetTo.toArray
- def getLeftDiscardIndexArray = leftDiscardSet.map(_.getValue).toArray
-
- def getRightInputDeserializer = rightInputUDT.getSerializer(rightInputFields.toSerializerIndexArray)
- def getRightForwardIndexSetFrom = rightForwardSet.map(_._1.localPos)
- def getRightForwardIndexSetTo = rightForwardSet.map(_._2.localPos)
- def getRightForwardIndexArrayFrom = getRightForwardIndexSetFrom.toArray
- def getRightForwardIndexArrayTo = getRightForwardIndexSetTo.toArray
- def getRightDiscardIndexArray = rightDiscardSet.map(_.getValue).toArray
-
- override def getOutputLength = {
- val leftForwardMax = if (leftForwardSet.isEmpty) -1 else leftForwardSet.map(_._2.localPos).max
- val rightForwardMax = if (rightForwardSet.isEmpty) -1 else rightForwardSet.map(_._2.localPos).max
- math.max(super.getOutputLength, math.max(leftForwardMax, rightForwardMax) + 1)
- }
-
- private def getInputField(localPos: Either[Int, Int]): InputField = localPos match {
- case Left(pos) => leftInputFields(pos)
- case Right(pos) => rightInputFields(pos)
- }
-
- def markInputFieldUnread(localPos: Either[Int, Int]): Unit = {
- localPos.fold(leftInputFields(_), rightInputFields(_)).isUsed = false
- }
-
- def markFieldCopied(inputLocalPos: Either[Int, Int], outputLocalPos: Int): Unit = {
- val (inputFields, forwardSet) = inputLocalPos.fold(_ => (leftInputFields, leftForwardSet), _ => (rightInputFields, rightForwardSet))
- val inputField = inputFields(inputLocalPos.merge)
- val inputGlobalPos = inputField.globalPos
- forwardSet.add((inputField, outputFields(outputLocalPos)))
- markFieldCopied(inputGlobalPos, outputLocalPos)
- }
-}
-
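
[In UDF1 above, forwarding a field (markFieldCopied) records an
(input, output) pair and suppresses separate serialization of the output
slot, and getOutputLength counts forwarded targets as part of the record. A
self-contained sketch of that bookkeeping with simplified types (MiniUDF1 and
F are stand-ins, not the real classes):]

    import scala.collection.mutable

    object ForwardDemo extends App {
      case class F(localPos: Int, var isUsed: Boolean = true)

      class MiniUDF1(numIn: Int, numOut: Int) {
        val inputFields  = (0 until numIn).map(F(_))
        val outputFields = (0 until numOut).map(F(_))
        val forwardSet   = mutable.Set[(F, F)]()

        // Forwarding copies an input field through unchanged: record the
        // pair and stop serializing the output slot separately.
        def markFieldCopied(in: Int, out: Int): Unit = {
          forwardSet += ((inputFields(in), outputFields(out)))
          outputFields(out).isUsed = false
        }

        // Mirrors UDF1.getOutputLength: forwarded targets extend the
        // record just like explicitly written fields do.
        def outputLength: Int = {
          val written   = outputFields.filter(_.isUsed).map(_.localPos)
          val forwarded = forwardSet.toSeq.map(_._2.localPos)
          ((written ++ forwarded) :+ -1).max + 1
        }
      }

      val udf = new MiniUDF1(numIn = 3, numOut = 1)
      udf.markFieldCopied(in = 2, out = 0)
      println(udf.outputLength)  // 1: field 0 is forwarded, not rewritten
    }
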
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala
deleted file mode 100644
index aaa3b7b..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import scala.language.experimental.macros
-import scala.language.postfixOps
-
-import org.apache.flink.api.scala.codegen.Util
-
-import org.apache.flink.types.{Key => PactKey}
-import org.apache.flink.types.Record
-import org.apache.flink.types.{Value => PactValue}
-import org.apache.flink.types.StringValue
-
-
-abstract class UDT[T] extends Serializable {
- protected def createSerializer(indexMap: Array[Int]): UDTSerializer[T]
- val fieldTypes: Array[Class[_ <: org.apache.flink.types.Value]]
- val udtIdMap: Map[Int, Int]
-
- def numFields = fieldTypes.length
-
- def getSelectionIndices(selection: List[Int]) = {
- selection map { udtIdMap.getOrElse(_, -1) }
- }
-
- def getKeySet(fields: Seq[Int]): Array[Class[_ <: PactKey[_]]] = {
- fields map { fieldNum => fieldTypes(fieldNum).asInstanceOf[Class[_ <: PactKey[_]]] } toArray
- }
-
- def getSerializer(indexMap: Array[Int]): UDTSerializer[T] = {
- val ser = createSerializer(indexMap)
- ser
- }
-
- @transient private var defaultSerializer: UDTSerializer[T] = null
-
- def getSerializerWithDefaultLayout: UDTSerializer[T] = {
- // This method will be reentrant if T is a recursive type
- if (defaultSerializer == null) {
- defaultSerializer = createSerializer((0 until numFields) toArray)
- }
- defaultSerializer
- }
-}
-
-abstract class UDTSerializer[T](val indexMap: Array[Int]) {
- def serialize(item: T, record: Record)
- def deserializeRecyclingOff(record: Record): T
- def deserializeRecyclingOn(record: Record): T
-}
-
-trait UDTLowPriorityImplicits {
- implicit def createUDT[T]: UDT[T] = macro Util.createUDTImpl[T]
-}
-
-object UDT extends UDTLowPriorityImplicits {
-
- // UDTs needed by library code
-
- object NothingUDT extends UDT[Nothing] {
- override val fieldTypes = Array[Class[_ <: PactValue]]()
- override val udtIdMap: Map[Int, Int] = Map()
- override def createSerializer(indexMap: Array[Int]) = throw new UnsupportedOperationException("Cannot create UDTSerializer for type Nothing")
- }
-
- object StringUDT extends UDT[String] {
-
- override val fieldTypes = Array[Class[_ <: PactValue]](classOf[StringValue])
- override val udtIdMap: Map[Int, Int] = Map()
-
- override def createSerializer(indexMap: Array[Int]) = new UDTSerializer[String](indexMap) {
-
- private val index = indexMap(0)
-
- @transient private var pactField = new StringValue()
-
-// override def getFieldIndex(selection: Seq[String]): List[Int] = selection match {
-// case Seq() => List(index)
-//// case _ => invalidSelection(selection)
-// case _ => throw new RuntimeException("Invalid selection: " + selection)
-// }
-
- override def serialize(item: String, record: Record) = {
- if (index >= 0) {
- pactField.setValue(item)
- record.setField(index, pactField)
- }
- }
-
- override def deserializeRecyclingOff(record: Record): String = {
- if (index >= 0) {
- record.getFieldInto(index, pactField)
- pactField.getValue()
- } else {
- null
- }
- }
-
- override def deserializeRecyclingOn(record: Record): String = {
- if (index >= 0) {
- record.getFieldInto(index, pactField)
- pactField.getValue()
- } else {
- null
- }
- }
-
- private def readObject(in: java.io.ObjectInputStream) = {
- in.defaultReadObject()
- pactField = new StringValue()
- }
- }
- }
-}
-
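
[The StringUDT serializer above shows the general UDTSerializer pattern: an
index >= 0 means the field has a slot in the record layout, while -1 means it
was pruned and reads/writes become no-ops (deserialization returning a
default). A dependency-free analogue of that pattern, using a plain array in
place of the Flink Record (MiniIntSerializer is a hypothetical name):]

    object MiniSerializerDemo extends App {
      class MiniIntSerializer(indexMap: Array[Int]) {
        private val index = indexMap(0)

        // index >= 0: the field survives the layout and is written through.
        def serialize(item: Int, record: Array[Option[Int]]): Unit =
          if (index >= 0) record(index) = Some(item)

        // index == -1: the field was pruned, so reading yields a default.
        def deserialize(record: Array[Option[Int]]): Int =
          if (index >= 0) record(index).getOrElse(0) else 0
      }

      val rec = Array.fill[Option[Int]](3)(None)
      new MiniIntSerializer(Array(2)).serialize(42, rec)
      println(rec.toSeq)                                         // None, None, Some(42)
      println(new MiniIntSerializer(Array(-1)).deserialize(rec)) // 0: pruned field
    }
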
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala
deleted file mode 100644
index df7d77f..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.flink.api.scala.analysis.postPass;
-// Commented out because this is not working right now
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package org.apache.flink.api.scala.analysis.postPass
-//
-//import scala.collection.mutable
-//import scala.collection.JavaConversions._
-//
-//import org.apache.flink.api.scala.analysis._
-//import org.apache.flink.api.scala.contracts._
-//
-//import org.apache.flink.pact.compiler.plan._
-//import org.apache.flink.pact.compiler.plan.candidate.OptimizedPlan
-//
-//object AmbientFieldDetector {
-//
-// import Extractors._
-// import EdgeDependencySets.EdgeDependencySet
-//
-// def updateAmbientFields(plan: OptimizedPlan, edgeDependencies: Map[PactConnection, EdgeDependencySet], outputPositions: Map[Int, GlobalPos]): Unit = {
-// plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(updateAmbientFields(outputPositions, edgeDependencies))
-// }
-//
-// private def updateAmbientFields(outputPositions: Map[Int, GlobalPos], edgeDependencies: Map[PactConnection, EdgeDependencySet])(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-//
-// visited.contains(node) match {
-//
-// case true => visited
-//
-// case false => {
-// node match {
-//
-// case _: SinkJoiner | _: BinaryUnionNode =>
-// case DataSinkNode(udf, input) =>
-// case DataSourceNode(udf) =>
-//
-// case CoGroupNode(udf, _, _, leftInput, rightInput) => {
-//
-// val leftProvides = edgeDependencies(leftInput).childProvides
-// val rightProvides = edgeDependencies(rightInput).childProvides
-// val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-// val writes = udf.outputFields.toIndexSet
-//
-// populateSets(udf.leftForwardSet, udf.leftDiscardSet, leftProvides, parentNeeds, writes, outputPositions)
-// populateSets(udf.rightForwardSet, udf.rightDiscardSet, rightProvides, parentNeeds, writes, outputPositions)
-// }
-//
-// case CrossNode(udf, leftInput, rightInput) => {
-//
-// val leftProvides = edgeDependencies(leftInput).childProvides
-// val rightProvides = edgeDependencies(rightInput).childProvides
-// val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-// val writes = udf.outputFields.toIndexSet
-//
-// populateSets(udf.leftForwardSet, udf.leftDiscardSet, leftProvides, parentNeeds, writes, outputPositions)
-// populateSets(udf.rightForwardSet, udf.rightDiscardSet, rightProvides, parentNeeds, writes, outputPositions)
-// }
-//
-// case JoinNode(udf, _, _, leftInput, rightInput) => {
-//
-// val leftProvides = edgeDependencies(leftInput).childProvides
-// val rightProvides = edgeDependencies(rightInput).childProvides
-// val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-// val writes = udf.outputFields.toIndexSet
-//
-// populateSets(udf.leftForwardSet, udf.leftDiscardSet, leftProvides, parentNeeds, writes, outputPositions)
-// populateSets(udf.rightForwardSet, udf.rightDiscardSet, rightProvides, parentNeeds, writes, outputPositions)
-// }
-//
-// case MapNode(udf, input) => {
-//
-// val inputProvides = edgeDependencies(input).childProvides
-// val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-// val writes = udf.outputFields.toIndexSet
-//
-// populateSets(udf.forwardSet, udf.discardSet, inputProvides, parentNeeds, writes, outputPositions)
-// }
-//
-// case ReduceNode(udf, _, input) => {
-// val inputProvides = edgeDependencies(input).childProvides
-// val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-// val writes = udf.outputFields.toIndexSet
-//
-// populateSets(udf.forwardSet, udf.discardSet, inputProvides, parentNeeds, writes, outputPositions)
-// }
-// }
-//
-// node.getIncomingConnections.map(_.getSource).foldLeft(visited + node)(updateAmbientFields(outputPositions, edgeDependencies))
-// }
-// }
-// }
-//
-// private def populateSets(forwards: mutable.Set[GlobalPos], discards: mutable.Set[GlobalPos], childProvides: Set[Int], parentNeeds: Set[Int], writes: Set[Int], outputPositions: Map[Int, GlobalPos]): Unit = {
-// forwards.clear()
-// forwards.addAll((parentNeeds -- writes).intersect(childProvides).map(outputPositions(_)))
-//
-// discards.clear()
-// discards.addAll((childProvides -- parentNeeds -- writes).intersect(childProvides).map(outputPositions(_)))
-// }
-//}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala
deleted file mode 100644
index c92cef5..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-package org.apache.flink.api.scala.analysis.postPass;
-// Commented out because this is not working right now
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//
-//package org.apache.flink.api.scala.analysis.postPass
-//
-//import scala.collection.JavaConversions._
-//
-//import org.apache.flink.api.scala.analysis._
-//import org.apache.flink.api.scala.contracts._
-//
-//import org.apache.flink.pact.compiler.plan._
-//import org.apache.flink.pact.compiler.plan.candidate.OptimizedPlan
-//
-//object EdgeDependencySets {
-//
-// import Extractors._
-//
-// case class EdgeDependencySet(parentNeeds: Set[Int], childProvides: Set[Int] = Set())
-//
-// def computeEdgeDependencySets(plan: OptimizedPlan, outputSets: Map[OptimizerNode, Set[Int]]): Map[PactConnection, EdgeDependencySet] = {
-//
-// plan.getDataSinks.map(_.getSinkNode).foldLeft(Map[PactConnection, EdgeDependencySet]())(computeEdgeDependencySets(outputSets))
-// }
-//
-// private def computeEdgeDependencySets(outputSets: Map[OptimizerNode, Set[Int]])(edgeDependencySets: Map[PactConnection, EdgeDependencySet], node: OptimizerNode): Map[PactConnection, EdgeDependencySet] = {
-//
-// // breadth-first traversal: parentNeeds will be None if any parent has not yet been visited
-// val parentNeeds = node.getOutgoingConnections().foldLeft(Option(Set[Int]())) {
-// case (None, _) => None
-// case (Some(acc), parent) => edgeDependencySets.get(parent) map { acc ++ _.parentNeeds }
-// }
-//
-// parentNeeds match {
-// case None => edgeDependencySets
-// case Some(parentNeeds) => computeEdgeDependencySets(node, parentNeeds, outputSets, edgeDependencySets)
-// }
-// }
-//
-// private def computeEdgeDependencySets(node: OptimizerNode, parentNeeds: Set[Int], outputSets: Map[OptimizerNode, Set[Int]], edgeDependencySets: Map[PactConnection, EdgeDependencySet]): Map[PactConnection, EdgeDependencySet] = {
-//
-// def updateEdges(needs: (PactConnection, Set[Int])*): Map[PactConnection, EdgeDependencySet] = {
-//
-// val updParents = node.getOutgoingConnections().foldLeft(edgeDependencySets) { (edgeDependencySets, parent) =>
-// val entry = edgeDependencySets(parent)
-// edgeDependencySets.updated(parent, entry.copy(childProvides = parentNeeds))
-// }
-//
-// needs.foldLeft(updParents) {
-// case (edgeDependencySets, (inConn, needs)) => {
-// val updInConn = edgeDependencySets.updated(inConn, EdgeDependencySet(needs))
-// computeEdgeDependencySets(outputSets)(updInConn, inConn.getSource)
-// }
-// }
-// }
-//
-// for (udf <- node.getUDF) {
-//
-// // suppress outputs that aren't needed by any parent
-// val writeFields = udf.outputFields filter { _.isUsed }
-// val unused = writeFields filterNot { f => parentNeeds.contains(f.globalPos.getValue) }
-//
-// for (field <- unused) {
-// field.isUsed = false
-// if (field.globalPos.isIndex)
-// field.globalPos.setIndex(Int.MinValue)
-// }
-// }
-//
-// node match {
-//
-// case DataSinkNode(udf, input) => {
-// val needs = udf.inputFields.toIndexSet
-// updateEdges(input -> needs)
-// }
-//
-// case DataSourceNode(udf) => {
-// updateEdges()
-// }
-//
-// case CoGroupNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-//
-// val leftReads = udf.leftInputFields.toIndexSet ++ leftKey.selectedFields.toIndexSet
-// val rightReads = udf.rightInputFields.toIndexSet ++ rightKey.selectedFields.toIndexSet
-// val writes = udf.outputFields.toIndexSet
-//
-// val parentPreNeeds = parentNeeds -- writes
-//
-// val parentLeftNeeds = parentPreNeeds.intersect(outputSets(leftInput.getSource))
-// val parentRightNeeds = parentPreNeeds.intersect(outputSets(rightInput.getSource))
-//
-// val leftForwards = udf.leftForwardSet.map(_.getValue)
-// val rightForwards = udf.rightForwardSet.map(_.getValue)
-//
-// val (leftRes, rightRes) = parentLeftNeeds.intersect(parentRightNeeds).partition {
-// case index if leftForwards(index) && !rightForwards(index) => true
-// case index if !leftForwards(index) && rightForwards(index) => false
-// case _ => throw new UnsupportedOperationException("Schema conflict: cannot forward the same field from both sides of a two-input operator.")
-// }
-//
-// val leftNeeds = (parentLeftNeeds -- rightRes) ++ leftReads
-// val rightNeeds = (parentRightNeeds -- leftRes) ++ rightReads
-//
-// updateEdges(leftInput -> leftNeeds, rightInput -> rightNeeds)
-// }
-//
-// case CrossNode(udf, leftInput, rightInput) => {
-//
-// val leftReads = udf.leftInputFields.toIndexSet
-// val rightReads = udf.rightInputFields.toIndexSet
-// val writes = udf.outputFields.toIndexSet
-//
-// val parentPreNeeds = parentNeeds -- writes
-//
-// val parentLeftNeeds = parentPreNeeds.intersect(outputSets(leftInput.getSource))
-// val parentRightNeeds = parentPreNeeds.intersect(outputSets(rightInput.getSource))
-//
-// val leftForwards = udf.leftForwardSet.map(_.getValue)
-// val rightForwards = udf.rightForwardSet.map(_.getValue)
-//
-// val (leftRes, rightRes) = parentLeftNeeds.intersect(parentRightNeeds).partition {
-// case index if leftForwards(index) && !rightForwards(index) => true
-// case index if !leftForwards(index) && rightForwards(index) => false
-// case _ => throw new UnsupportedOperationException("Schema conflict: cannot forward the same field from both sides of a two-input operator.")
-// }
-//
-// val leftNeeds = (parentLeftNeeds -- rightRes) ++ leftReads
-// val rightNeeds = (parentRightNeeds -- leftRes) ++ rightReads
-//
-// updateEdges(leftInput -> leftNeeds, rightInput -> rightNeeds)
-// }
-//
-// case JoinNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-//
-// val leftReads = udf.leftInputFields.toIndexSet ++ leftKey.selectedFields.toIndexSet
-// val rightReads = udf.rightInputFields.toIndexSet ++ rightKey.selectedFields.toIndexSet
-// val writes = udf.outputFields.toIndexSet
-//
-// val parentPreNeeds = parentNeeds -- writes
-//
-// val parentLeftNeeds = parentPreNeeds.intersect(outputSets(leftInput.getSource))
-// val parentRightNeeds = parentPreNeeds.intersect(outputSets(rightInput.getSource))
-//
-// val leftForwards = udf.leftForwardSet.map(_.getValue)
-// val rightForwards = udf.rightForwardSet.map(_.getValue)
-//
-// val (leftRes, rightRes) = parentLeftNeeds.intersect(parentRightNeeds).partition {
-// case index if leftForwards(index) && !rightForwards(index) => true
-// case index if !leftForwards(index) && rightForwards(index) => false
-// case _ => throw new UnsupportedOperationException("Schema conflict: cannot forward the same field from both sides of a two-input operator.")
-// }
-//
-// val leftNeeds = (parentLeftNeeds -- rightRes) ++ leftReads
-// val rightNeeds = (parentRightNeeds -- leftRes) ++ rightReads
-//
-// updateEdges(leftInput -> leftNeeds, rightInput -> rightNeeds)
-// }
-//
-// case MapNode(udf, input) => {
-//
-// val reads = udf.inputFields.toIndexSet
-// val writes = udf.outputFields.toIndexSet
-//
-// val needs = parentNeeds -- writes ++ reads
-//
-// updateEdges(input -> needs)
-// }
-//
-// case ReduceNode(udf, key, input) => {
-//
-// val reads = udf.inputFields.toIndexSet ++ key.selectedFields.toIndexSet
-// val writes = udf.outputFields.toIndexSet
-//
-// val needs = parentNeeds -- writes ++ reads
-//
-// updateEdges(input -> needs)
-// }
-//
-// case _: SinkJoiner | _: BinaryUnionNode => {
-// updateEdges(node.getIncomingConnections.map(_ -> parentNeeds): _*)
-// }
-// }
-// }
-//}
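
[The commented-out analysis above propagates field needs backwards through
the plan with the classic liveness-style equation
needs = (parentNeeds -- writes) ++ reads: whatever the parent needs and this
operator does not produce must come from the input, plus whatever the
operator itself reads. A small worked example of that equation:]

    object NeedsDemo extends App {
      def needs(parentNeeds: Set[Int], writes: Set[Int], reads: Set[Int]): Set[Int] =
        (parentNeeds -- writes) ++ reads

      // A map that reads field 0 and writes field 2, under a parent that
      // needs {1, 2}: field 2 is produced locally, so only {0, 1} must
      // arrive on the input edge.
      println(needs(parentNeeds = Set(1, 2), writes = Set(2), reads = Set(0)))
      // => Set(1, 0)
    }
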
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala
deleted file mode 100644
index 09e32d5..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.language.implicitConversions
-
-import scala.Some
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis.FieldSelector
-import org.apache.flink.api.scala.analysis.UDF
-import org.apache.flink.api.scala.analysis.UDF0
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.UDF2
-
-import org.apache.flink.api.java.record.operators.CoGroupOperator
-import org.apache.flink.api.java.record.operators.CrossOperator
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.api.java.record.operators.JoinOperator
-import org.apache.flink.api.java.record.operators.ReduceOperator
-import org.apache.flink.compiler.dag.BinaryUnionNode
-import org.apache.flink.compiler.dag.BulkIterationNode
-import org.apache.flink.compiler.dag.CoGroupNode
-import org.apache.flink.compiler.dag.CrossNode
-import org.apache.flink.compiler.dag.DataSinkNode
-import org.apache.flink.compiler.dag.DataSourceNode
-import org.apache.flink.compiler.dag.CollectorMapNode
-import org.apache.flink.compiler.dag.MatchNode
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.compiler.dag.PactConnection
-import org.apache.flink.compiler.dag.GroupReduceNode
-import org.apache.flink.compiler.dag.SinkJoiner
-import org.apache.flink.compiler.dag.WorksetIterationNode
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration, GenericDataSinkBase, GenericDataSourceBase}
-
-object Extractors {
-
- implicit def nodeToGetUDF(node: OptimizerNode) = new {
- def getUDF: Option[UDF[_]] = node match {
- case _: SinkJoiner | _: BinaryUnionNode => None
- case _ => {
- Some(node.getPactContract.asInstanceOf[ScalaOperator[_, _]].getUDF)
- }
- }
- }
-
- object DataSinkNode {
- def unapply(node: OptimizerNode): Option[(UDF1[_, _], PactConnection)] = node match {
- case node: DataSinkNode => node.getPactContract match {
- case contract: GenericDataSinkBase[_] with ScalaOutputOperator[_] => {
- Some((contract.getUDF, node.getInputConnection))
- }
- case _ => None
- }
- case _ => None
- }
- }
-
- object DataSourceNode {
- def unapply(node: OptimizerNode): Option[(UDF0[_])] = node match {
- case node: DataSourceNode => node.getPactContract() match {
- case contract: GenericDataSourceBase[_, _] with ScalaOperator[_, _] => Some(contract.getUDF.asInstanceOf[UDF0[_]])
- case _ => None
- }
- case _ => None
- }
- }
-
- object CoGroupNode {
- def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, PactConnection, PactConnection)] = node match {
- case node: CoGroupNode => node.getPactContract() match {
- case contract: CoGroupOperator with TwoInputKeyedScalaOperator[_, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, node.getFirstIncomingConnection, node.getSecondIncomingConnection))
- case _ => None
- }
- case _ => None
- }
- }
-
- object CrossNode {
- def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], PactConnection, PactConnection)] = node match {
- case node: CrossNode => node.getPactContract match {
- case contract: CrossOperator with TwoInputScalaOperator[_, _, _] => Some((contract.getUDF, node.getFirstIncomingConnection, node.getSecondIncomingConnection))
- case _ => None
- }
- case _ => None
- }
- }
-
- object JoinNode {
- def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, PactConnection, PactConnection)] = node match {
- case node: MatchNode => node.getPactContract match {
- case contract: JoinOperator with TwoInputKeyedScalaOperator[_, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, node.getFirstIncomingConnection, node.getSecondIncomingConnection))
- case _ => None
- }
- case _ => None
- }
- }
-
- object MapNode {
- def unapply(node: OptimizerNode): Option[(UDF1[_, _], PactConnection)] = node match {
- case node: CollectorMapNode => node.getPactContract match {
- case contract: MapOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, node.getIncomingConnection))
- case _ => None
- }
- case _ => None
- }
- }
-
- object UnionNode {
- def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], PactConnection, PactConnection)] = node match {
- case node: BinaryUnionNode => node.getPactContract match {
- case contract: Union[_] with UnionScalaOperator[_] => Some((contract.getUDF, node.getFirstIncomingConnection(), node.getSecondIncomingConnection()))
- case _ => None
- }
- case _ => None
- }
- }
-
- object ReduceNode {
- def unapply(node: OptimizerNode): Option[(UDF1[_, _], FieldSelector, PactConnection)] = node match {
- case node: GroupReduceNode => node.getPactContract match {
- case contract: ReduceOperator with OneInputKeyedScalaOperator[_, _] => Some((contract.getUDF, contract.key, node.getIncomingConnection))
- case contract: ReduceOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, new FieldSelector(contract.getUDF.inputUDT, Nil), node.getIncomingConnection))
- case _ => None
- }
- case _ => None
- }
- }
- object DeltaIterationNode {
- def unapply(node: OptimizerNode): Option[(UDF0[_], FieldSelector, PactConnection, PactConnection)] = node match {
- case node: WorksetIterationNode => node.getPactContract match {
- case contract: DeltaIteration[_, _] with DeltaIterationScalaOperator[_] => Some((contract.getUDF, contract.key, node.getFirstIncomingConnection(), node.getSecondIncomingConnection()))
- case _ => None
- }
- case _ => None
- }
- }
-
- object BulkIterationNode {
- def unapply(node: OptimizerNode): Option[(UDF0[_], PactConnection)] = node match {
- case node: BulkIterationNode => node.getPactContract match {
- case contract: BulkIteration[_] with BulkIterationScalaOperator[_] => Some((contract.getUDF, node.getIncomingConnection()))
- case _ => None
- }
- case _ => None
- }
- }
-
-}
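[Editor's note] The extractor objects above exist so that the post-pass code can pattern-match an OptimizerNode directly into its (UDF, key selector, connection) components instead of repeatedly downcasting getPactContract. A minimal, self-contained sketch of the same unapply idiom, with placeholder types invented for this example (none of these are Flink classes):

// Op, MapOp, SourceOp and MapLike are made up for the sketch.
sealed trait Op
case class MapOp(udfName: String, input: Op) extends Op
case object SourceOp extends Op

// A custom unapply lets callers write `case MapLike(udf, in) => ...`
// without downcasting, which is what DataSinkNode, MapNode, etc.
// above do against getPactContract.
object MapLike {
  def unapply(op: Op): Option[(String, Op)] = op match {
    case MapOp(udf, in) => Some((udf, in))
    case _              => None
  }
}

object ExtractorDemo extends App {
  val plan: Op = MapOp("toUpper", SourceOp)
  plan match {
    case MapLike(udf, _) => println(s"matched a map with UDF '$udf'")
    case _               => println("no match")
  }
}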
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala
deleted file mode 100644
index e74695f..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-package org.apache.flink.api.scala.analysis.postPass
-// Commented out because it is not working right now
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//
-//package org.apache.flink.api.scala.analysis.postPass
-//
-//import scala.collection.mutable
-//import scala.collection.JavaConversions._
-//
-//import org.apache.flink.api.scala.analysis._
-//import org.apache.flink.api.scala.contracts._
-//
-//import org.apache.flink.pact.compiler.plan._
-//import org.apache.flink.pact.compiler.plan.candidate.OptimizedPlan
-//
-//object GlobalSchemaCompactor {
-//
-// import Extractors._
-//
-// def compactSchema(plan: OptimizedPlan): Unit = {
-//
-// val (_, conflicts) = plan.getDataSinks.map(_.getSinkNode).foldLeft((Set[OptimizerNode](), Map[GlobalPos, Set[GlobalPos]]())) {
-// case ((visited, conflicts), node) => findConflicts(node, visited, conflicts)
-// }
-//
-// // Reset all position indexes before reassigning them
-// conflicts.keys.foreach { _.setIndex(Int.MinValue) }
-//
-// plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(compactSchema(conflicts))
-// }
-//
-// /**
-// * Two fields are in conflict when they exist in the same place (record) at the same time (plan node).
-// * If two fields are in conflict, then they must be assigned different indexes.
-// *
-// * p1 conflictsWith p2 =
-// * Exists(n in Nodes):
-// * p1 != p2 &&
-// * (
-// * (p1 in n.forwards && p2 in n.forwards) ||
-// * (p1 in n.forwards && p2 in n.outputs) ||
-// * (p2 in n.forwards && p1 in n.outputs)
-// * )
-// */
-// private def findConflicts(node: OptimizerNode, visited: Set[OptimizerNode], conflicts: Map[GlobalPos, Set[GlobalPos]]): (Set[OptimizerNode], Map[GlobalPos, Set[GlobalPos]]) = {
-//
-// visited.contains(node) match {
-//
-// case true => (visited, conflicts)
-//
-// case false => {
-//
-// val (forwardPos, outputFields) = node.getUDF match {
-// case None => (Set[GlobalPos](), Set[OutputField]())
-// case Some(udf: UDF0[_]) => (Set[GlobalPos](), udf.outputFields.toSet)
-// case Some(udf: UDF1[_, _]) => (udf.forwardSet, udf.outputFields.toSet)
-// case Some(udf: UDF2[_, _, _]) => (udf.leftForwardSet ++ udf.rightForwardSet, udf.outputFields.toSet)
-// case _ => (Set[GlobalPos](), Set[OutputField]())
-// }
-//
-// // resolve GlobalPos references to the instance that holds the actual index
-// val forwards = forwardPos map { _.resolve }
-// val outputs = outputFields filter { _.isUsed } map { _.globalPos.resolve }
-//
-// val newConflictsF = forwards.foldLeft(conflicts) {
-// case (conflicts, fPos) => {
-// // add all other forwards and all outputs to this forward's conflict set
-// val fConflicts = conflicts.getOrElse(fPos, Set()) ++ (forwards filterNot { _ == fPos }) ++ outputs
-// conflicts.updated(fPos, fConflicts)
-// }
-// }
-//
-// val newConflictsO = outputs.foldLeft(newConflictsF) {
-// case (conflicts, oPos) => {
-// // add all forwards to this output's conflict set
-// val oConflicts = conflicts.getOrElse(oPos, Set()) ++ forwards
-// conflicts.updated(oPos, oConflicts)
-// }
-// }
-//
-// node.getIncomingConnections.map(_.getSource).foldLeft((visited + node, newConflictsO)) {
-// case ((visited, conflicts), node) => findConflicts(node, visited, conflicts)
-// }
-// }
-// }
-// }
-//
-// /**
-// * Assign indexes bottom-up, giving lower values to fields with larger conflict sets.
-// * This ordering should do a decent job of minimizing the number of gaps between fields.
-// */
-// private def compactSchema(conflicts: Map[GlobalPos, Set[GlobalPos]])(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-//
-// visited.contains(node) match {
-//
-// case true => visited
-//
-// case false => {
-//
-// val newVisited = node.getIncomingConnections.map(_.getSource).foldLeft(visited + node)(compactSchema(conflicts))
-//
-// val outputFields = node.getUDF match {
-// case None => Seq[OutputField]()
-// case Some(udf) => udf.outputFields filter { _.isUsed }
-// }
-//
-// val outputs = outputFields map {
-// case field => {
-// val pos = field.globalPos.resolve
-// (pos, field.localPos, conflicts(pos) map { _.getValue })
-// }
-// } sortBy {
-// case (_, localPos, posConflicts) => (Int.MaxValue - posConflicts.size, localPos)
-// }
-//
-// val initUsed = outputs map { _._1.getValue } filter { _ >= 0 } toSet
-//
-// val used = outputs.filter(_._1.getValue < 0).foldLeft(initUsed) {
-// case (used, (pos, _, conflicts)) => {
-// val index = chooseIndexValue(used ++ conflicts)
-// pos.setIndex(index)
-// used + index
-// }
-// }
-//
-// node.getUDF match {
-// case Some(udf: UDF1[_, _]) => updateDiscards(used, udf.discardSet)
-// case Some(udf: UDF2[_, _, _]) => updateDiscards(used, udf.leftDiscardSet, udf.rightDiscardSet)
-// case _ =>
-// }
-//
-// newVisited
-// }
-// }
-// }
-//
-// private def chooseIndexValue(used: Set[Int]): Int = {
-// var index = 0
-// while (used(index)) {
-// index = index + 1
-// }
-// index
-// }
-//
-// private def updateDiscards(outputs: Set[Int], discardSets: mutable.Set[GlobalPos]*): Unit = {
-// for (discardSet <- discardSets) {
-//
-// val overwrites = discardSet filter { pos => outputs.contains(pos.getValue) } toList
-//
-// for (pos <- overwrites)
-// discardSet.remove(pos)
-// }
-// }
-//}
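[Editor's note] The commented-out compactor is, at heart, a greedy graph coloring: two field positions that can coexist in the same record at the same plan node conflict and must receive distinct indexes, and chooseIndexValue picks the smallest free index (first fit). A minimal sketch of the same first-fit assignment, using plain Strings and Ints in place of GlobalPos (all names and values here are invented):

// First-fit index assignment over a conflict relation, mirroring the
// chooseIndexValue logic above.
object FirstFitSketch extends App {
  def chooseIndexValue(used: Set[Int]): Int = {
    var index = 0
    while (used(index)) index += 1
    index
  }

  // fields a..d with their (symmetric) conflict sets
  val conflicts = Map(
    "a" -> Set("b", "c"),
    "b" -> Set("a"),
    "c" -> Set("a"),
    "d" -> Set.empty[String]
  )

  // assign greedily, larger conflict sets first, as the compactor's
  // sortBy does
  val assignment = conflicts.toSeq
    .sortBy { case (_, cs) => -cs.size }
    .foldLeft(Map.empty[String, Int]) { case (acc, (field, cs)) =>
      val taken = cs.flatMap(acc.get)
      acc + (field -> chooseIndexValue(taken))
    }

  println(assignment) // e.g. Map(a -> 0, b -> 1, c -> 1, d -> 0)
}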
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala
deleted file mode 100644
index 607a41d..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.collection.JavaConversions.asScalaBuffer
-import scala.collection.JavaConversions.collectionAsScalaIterable
-
-import org.apache.flink.api.scala.ScalaOperator
-
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.compiler.plan.OptimizedPlan
-
-
-trait GlobalSchemaOptimizer {
-
- import Extractors._
-
- def optimizeSchema(plan: OptimizedPlan, compactSchema: Boolean): Unit = {
-
-// val (outputSets, outputPositions) = OutputSets.computeOutputSets(plan)
-// val edgeSchemas = EdgeDependencySets.computeEdgeDependencySets(plan, outputSets)
-//
-// AmbientFieldDetector.updateAmbientFields(plan, edgeSchemas, outputPositions)
-//
-// if (compactSchema) {
-// GlobalSchemaCompactor.compactSchema(plan)
-// }
-
- GlobalSchemaPrinter.printSchema(plan)
-
- plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(persistConfiguration)
- }
-
- private def persistConfiguration(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-
- visited.contains(node) match {
-
- case true => visited
-
- case false => {
-
- val children = node.getIncomingConnections.map(_.getSource).toSet
- val newVisited = children.foldLeft(visited + node)(persistConfiguration)
-
- node.getPactContract match {
-
- case c: ScalaOperator[_, _] => c.persistConfiguration(Some(node))
- case _ =>
- }
-
- newVisited
- }
- }
- }
-}
\ No newline at end of file
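[Editor's note] persistConfiguration walks the plan from the sinks toward the sources, threading a visited set through foldLeft so that nodes shared by several sinks are configured exactly once, children before parents. A minimal sketch of that traversal pattern, with invented stand-in types (DagNode replaces OptimizerNode, `incoming` replaces getIncomingConnections.map(_.getSource)):

case class DagNode(name: String, incoming: List[DagNode] = Nil)

object TraversalSketch extends App {
  def visit(visited: Set[DagNode], node: DagNode): Set[DagNode] =
    if (visited.contains(node)) visited
    else {
      // recurse into children first, threading the visited set through
      val newVisited = node.incoming.foldLeft(visited + node)(visit)
      println(s"configuring ${node.name}") // runs once per node
      newVisited
    }

  val source = DagNode("source")
  val mapper = DagNode("map", List(source))
  val sink   = DagNode("sink", List(mapper))
  visit(Set.empty, sink) // prints: source, map, sink
}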
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala
deleted file mode 100644
index 9e79613..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConversions.asScalaBuffer
-import scala.collection.JavaConversions.collectionAsScalaIterable
-
-import org.apache.commons.logging.{LogFactory, Log}
-
-import Extractors.CoGroupNode
-import Extractors.CrossNode
-import Extractors.DataSinkNode
-import Extractors.DataSourceNode
-import Extractors.JoinNode
-import Extractors.MapNode
-import Extractors.ReduceNode
-
-import org.apache.flink.api.scala.analysis.FieldSet
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.compiler.dag.BinaryUnionNode
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.compiler.dag.SinkJoiner
-import org.apache.flink.compiler.plan.OptimizedPlan
-import org.apache.flink.api.common.Plan
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.SingleInputOperator
-import org.apache.flink.api.common.operators.DualInputOperator
-import org.apache.flink.api.java.record.operators.GenericDataSink
-
-object GlobalSchemaPrinter {
-
- import Extractors._
-
- private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaOptimizer])
-
- def printSchema(plan: OptimizedPlan): Unit = {
-
- LOG.debug("### " + plan.getJobName + " ###")
- plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(printSchema)
- LOG.debug("####" + ("#" * plan.getJobName.length) + "####")
- }
-
- private def printSchema(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-
- visited.contains(node) match {
-
- case true => visited
-
- case false => {
-
- val children = node.getIncomingConnections.map(_.getSource).toSet
- val newVisited = children.foldLeft(visited + node)(printSchema)
-
- node match {
-
- case _: SinkJoiner | _: BinaryUnionNode =>
-
- case DataSinkNode(udf, input) => {
- printInfo(node, "Sink",
- Seq(),
- Seq(("", udf.inputFields)),
- Seq(("", udf.getForwardIndexArrayFrom)),
- Seq(("", udf.getDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case DataSourceNode(udf) => {
- printInfo(node, "Source",
- Seq(),
- Seq(),
- Seq(),
- Seq(),
- udf.outputFields
- )
- }
-
- case CoGroupNode(udf, leftKey, rightKey, leftInput, rightInput) => {
- printInfo(node, "CoGroup",
- Seq(("L", leftKey), ("R", rightKey)),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case CrossNode(udf, leftInput, rightInput) => {
- printInfo(node, "Cross",
- Seq(),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case JoinNode(udf, leftKey, rightKey, leftInput, rightInput) => {
- printInfo(node, "Join",
- Seq(("L", leftKey), ("R", rightKey)),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case MapNode(udf, input) => {
- printInfo(node, "Map",
- Seq(),
- Seq(("", udf.inputFields)),
- Seq(("", udf.getForwardIndexArrayFrom)),
- Seq(("", udf.getDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case UnionNode(udf, leftInput, rightInput) => {
- printInfo(node, "Union",
- Seq(),
- Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
- Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
- Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
- udf.outputFields
- )
- }
-
- case ReduceNode(udf, key, input) => {
-
-// val contract = node.asInstanceOf[Reduce4sContract[_, _, _]]
-// contract.userCombineCode map { _ =>
-// printInfo(node, "Combine",
-// Seq(("", key)),
-// Seq(("", udf.inputFields)),
-// Seq(("", contract.combineForwardSet.toArray)),
-// Seq(("", contract.combineDiscardSet.toArray)),
-// udf.inputFields
-// )
-// }
-
- printInfo(node, "Reduce",
- Seq(("", key)),
- Seq(("", udf.inputFields)),
- Seq(("", udf.getForwardIndexArrayFrom)),
- Seq(("", udf.getDiscardIndexArray)),
- udf.outputFields
- )
- }
- case DeltaIterationNode(udf, key, input1, input2) => {
-
- printInfo(node, "WorksetIterate",
- Seq(("", key)),
- Seq(),
- Seq(),
- Seq(),
- udf.outputFields)
- }
-
- case BulkIterationNode(udf, input1) => {
-
- printInfo(node, "BulkIterate",
- Seq(),
- Seq(),
- Seq(),
- Seq(),
- udf.outputFields)
- }
-
- }
-
- newVisited
- }
- }
- }
-
- private def printInfo(node: OptimizerNode, kind: String, keys: Seq[(String, FieldSelector)], reads: Seq[(String, FieldSet[_])], forwards: Seq[(String, Array[Int])], discards: Seq[(String, Array[Int])], writes: FieldSet[_]): Unit = {
-
- def indexesToStrings(pre: String, indexes: Array[Int]) = indexes map {
- case -1 => "_"
- case i => pre + i
- }
-
- val formatString = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
-
- val name = node.getName
-
- val sKeys = keys flatMap { case (pre, value) => value.selectedFields.toSerializerIndexArray.map(pre + _) } mkString ", "
- val sReads = reads flatMap { case (pre, value) => indexesToStrings(pre, value.toSerializerIndexArray) } mkString ", "
- val sForwards = forwards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
- val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
- val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", "
-
- LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
- }
-}
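[Editor's note] printInfo condenses each node into one debug line: K{} lists the key fields, R[] the fields read, F[] those forwarded, D[] those discarded, and W[] those written. A tiny sketch of what one such line renders to, with a made-up operator name and field indexes:

// Hypothetical rendering of printInfo's format string; the operator
// name and all indexes below are invented for illustration.
object FormatDemo extends App {
  val formatString = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
  println(formatString.format(
    "CountWords", "Reduce", "0", "0, 1", "1", "2", "0, 3"))
  // prints: CountWords (Reduce): K{0}: R[0, 1] => F[1] - D[2] + W[0, 3]
}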
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala
deleted file mode 100644
index 50ece34..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.language.reflectiveCalls
-import scala.collection.JavaConversions._
-
-import org.apache.flink.api.scala.analysis._
-
-import org.apache.flink.compiler.dag._
-import org.apache.flink.compiler.plan.OptimizedPlan
-
-
-object OutputSets {
-
- import Extractors._
-
- def computeOutputSets(plan: OptimizedPlan): (Map[OptimizerNode, Set[Int]], Map[Int, GlobalPos]) = {
-
- val root = plan.getDataSinks.map(s => s.getSinkNode: OptimizerNode).reduceLeft((n1, n2) => new SinkJoiner(n1, n2))
- val outputSets = computeOutputSets(Map[OptimizerNode, Set[GlobalPos]](), root)
- val outputPositions = outputSets(root).map(pos => (pos.getValue, pos)).toMap
-
- (outputSets.mapValues(_.map(_.getValue)), outputPositions)
- }
-
- private def computeOutputSets(outputSets: Map[OptimizerNode, Set[GlobalPos]], node: OptimizerNode): Map[OptimizerNode, Set[GlobalPos]] = {
-
- outputSets.contains(node) match {
-
- case true => outputSets
-
- case false => {
-
- val children = node.getIncomingConnections.map(_.getSource).toSet
- val newOutputSets = children.foldLeft(outputSets)(computeOutputSets)
-
- val childOutputs = children.map(newOutputSets(_)).flatten
- val nodeOutputs = node.getUDF map { _.outputFields.filter(_.isUsed).map(_.globalPos).toSet } getOrElse Set()
-
- newOutputSets.updated(node, childOutputs ++ nodeOutputs)
- }
- }
- }
-}
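[Editor's note] computeOutputSets folds over the DAG bottom-up, giving each node the union of its children's output positions plus its own UDF's used output fields. A self-contained sketch with Int positions in place of GlobalPos (PlanNode and all values are invented for the example):

case class PlanNode(name: String, outputs: Set[Int],
    incoming: List[PlanNode] = Nil)

object OutputSetsSketch extends App {
  def compute(sets: Map[PlanNode, Set[Int]],
      node: PlanNode): Map[PlanNode, Set[Int]] =
    if (sets.contains(node)) sets
    else {
      // children first, then union their sets with this node's outputs
      val withChildren = node.incoming.foldLeft(sets)(compute)
      val childOutputs = node.incoming.flatMap(withChildren(_)).toSet
      withChildren.updated(node, childOutputs ++ node.outputs)
    }

  val source = PlanNode("source", Set(0, 1))
  val mapper = PlanNode("map", Set(2), List(source))
  val sink   = PlanNode("sink", Set.empty, List(mapper))
  println(compute(Map.empty, sink)(sink)) // Set(0, 1, 2)
}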