Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:49 UTC

[07/60] Rewrite the Scala API as (somewhat) thin Layer on Java API

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala
deleted file mode 100644
index d201585..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/FieldSelector.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import FieldSet.toSeq
-
-/**
- * Instances of this class are typically created by the field selector macros fieldSelectorImpl
- * and keySelectorImpl in {@link FieldSelectorMacros}. 
- *
- * In addition to the language restrictions applied to the lambda expression, the selected fields
- * must also be top-level. Nested fields (such as a list element or an inner instance of a
- * recursive type) are not allowed.
- *
- * @param selection The selected fields
- */
-class FieldSelector(udt: UDT[_], selection: List[Int]) extends Serializable {
-  
-  val inputFields = FieldSet.newInputSet(udt.numFields)
-  val selectionIndices = udt.getSelectionIndices(selection)
-  val selectedFields = inputFields.select(selectionIndices)
-
-  for (field <- inputFields.diff(selectedFields))
-    field.isUsed = false
-  
-  def copy() = new FieldSelector(udt, selection)
-}
\ No newline at end of file
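
As a quick orientation for the deleted FieldSelector above: its whole job is to take a selection of local field positions and flag every other input field as unused. A minimal standalone sketch of that marking step (the Field case class, the field count and the selection below are illustrative, not Flink API):

object FieldSelectorSketch {
  final case class Field(localPos: Int, var isUsed: Boolean = true)

  def main(args: Array[String]): Unit = {
    val numFields = 5
    val selection = List(0, 3)                     // hypothetical selection of top-level fields
    val inputFields = (0 until numFields).map(i => Field(i))
    val selected = selection.toSet

    // everything outside the selection is flagged as unused, mirroring the loop in FieldSelector
    inputFields.filterNot(f => selected(f.localPos)).foreach(_.isUsed = false)

    println(inputFields.map(f => s"${f.localPos}:${if (f.isUsed) "used" else "unused"}").mkString(", "))
    // prints: 0:used, 1:unused, 2:unused, 3:used, 4:unused
  }
}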

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala
deleted file mode 100644
index 80cd455..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaFields.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-abstract sealed class Field extends Serializable {
-  val localPos: Int
-  val globalPos: GlobalPos
-  var isUsed: Boolean = true
-}
-
-case class InputField(val localPos: Int, val globalPos: GlobalPos = new GlobalPos) extends Field
-case class OutputField(val localPos: Int, val globalPos: GlobalPos = new GlobalPos) extends Field
-
-class FieldSet[+FieldType <: Field] private (private val fields: Seq[FieldType]) extends Serializable {
-
-  private var globalized: Boolean = false
-  def isGlobalized = globalized
-
-  def setGlobalized(): Unit = {
-    assert(!globalized, "Field set has already been globalized")
-    globalized = true
-  }
-
-  def apply(localPos: Int): FieldType = {
-    fields.find(_.localPos == localPos).get
-  }
-
-  def select(selection: Seq[Int]): FieldSet[FieldType] = {
-    val outer = this
-    new FieldSet[FieldType](selection map apply) {
-      override def setGlobalized() = outer.setGlobalized()
-      override def isGlobalized = outer.isGlobalized
-    }
-  }
-
-  def toSerializerIndexArray: Array[Int] = fields map {
-    case field if field.isUsed => field.localPos
-//    case field if field.isUsed => field.globalPos.getValue
-    case _                     => -1
-  } toArray
-
-  def toIndexSet: Set[Int] = fields.map(_.localPos).toSet
-//  def toIndexSet: Set[Int] = fields.filter(_.isUsed).map(_.globalPos.getValue).toSet
-
-  def toIndexArray: Array[Int] = fields.map { _.localPos }.toArray
-
-//  def mapToArray[T: ClassTag](fun: FieldType => T): Array[T] = {
-//    (fields map fun) toArray
-//  }
-}
-
-object FieldSet {
-
-  def newInputSet(numFields: Int): FieldSet[InputField] = new FieldSet((0 until numFields) map { InputField(_) })
-  def newOutputSet(numFields: Int): FieldSet[OutputField] = new FieldSet((0 until numFields) map { OutputField(_) })
-
-  def newInputSet[T](udt: UDT[T]): FieldSet[InputField] = newInputSet(udt.numFields)
-  def newOutputSet[T](udt: UDT[T]): FieldSet[OutputField] = newOutputSet(udt.numFields)
-
-  implicit def toSeq[FieldType <: Field](fieldSet: FieldSet[FieldType]): Seq[FieldType] = fieldSet.fields
-}
-
-class GlobalPos extends Serializable {
-
-  private var pos: Either[Int, GlobalPos] = null
-
-  def getValue: Int = pos match {
-    case null          => -1
-    case Left(index)   => index
-    case Right(target) => target.getValue
-  }
-
-  def getIndex: Option[Int] = pos match {
-    case null | Right(_) => None
-    case Left(index)     => Some(index)
-  }
-
-  def getReference: Option[GlobalPos] = pos match {
-    case null | Left(_) => None
-    case Right(target)  => Some(target)
-  }
-
-  def resolve: GlobalPos = pos match {
-    case null          => this
-    case Left(_)       => this
-    case Right(target) => target.resolve
-  }
-
-  def isUnknown = pos == null
-  def isIndex = (pos != null) && pos.isLeft
-  def isReference = (pos != null) && pos.isRight
-
-  def setIndex(index: Int) = {
-    assert(pos == null || pos.isLeft, "Cannot convert a position reference to an index")
-    pos = Left(index)
-  }
-
-  def setReference(target: GlobalPos) = {
-//    assert(pos == null, "Cannot overwrite a known position with a reference")
-    pos = Right(target)
-  }
-}
-
-object GlobalPos {
-
-  object Unknown {
-    def unapply(globalPos: GlobalPos): Boolean = globalPos.isUnknown
-  }
-
-  object Index {
-    def unapply(globalPos: GlobalPos): Option[Int] = globalPos.getIndex
-  }
-
-  object Reference {
-    def unapply(globalPos: GlobalPos): Option[GlobalPos] = globalPos.getReference
-  }
-}
\ No newline at end of file
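
The least obvious part of the file above is GlobalPos: a position is either a concrete index (Left) or a reference to another GlobalPos (Right), and getValue chases references until it reaches an index. A standalone sketch of just that resolution chain (the Pos class and the example values are invented for illustration):

class Pos {
  private var pos: Either[Int, Pos] = null

  def setIndex(i: Int): Unit = { pos = Left(i) }
  def setReference(target: Pos): Unit = { pos = Right(target) }

  def getValue: Int = pos match {
    case null          => -1                   // still unknown
    case Left(index)   => index                // resolved to a concrete schema index
    case Right(target) => target.getValue      // follow the reference chain
  }
}

object PosDemo {
  def main(args: Array[String]): Unit = {
    val a = new Pos
    val b = new Pos
    val c = new Pos
    a.setIndex(7)
    b.setReference(a)                          // b points at a
    c.setReference(b)                          // c points at b, which points at a
    println(Seq(a, b, c).map(_.getValue))      // List(7, 7, 7)
  }
}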

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala
deleted file mode 100644
index 609d41a..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaGenerator.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import java.util.{List => JList}
-
-import scala.Some
-
-import org.apache.flink.api.scala._
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.record.operators._
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration, GenericDataSourceBase, MapOperatorBase}
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.types.Record
-
-
-class GlobalSchemaGenerator {
-
-  def initGlobalSchema(sinks: Seq[Operator[Record] with ScalaOperator[_, _]]): Unit = {
-    // don't do anything, we don't need global positions if we don't do reordering of operators
-    // FieldSet.toSerializerIndexArray returns local positions and ignores global positions
-    // sinks.foldLeft(0) { (freePos, contract) => globalizeContract(contract, Seq(), Map(), None, freePos) }
-  }
-
-  /**
-   * Computes disjoint write sets for a contract and its inputs.
-   *
-   * @param contract The contract to globalize
-   * @param parentInputs Input fields which should be bound to the contract's outputs
-   * @param proxies Provides contracts for iteration placeholders
-   * @param fixedOutputs Specifies required positions for the contract's output fields, or None to allocate new positions
-   * @param freePos The current first available position in the global schema
-   * @return The new first available position in the global schema
-   */
-  private def globalizeContract(contract: Operator[Record], parentInputs: Seq[FieldSet[InputField]], proxies: Map[Operator[Record], Operator[Record] with ScalaOperator[_, _]], fixedOutputs: Option[FieldSet[Field]], freePos: Int): Int = {
-
-    val contract4s = proxies.getOrElse(contract, contract.asInstanceOf[Operator[Record] with ScalaOperator[_, _]])
-
-    parentInputs.foreach(contract4s.getUDF.attachOutputsToInputs)
-
-    contract4s.getUDF.outputFields.isGlobalized match {
-
-      case true => freePos
-
-      case false => {
-
-        val freePos1 = globalizeContract(contract4s, proxies, fixedOutputs, freePos)
-
-        contract4s.persistConfiguration(None)
-
-        freePos1
-      }
-    }
-  }
-
-  private def globalizeContract(contract: Operator[_] with ScalaOperator[_, _], proxies: Map[Operator[Record], Operator[Record] with ScalaOperator[_, _]], fixedOutputs: Option[FieldSet[Field]], freePos: Int): Int = {
-
-    contract match {
-
-      case contract : FileDataSink with ScalaOutputOperator[_] => {
-        contract.getUDF.outputFields.setGlobalized()
-        globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.asInstanceOf[UDF1[_,_]].inputFields), proxies, None, freePos)
-      }
-
-      case contract: GenericDataSourceBase[_, _] with ScalaOperator[_, _] => {
-        contract.getUDF.setOutputGlobalIndexes(freePos, fixedOutputs)
-      }
-
-      case contract : BulkIteration[_] with BulkIterationScalaOperator[_] => {
-        val s0 = contract.getInput().asInstanceOf[Operator[Record]]
-
-        val s0contract = proxies.getOrElse(s0, s0.asInstanceOf[Operator[Record] with ScalaOperator[_, Record]])
-        val newProxies = proxies + (contract.getPartialSolution().asInstanceOf[Operator[Record]] -> s0contract)
-
-        val freePos1 = globalizeContract(s0, Seq(), proxies, fixedOutputs, freePos)
-        val freePos2 = globalizeContract(contract.getNextPartialSolution().asInstanceOf[Operator[Record]], Seq(), newProxies, Some(s0contract.getUDF.outputFields), freePos1)
-        val freePos3 = Option(contract.getTerminationCriterion().asInstanceOf[Operator[Record]]) map { globalizeContract(_, Seq(), newProxies, None, freePos2) } getOrElse freePos2
-
-        contract.getUDF.assignOutputGlobalIndexes(s0contract.getUDF.outputFields)
-
-        freePos3
-      }
-
-      case contract : DeltaIteration[_, _] with DeltaIterationScalaOperator[_] => {
-//      case contract @ WorksetIterate4sContract(s0, ws0, deltaS, newWS, placeholderS, placeholderWS) => {
-        val s0 = contract.getInitialSolutionSet().asInstanceOf[Operator[Record]]
-        val ws0 = contract.getInitialWorkset().asInstanceOf[Operator[Record]]
-        val deltaS = contract.getSolutionSetDelta.asInstanceOf[Operator[Record]]
-        val newWS = contract.getNextWorkset.asInstanceOf[Operator[Record]]
-
-        val s0contract = proxies.getOrElse(s0, s0.asInstanceOf[Operator[Record] with ScalaOperator[_, _]])
-        val ws0contract = proxies.getOrElse(ws0, ws0.asInstanceOf[Operator[Record] with ScalaOperator[_, _]])
-        val newProxies = proxies + (contract.getSolutionSetDelta.asInstanceOf[Operator[Record]] -> s0contract) + (contract.getNextWorkset.asInstanceOf[Operator[Record]] -> ws0contract)
-
-        val freePos1 = globalizeContract(s0, Seq(contract.key.inputFields), proxies, fixedOutputs, freePos)
-        val freePos2 = globalizeContract(ws0, Seq(), proxies, None, freePos1)
-        val freePos3 = globalizeContract(deltaS, Seq(), newProxies, Some(s0contract.getUDF.outputFields), freePos2)
-        val freePos4 = globalizeContract(newWS, Seq(), newProxies, Some(ws0contract.getUDF.outputFields), freePos3)
-
-        contract.getUDF.assignOutputGlobalIndexes(s0contract.getUDF.outputFields)
-
-        freePos4
-      }
-
-      case contract : CoGroupOperator with TwoInputKeyedScalaOperator[_, _, _] => {
-
-        val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields, contract.leftKey.inputFields), proxies, None, freePos)
-        val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields, contract.rightKey.inputFields), proxies, None, freePos1)
-
-        contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
-      }
-
-      case contract: CrossOperator with TwoInputScalaOperator[_, _, _] => {
-
-        val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields), proxies, None, freePos)
-        val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields), proxies, None, freePos1)
-
-        contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
-      }
-
-      case contract : JoinOperator with TwoInputKeyedScalaOperator[_, _, _] => {
-
-        val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields, contract.leftKey.inputFields), proxies, None, freePos)
-        val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields, contract.rightKey.inputFields), proxies, None, freePos1)
-
-        contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
-      }
-
-      case contract : MapOperatorBase[_, _, _] with OneInputScalaOperator[_, _] => {
-
-        val freePos1 = globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.inputFields), proxies, None, freePos)
-
-        contract.getUDF.setOutputGlobalIndexes(freePos1, fixedOutputs)
-      }
-
-      case contract : ReduceOperator with OneInputKeyedScalaOperator[_, _] => {
-
-        val freePos1 = globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.inputFields, contract.key.inputFields), proxies, None, freePos)
-
-        contract.getUDF.setOutputGlobalIndexes(freePos1, fixedOutputs)
-      }
-
-      // for key-less (global) reducers
-      case contract : ReduceOperator with OneInputScalaOperator[_, _] => {
-
-        val freePos1 = globalizeContract(contract.getInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.inputFields), proxies, None, freePos)
-
-        contract.getUDF.setOutputGlobalIndexes(freePos1, fixedOutputs)
-      }
-
-      case contract : Union[_] with UnionScalaOperator[_] => {
-        
-        val freePos1 = globalizeContract(contract.getFirstInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.leftInputFields), proxies, fixedOutputs, freePos)
-        val freePos2 = globalizeContract(contract.getSecondInput().asInstanceOf[Operator[Record]], Seq(contract.getUDF.rightInputFields), proxies, fixedOutputs, freePos1)
-
-        contract.getUDF.setOutputGlobalIndexes(freePos2, fixedOutputs)
-      }
-    }
-  }
-}
\ No newline at end of file
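
The recursion in globalizeContract above threads a "first free position" counter from the sinks down to the sources and back, allocating global output positions exactly once per operator (or reusing fixed ones). A simplified, Flink-free sketch of that threading pattern (Node, numOutputs and the example graph are assumptions for illustration):

object GlobalizeSketch {
  final case class Node(name: String, numOutputs: Int, inputs: List[Node] = Nil)

  def globalize(node: Node, freePos: Int, assigned: collection.mutable.Map[String, Range]): Int = {
    if (assigned.contains(node.name)) freePos    // already globalized, nothing new to allocate
    else {
      // globalize all inputs first, threading the counter through the fold
      val afterInputs = node.inputs.foldLeft(freePos)((pos, in) => globalize(in, pos, assigned))
      assigned(node.name) = afterInputs until (afterInputs + node.numOutputs)
      afterInputs + node.numOutputs
    }
  }

  def main(args: Array[String]): Unit = {
    val source = Node("source", numOutputs = 2)
    val map    = Node("map", numOutputs = 3, inputs = List(source))
    val sink   = Node("sink", numOutputs = 0, inputs = List(map))
    val assigned = collection.mutable.Map[String, Range]()
    val next = globalize(sink, 0, assigned)
    println(assigned)   // source -> 0 until 2, map -> 2 until 5, sink -> 5 until 5
    println(next)       // 5
  }
}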

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala
deleted file mode 100644
index 2d758ce..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/GlobalSchemaPrinter.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import org.apache.commons.logging.{LogFactory, Log}
-
-import scala.collection.JavaConversions.collectionAsScalaIterable
-import scala.Array.canBuildFrom
-
-import org.apache.flink.api.common.Plan
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.DualInputOperator
-import org.apache.flink.api.common.operators.SingleInputOperator
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration}
-import org.apache.flink.api.java.record.operators.GenericDataSink
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration}
-import org.apache.flink.api.common.operators.base.{DeltaIterationBase => DeltaIteration}
-
-import Extractors.DataSourceNode
-import Extractors.DataSinkNode
-import Extractors.DeltaIterationNode
-import Extractors.JoinNode
-import Extractors.MapNode
-import Extractors.ReduceNode
-import Extractors.UnionNode
-
-object GlobalSchemaPrinter {
-
-  import Extractors._
-
-  private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaGenerator])
-
-  def printSchema(plan: Plan): Unit = {
-
-    LOG.debug("### " + plan.getJobName + " ###")
-    plan.getDataSinks.foldLeft(Set[Operator[_]]())(printSchema)
-    LOG.debug("####" + ("#" * plan.getJobName.length) + "####")
-  }
-
-  private def printSchema(visited: Set[Operator[_]], node: Operator[_]): Set[Operator[_]] = {
-
-    visited.contains(node) match {
-
-      case true => visited
-
-      case false => {
-
-        val children = node match {
-          case bi: BulkIteration[_] => List(bi.getInput()) :+ bi.getNextPartialSolution()
-          case wi: DeltaIteration[_, _] => List(wi.getInitialSolutionSet()) :+ wi.getInitialWorkset() :+ wi.getSolutionSetDelta() :+ wi.getNextWorkset()
-          case si : SingleInputOperator[_, _, _] => List(si.getInput())
-          case di : DualInputOperator[_, _, _, _] => List(di.getFirstInput()) :+ di.getSecondInput()
-          case gds : GenericDataSink => List(gds.getInput())
-          case _ => List()
-        }
-        val newVisited = children.foldLeft(visited + node)(printSchema)
-
-        node match {
-          
-          case _ : BulkIteration.PartialSolutionPlaceHolder[_] =>
-          case _ : DeltaIteration.SolutionSetPlaceHolder[_] =>
-          case _ : DeltaIteration.WorksetPlaceHolder[_] =>
-
-          case DataSinkNode(udf, input) => {
-            printInfo(node, "Sink",
-              Seq(),
-              Seq(("", udf.inputFields)),
-              Seq(("", udf.getForwardIndexArrayFrom)),
-              Seq(("", udf.getDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case DataSourceNode(udf) => {
-            printInfo(node, "Source",
-              Seq(),
-              Seq(),
-              Seq(),
-              Seq(),
-              udf.outputFields
-            )
-          }
-
-          case CoGroupNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-            printInfo(node, "CoGroup",
-              Seq(("L", leftKey), ("R", rightKey)),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case CrossNode(udf, leftInput, rightInput) => {
-            printInfo(node, "Cross",
-              Seq(),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case JoinNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-            printInfo(node, "Join",
-              Seq(("L", leftKey), ("R", rightKey)),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case MapNode(udf, input) => {
-            printInfo(node, "Map",
-              Seq(),
-              Seq(("", udf.inputFields)),
-              Seq(("", udf.getForwardIndexArrayFrom)),
-              Seq(("", udf.getDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-          
-          case UnionNode(udf, leftInput, rightInput) => {
-            printInfo(node, "Union",
-              Seq(),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case ReduceNode(udf, key, input) => {
-
-//            val contract = node.asInstanceOf[Reduce4sContract[_, _, _]] 
-//            contract.userCombineCode map { _ =>
-//              printInfo(node, "Combine",
-//                Seq(("", key)),
-//                Seq(("", udf.inputFields)),
-//                Seq(("", contract.combineForwardSet.toArray)),
-//                Seq(("", contract.combineDiscardSet.toArray)),
-//                udf.inputFields
-//              )
-//            }
-
-            printInfo(node, "Reduce",
-              Seq(("", key)),
-              Seq(("", udf.inputFields)),
-              Seq(("", udf.getForwardIndexArrayFrom)),
-              Seq(("", udf.getDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-          case DeltaIterationNode(udf, key, input1, input2) => {
-
-            printInfo(node, "WorksetIterate",
-              Seq(("", key)),
-              Seq(),
-              Seq(),
-              Seq(),
-              udf.outputFields)
-          }
-
-          case BulkIterationNode(udf, input1) => {
-
-            printInfo(node, "BulkIterate",
-              Seq(),
-              Seq(),
-              Seq(),
-              Seq(),
-              udf.outputFields)
-          }
-        }
-
-        newVisited
-      }
-    }
-  }
-
-  private def printInfo(node: Operator[_], kind: String, keys: Seq[(String, FieldSelector)], reads: Seq[(String, FieldSet[_])], forwards: Seq[(String, Array[Int])], discards: Seq[(String, Array[Int])], writes: FieldSet[_]): Unit = {
-
-    def indexesToStrings(pre: String, indexes: Array[Int]) = indexes map {
-      case -1 => "_"
-      case i  => pre + i
-    }
-
-    val formatString = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
-
-    val name = node.getName
-
-    val sKeys = keys flatMap { case (pre, value) => value.selectedFields.toSerializerIndexArray.map(pre + _) } mkString ", "
-    val sReads = reads flatMap { case (pre, value) => indexesToStrings(pre, value.toSerializerIndexArray) } mkString ", "
-    val sForwards = forwards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
-    val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
-    val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", "
-
-    LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
-  }
-}
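
For reference, printInfo above emits one debug line per operator describing keys, reads, forwards, discards and writes. A tiny illustration of what such a line looks like, using the same format string with a made-up operator name and field indexes:

object PrintInfoSketch {
  def main(args: Array[String]): Unit = {
    val formatString = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
    // name, kind, keys, reads, forwards, discards, writes (all hypothetical values)
    println(formatString.format("WordCount Reduce", "Reduce", "0", "0, 1", "0", "", "0, 1"))
    // WordCount Reduce (Reduce): K{0}: R[0, 1] => F[0] - D[] + W[0, 1]
  }
}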

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala
deleted file mode 100644
index 2e9c203..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedFunction.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import scala.collection.mutable
-import scala.util.Either.MergeableEither
-
-
-abstract class UDF[R] extends Serializable {
-
-  val outputUDT: UDT[R]
-  val outputFields = FieldSet.newOutputSet(outputUDT)
-
-  def getOutputSerializer = outputUDT.getSerializer(outputFields.toSerializerIndexArray)
-
-  def getOutputLength = {
-    val indexes = outputFields.toIndexSet
-    if (indexes.isEmpty) {
-      0 
-    } else {
-      indexes.max + 1
-    }
-  }
-
-  def allocateOutputGlobalIndexes(startPos: Int): Int = {
-
-    outputFields.setGlobalized()
-
-    outputFields.map(_.globalPos).foldLeft(startPos) {
-      case (i, gPos @ GlobalPos.Unknown()) => gPos.setIndex(i); i + 1
-      case (i, _)                          => i
-    }
-    startPos
-  }
-
-  def assignOutputGlobalIndexes(sameAs: FieldSet[Field]): Unit = {
-
-    outputFields.setGlobalized()
-
-    outputFields.foreach {
-      case OutputField(localPos, globalPos) => globalPos.setReference(sameAs(localPos).globalPos)
-    }
-  }
-
-  def setOutputGlobalIndexes(startPos: Int, sameAs: Option[FieldSet[Field]]): Int = sameAs match {
-    case None         => allocateOutputGlobalIndexes(startPos)
-    case Some(sameAs) => assignOutputGlobalIndexes(sameAs); startPos
-  }
-
-  def attachOutputsToInputs(inputFields: FieldSet[InputField]): Unit = {
-
-    inputFields.setGlobalized()
-
-    inputFields.foreach {
-      case InputField(localPos, globalPos) => globalPos.setReference(outputFields(localPos).globalPos)
-    }
-  }
-
-  protected def markFieldCopied(inputGlobalPos: GlobalPos, outputLocalPos: Int): Unit = {
-    val outputField = outputFields(outputLocalPos)
-    outputField.globalPos.setReference(inputGlobalPos)
-    outputField.isUsed = false
-  }
-}
-
-class UDF0[R](val outputUDT: UDT[R]) extends UDF[R]
-
-class UDF1[T, R](val inputUDT: UDT[T], val outputUDT: UDT[R]) extends UDF[R] {
-
-  val inputFields = FieldSet.newInputSet(inputUDT)
-  val forwardSet = mutable.Set[(InputField, OutputField)]()
-  val discardSet = mutable.Set[GlobalPos]()
-
-  def getInputDeserializer = inputUDT.getSerializer(inputFields.toSerializerIndexArray)
-  def getForwardIndexSetFrom = forwardSet.map(_._1.localPos)
-  def getForwardIndexSetTo = forwardSet.map(_._2.localPos)
-  def getForwardIndexArrayFrom = getForwardIndexSetFrom.toArray
-  def getForwardIndexArrayTo = getForwardIndexSetTo.toArray
-  def getDiscardIndexArray = discardSet.map(_.getValue).toArray
-
-  override def getOutputLength = {
-    val forwardMax = if (forwardSet.isEmpty) -1 else forwardSet.map(_._2.localPos).max
-    math.max(super.getOutputLength, forwardMax + 1)
-  }
-
-  def markInputFieldUnread(localPos: Int): Unit = {
-    inputFields(localPos).isUsed = false
-  }
-
-  def markFieldCopied(inputLocalPos: Int, outputLocalPos: Int): Unit = {
-    val inputField = inputFields(inputLocalPos)
-    val inputGlobalPos = inputField.globalPos
-    forwardSet.add((inputField, outputFields(outputLocalPos)))
-    markFieldCopied(inputGlobalPos, outputLocalPos)
-  }
-}
-
-class UDF2[T1, T2, R](val leftInputUDT: UDT[T1], val rightInputUDT: UDT[T2], val outputUDT: UDT[R]) extends UDF[R] {
-
-  val leftInputFields = FieldSet.newInputSet(leftInputUDT)
-  val leftForwardSet = mutable.Set[(InputField, OutputField)]()
-  val leftDiscardSet = mutable.Set[GlobalPos]()
-
-  val rightInputFields = FieldSet.newInputSet(rightInputUDT)
-  val rightForwardSet = mutable.Set[(InputField, OutputField)]()
-  val rightDiscardSet = mutable.Set[GlobalPos]()
-
-  def getLeftInputDeserializer = leftInputUDT.getSerializer(leftInputFields.toSerializerIndexArray)
-  def getLeftForwardIndexSetFrom = leftForwardSet.map(_._1.localPos)
-  def getLeftForwardIndexSetTo = leftForwardSet.map(_._2.localPos)
-  def getLeftForwardIndexArrayFrom = getLeftForwardIndexSetFrom.toArray
-  def getLeftForwardIndexArrayTo = getLeftForwardIndexSetTo.toArray
-  def getLeftDiscardIndexArray = leftDiscardSet.map(_.getValue).toArray
-
-  def getRightInputDeserializer = rightInputUDT.getSerializer(rightInputFields.toSerializerIndexArray)
-  def getRightForwardIndexSetFrom = rightForwardSet.map(_._1.localPos)
-  def getRightForwardIndexSetTo = rightForwardSet.map(_._2.localPos)
-  def getRightForwardIndexArrayFrom = getRightForwardIndexSetFrom.toArray
-  def getRightForwardIndexArrayTo = getRightForwardIndexSetTo.toArray
-  def getRightDiscardIndexArray = rightDiscardSet.map(_.getValue).toArray
-
-  override def getOutputLength = {
-    val leftForwardMax = if (leftForwardSet.isEmpty) -1 else leftForwardSet.map(_._2.localPos).max
-    val rightForwardMax = if (rightForwardSet.isEmpty) -1 else rightForwardSet.map(_._2.localPos).max
-    math.max(super.getOutputLength, math.max(leftForwardMax, rightForwardMax) + 1)
-  }
-
-  private def getInputField(localPos: Either[Int, Int]): InputField = localPos match {
-    case Left(pos)  => leftInputFields(pos)
-    case Right(pos) => rightInputFields(pos)
-  }
-
-  def markInputFieldUnread(localPos: Either[Int, Int]): Unit = {
-    localPos.fold(leftInputFields(_), rightInputFields(_)).isUsed = false
-  }
-
-  def markFieldCopied(inputLocalPos: Either[Int, Int], outputLocalPos: Int): Unit = {
-    val (inputFields, forwardSet) = inputLocalPos.fold(_ => (leftInputFields, leftForwardSet), _ => (rightInputFields, rightForwardSet))
-    val inputField = inputFields(inputLocalPos.merge)
-    val inputGlobalPos = inputField.globalPos
-    forwardSet.add((inputField, outputFields(outputLocalPos)))
-    markFieldCopied(inputGlobalPos, outputLocalPos)
-  }
-}
-
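
One detail worth calling out in UDF1/UDF2 above is getOutputLength: the output record must be wide enough for both written fields and forwarded fields, so the length is the maximum used index plus one across both sets. A minimal sketch of that rule with invented index sets:

object OutputLengthSketch {
  def outputLength(writtenIndexes: Set[Int], forwardedToIndexes: Set[Int]): Int = {
    val all = writtenIndexes ++ forwardedToIndexes
    if (all.isEmpty) 0 else all.max + 1
  }

  def main(args: Array[String]): Unit = {
    println(outputLength(Set(0, 1), Set(4)))   // 5: a field forwarded to position 4 forces a wider record
    println(outputLength(Set(), Set()))        // 0: nothing written, nothing forwarded
  }
}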

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala
deleted file mode 100644
index aaa3b7b..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/UserDefinedType.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis
-
-import scala.language.experimental.macros
-import scala.language.postfixOps
-
-import org.apache.flink.api.scala.codegen.Util
-
-import org.apache.flink.types.{Key => PactKey}
-import org.apache.flink.types.Record
-import org.apache.flink.types.{Value => PactValue}
-import org.apache.flink.types.StringValue
-
-
-abstract class UDT[T] extends Serializable {
-  protected def createSerializer(indexMap: Array[Int]): UDTSerializer[T]
-  val fieldTypes: Array[Class[_ <: org.apache.flink.types.Value]]
-  val udtIdMap: Map[Int, Int]
-  
-  def numFields = fieldTypes.length
-
-  def getSelectionIndices(selection: List[Int]) = { 
-    selection map { udtIdMap.getOrElse(_, -1) }
-  }
-
-  def getKeySet(fields: Seq[Int]): Array[Class[_ <: PactKey[_]]] = {
-    fields map { fieldNum => fieldTypes(fieldNum).asInstanceOf[Class[_ <: PactKey[_]]] } toArray
-  }
-
-  def getSerializer(indexMap: Array[Int]): UDTSerializer[T] = {
-    val ser = createSerializer(indexMap)
-    ser
-  }
-
-  @transient private var defaultSerializer: UDTSerializer[T] = null
-
-  def getSerializerWithDefaultLayout: UDTSerializer[T] = {
-    // This method will be reentrant if T is a recursive type
-    if (defaultSerializer == null) {
-      defaultSerializer = createSerializer((0 until numFields) toArray)
-    }
-    defaultSerializer
-  }
-}
-
-abstract class UDTSerializer[T](val indexMap: Array[Int]) {
-  def serialize(item: T, record: Record)
-  def deserializeRecyclingOff(record: Record): T
-  def deserializeRecyclingOn(record: Record): T
-}
-
-trait UDTLowPriorityImplicits {
-  implicit def createUDT[T]: UDT[T] = macro Util.createUDTImpl[T]
-}
-
-object UDT extends UDTLowPriorityImplicits {
-
-  // UDTs needed by library code
-
-  object NothingUDT extends UDT[Nothing] {
-    override val fieldTypes = Array[Class[_ <: PactValue]]()
-    override val udtIdMap: Map[Int, Int] = Map()
-    override def createSerializer(indexMap: Array[Int]) = throw new UnsupportedOperationException("Cannot create UDTSerializer for type Nothing")
-  }
-
-  object StringUDT extends UDT[String] {
-
-    override val fieldTypes = Array[Class[_ <: PactValue]](classOf[StringValue])
-    override val udtIdMap: Map[Int, Int] = Map()
-
-    override def createSerializer(indexMap: Array[Int]) = new UDTSerializer[String](indexMap) {
-
-      private val index = indexMap(0)
-
-      @transient private var pactField = new StringValue()
-
-//      override def getFieldIndex(selection: Seq[String]): List[Int] = selection match {
-//        case Seq() => List(index)
-////        case _     => invalidSelection(selection)
-//        case _     => throw new RuntimeException("Invalid selection: " + selection)
-//      }
-
-      override def serialize(item: String, record: Record) = {
-        if (index >= 0) {
-          pactField.setValue(item)
-          record.setField(index, pactField)
-        }
-      }
-
-      override def deserializeRecyclingOff(record: Record): String = {
-        if (index >= 0) {
-          record.getFieldInto(index, pactField)
-          pactField.getValue()
-        } else {
-          null
-        }
-      }
-
-      override def deserializeRecyclingOn(record: Record): String = {
-        if (index >= 0) {
-          record.getFieldInto(index, pactField)
-          pactField.getValue()
-        } else {
-          null
-        }
-      }
-
-      private def readObject(in: java.io.ObjectInputStream) = {
-        in.defaultReadObject()
-        pactField = new StringValue()
-      }
-    }
-  }
-}
-
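
The serializers produced by a UDT, like the StringUDT one above, are index-mapped: a negative index in the layout means the field is not part of the record, so writes are dropped and reads come back as null. A standalone sketch of that pattern without any Flink types (Rec and StringSerializer are illustrative stand-ins for Record and UDTSerializer):

object UdtSketch {
  // a toy "record": a fixed-size array of optional field slots
  type Rec = Array[Option[String]]

  class StringSerializer(index: Int) {
    def serialize(item: String, record: Rec): Unit =
      if (index >= 0) record(index) = Some(item)        // only mapped fields are written

    def deserialize(record: Rec): String =
      if (index >= 0) record(index).orNull else null    // unmapped fields read as null
  }

  def main(args: Array[String]): Unit = {
    val record: Rec = Array.fill[Option[String]](3)(None)
    new StringSerializer(1).serialize("hello", record)
    new StringSerializer(-1).serialize("dropped", record)  // not written anywhere
    println(record.toList)                                  // List(None, Some(hello), None)
  }
}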

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala
deleted file mode 100644
index df7d77f..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/AmbientFieldDetector.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.apache.flink.api.scala.analysis.postPass;
-// Comment out because this is not working right now
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//package org.apache.flink.api.scala.analysis.postPass
-//
-//import scala.collection.mutable
-//import scala.collection.JavaConversions._
-//
-//import org.apache.flink.api.scala.analysis._
-//import org.apache.flink.api.scala.contracts._
-//
-//import org.apache.flink.pact.compiler.plan._
-//import org.apache.flink.pact.compiler.plan.candidate.OptimizedPlan
-//
-//object AmbientFieldDetector {
-//
-//  import Extractors._
-//  import EdgeDependencySets.EdgeDependencySet
-//
-//  def updateAmbientFields(plan: OptimizedPlan, edgeDependencies: Map[PactConnection, EdgeDependencySet], outputPositions: Map[Int, GlobalPos]): Unit = {
-//    plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(updateAmbientFields(outputPositions, edgeDependencies))
-//  }
-//
-//  private def updateAmbientFields(outputPositions: Map[Int, GlobalPos], edgeDependencies: Map[PactConnection, EdgeDependencySet])(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-//
-//    visited.contains(node) match {
-//
-//      case true => visited
-//
-//      case false => {
-//        node match {
-//
-//          case _: SinkJoiner | _: BinaryUnionNode =>
-//          case DataSinkNode(udf, input)                       =>
-//          case DataSourceNode(udf)                            =>
-//
-//          case CoGroupNode(udf, _, _, leftInput, rightInput) => {
-//
-//            val leftProvides = edgeDependencies(leftInput).childProvides
-//            val rightProvides = edgeDependencies(rightInput).childProvides
-//            val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-//            val writes = udf.outputFields.toIndexSet
-//
-//            populateSets(udf.leftForwardSet, udf.leftDiscardSet, leftProvides, parentNeeds, writes, outputPositions)
-//            populateSets(udf.rightForwardSet, udf.rightDiscardSet, rightProvides, parentNeeds, writes, outputPositions)
-//          }
-//
-//          case CrossNode(udf, leftInput, rightInput) => {
-//
-//            val leftProvides = edgeDependencies(leftInput).childProvides
-//            val rightProvides = edgeDependencies(rightInput).childProvides
-//            val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-//            val writes = udf.outputFields.toIndexSet
-//
-//            populateSets(udf.leftForwardSet, udf.leftDiscardSet, leftProvides, parentNeeds, writes, outputPositions)
-//            populateSets(udf.rightForwardSet, udf.rightDiscardSet, rightProvides, parentNeeds, writes, outputPositions)
-//          }
-//
-//          case JoinNode(udf, _, _, leftInput, rightInput) => {
-//
-//            val leftProvides = edgeDependencies(leftInput).childProvides
-//            val rightProvides = edgeDependencies(rightInput).childProvides
-//            val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-//            val writes = udf.outputFields.toIndexSet
-//
-//            populateSets(udf.leftForwardSet, udf.leftDiscardSet, leftProvides, parentNeeds, writes, outputPositions)
-//            populateSets(udf.rightForwardSet, udf.rightDiscardSet, rightProvides, parentNeeds, writes, outputPositions)
-//          }
-//
-//          case MapNode(udf, input) => {
-//
-//            val inputProvides = edgeDependencies(input).childProvides
-//            val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-//            val writes = udf.outputFields.toIndexSet
-//
-//            populateSets(udf.forwardSet, udf.discardSet, inputProvides, parentNeeds, writes, outputPositions)
-//          }
-//
-//          case ReduceNode(udf, _, input) => {
-//            val inputProvides = edgeDependencies(input).childProvides
-//            val parentNeeds = edgeDependencies(node.getOutgoingConnections.head).childProvides
-//            val writes = udf.outputFields.toIndexSet
-//
-//            populateSets(udf.forwardSet, udf.discardSet, inputProvides, parentNeeds, writes, outputPositions)
-//          }
-//        }
-//
-//        node.getIncomingConnections.map(_.getSource).foldLeft(visited + node)(updateAmbientFields(outputPositions, edgeDependencies))
-//      }
-//    }
-//  }
-//
-//  private def populateSets(forwards: mutable.Set[GlobalPos], discards: mutable.Set[GlobalPos], childProvides: Set[Int], parentNeeds: Set[Int], writes: Set[Int], outputPositions: Map[Int, GlobalPos]): Unit = {
-//    forwards.clear()
-//    forwards.addAll((parentNeeds -- writes).intersect(childProvides).map(outputPositions(_)))
-//
-//    discards.clear()
-//    discards.addAll((childProvides -- parentNeeds -- writes).intersect(childProvides).map(outputPositions(_)))
-//  }
-//}
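
Even though the file above is commented out, its populateSets logic is easy to state: a field provided by the child is forwarded if a parent still needs it and the operator does not overwrite it, and discarded if nobody downstream needs it and it is not overwritten either. A small sketch of that classification with invented field numbers:

object AmbientFieldsSketch {
  def classify(childProvides: Set[Int], parentNeeds: Set[Int], writes: Set[Int]): (Set[Int], Set[Int]) = {
    val forwards = (parentNeeds -- writes).intersect(childProvides)
    val discards = (childProvides -- parentNeeds) -- writes
    (forwards, discards)
  }

  def main(args: Array[String]): Unit = {
    // the child provides fields 0..3, the parents need 1 and 2, the operator writes 2 itself
    val (fwd, disc) = classify(childProvides = Set(0, 1, 2, 3), parentNeeds = Set(1, 2), writes = Set(2))
    println(s"forward: $fwd, discard: $disc")   // forward: Set(1), discard: Set(0, 3)
  }
}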

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala
deleted file mode 100644
index c92cef5..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/EdgeDependencySets.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-package org.apache.flink.api.scala.analysis.postPass;
-// Comment out because this is not working right now
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//
-//package org.apache.flink.api.scala.analysis.postPass
-//
-//import scala.collection.JavaConversions._
-//
-//import org.apache.flink.api.scala.analysis._
-//import org.apache.flink.api.scala.contracts._
-//
-//import org.apache.flink.pact.compiler.plan._
-//import org.apache.flink.pact.compiler.plan.candidate.OptimizedPlan
-//
-//object EdgeDependencySets {
-//
-//  import Extractors._
-//
-//  case class EdgeDependencySet(parentNeeds: Set[Int], childProvides: Set[Int] = Set())
-//
-//  def computeEdgeDependencySets(plan: OptimizedPlan, outputSets: Map[OptimizerNode, Set[Int]]): Map[PactConnection, EdgeDependencySet] = {
-//
-//    plan.getDataSinks.map(_.getSinkNode).foldLeft(Map[PactConnection, EdgeDependencySet]())(computeEdgeDependencySets(outputSets))
-//  }
-//
-//  private def computeEdgeDependencySets(outputSets: Map[OptimizerNode, Set[Int]])(edgeDependencySets: Map[PactConnection, EdgeDependencySet], node: OptimizerNode): Map[PactConnection, EdgeDependencySet] = {
-//
-//    // breadth-first traversal: parentNeeds will be None if any parent has not yet been visited
-//    val parentNeeds = node.getOutgoingConnections().foldLeft(Option(Set[Int]())) {
-//      case (None, _)           => None
-//      case (Some(acc), parent) => edgeDependencySets.get(parent) map { acc ++ _.parentNeeds }
-//    }
-//
-//    parentNeeds match {
-//      case None              => edgeDependencySets
-//      case Some(parentNeeds) => computeEdgeDependencySets(node, parentNeeds, outputSets, edgeDependencySets)
-//    }
-//  }
-//
-//  private def computeEdgeDependencySets(node: OptimizerNode, parentNeeds: Set[Int], outputSets: Map[OptimizerNode, Set[Int]], edgeDependencySets: Map[PactConnection, EdgeDependencySet]): Map[PactConnection, EdgeDependencySet] = {
-//
-//    def updateEdges(needs: (PactConnection, Set[Int])*): Map[PactConnection, EdgeDependencySet] = {
-//
-//      val updParents = node.getOutgoingConnections().foldLeft(edgeDependencySets) { (edgeDependencySets, parent) =>
-//        val entry = edgeDependencySets(parent)
-//        edgeDependencySets.updated(parent, entry.copy(childProvides = parentNeeds))
-//      }
-//
-//      needs.foldLeft(updParents) {
-//        case (edgeDependencySets, (inConn, needs)) => {
-//          val updInConn = edgeDependencySets.updated(inConn, EdgeDependencySet(needs))
-//          computeEdgeDependencySets(outputSets)(updInConn, inConn.getSource)
-//        }
-//      }
-//    }
-//
-//    for (udf <- node.getUDF) {
-//
-//      // suppress outputs that aren't needed by any parent
-//      val writeFields = udf.outputFields filter { _.isUsed }
-//      val unused = writeFields filterNot { f => parentNeeds.contains(f.globalPos.getValue) }
-//
-//      for (field <- unused) {
-//        field.isUsed = false
-//        if (field.globalPos.isIndex)
-//          field.globalPos.setIndex(Int.MinValue)
-//      }
-//    }
-//
-//    node match {
-//
-//      case DataSinkNode(udf, input) => {
-//        val needs = udf.inputFields.toIndexSet
-//        updateEdges(input -> needs)
-//      }
-//
-//      case DataSourceNode(udf) => {
-//        updateEdges()
-//      }
-//
-//      case CoGroupNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-//
-//        val leftReads = udf.leftInputFields.toIndexSet ++ leftKey.selectedFields.toIndexSet
-//        val rightReads = udf.rightInputFields.toIndexSet ++ rightKey.selectedFields.toIndexSet
-//        val writes = udf.outputFields.toIndexSet
-//
-//        val parentPreNeeds = parentNeeds -- writes
-//
-//        val parentLeftNeeds = parentPreNeeds.intersect(outputSets(leftInput.getSource))
-//        val parentRightNeeds = parentPreNeeds.intersect(outputSets(rightInput.getSource))
-//
-//        val leftForwards = udf.leftForwardSet.map(_.getValue)
-//        val rightForwards = udf.rightForwardSet.map(_.getValue)
-//
-//        val (leftRes, rightRes) = parentLeftNeeds.intersect(parentRightNeeds).partition {
-//          case index if leftForwards(index) && !rightForwards(index) => true
-//          case index if !leftForwards(index) && rightForwards(index) => false
-//          case _                                                     => throw new UnsupportedOperationException("Schema conflict: cannot forward the same field from both sides of a two-input operator.")
-//        }
-//
-//        val leftNeeds = (parentLeftNeeds -- rightRes) ++ leftReads
-//        val rightNeeds = (parentRightNeeds -- leftRes) ++ rightReads
-//
-//        updateEdges(leftInput -> leftNeeds, rightInput -> rightNeeds)
-//      }
-//
-//      case CrossNode(udf, leftInput, rightInput) => {
-//
-//        val leftReads = udf.leftInputFields.toIndexSet
-//        val rightReads = udf.rightInputFields.toIndexSet
-//        val writes = udf.outputFields.toIndexSet
-//
-//        val parentPreNeeds = parentNeeds -- writes
-//
-//        val parentLeftNeeds = parentPreNeeds.intersect(outputSets(leftInput.getSource))
-//        val parentRightNeeds = parentPreNeeds.intersect(outputSets(rightInput.getSource))
-//
-//        val leftForwards = udf.leftForwardSet.map(_.getValue)
-//        val rightForwards = udf.rightForwardSet.map(_.getValue)
-//
-//        val (leftRes, rightRes) = parentLeftNeeds.intersect(parentRightNeeds).partition {
-//          case index if leftForwards(index) && !rightForwards(index) => true
-//          case index if !leftForwards(index) && rightForwards(index) => false
-//          case _                                                     => throw new UnsupportedOperationException("Schema conflict: cannot forward the same field from both sides of a two-input operator.")
-//        }
-//
-//        val leftNeeds = (parentLeftNeeds -- rightRes) ++ leftReads
-//        val rightNeeds = (parentRightNeeds -- leftRes) ++ rightReads
-//
-//        updateEdges(leftInput -> leftNeeds, rightInput -> rightNeeds)
-//      }
-//
-//      case JoinNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-//
-//        val leftReads = udf.leftInputFields.toIndexSet ++ leftKey.selectedFields.toIndexSet
-//        val rightReads = udf.rightInputFields.toIndexSet ++ rightKey.selectedFields.toIndexSet
-//        val writes = udf.outputFields.toIndexSet
-//
-//        val parentPreNeeds = parentNeeds -- writes
-//
-//        val parentLeftNeeds = parentPreNeeds.intersect(outputSets(leftInput.getSource))
-//        val parentRightNeeds = parentPreNeeds.intersect(outputSets(rightInput.getSource))
-//
-//        val leftForwards = udf.leftForwardSet.map(_.getValue)
-//        val rightForwards = udf.rightForwardSet.map(_.getValue)
-//
-//        val (leftRes, rightRes) = parentLeftNeeds.intersect(parentRightNeeds).partition {
-//          case index if leftForwards(index) && !rightForwards(index) => true
-//          case index if !leftForwards(index) && rightForwards(index) => false
-//          case _                                                     => throw new UnsupportedOperationException("Schema conflict: cannot forward the same field from both sides of a two-input operator.")
-//        }
-//
-//        val leftNeeds = (parentLeftNeeds -- rightRes) ++ leftReads
-//        val rightNeeds = (parentRightNeeds -- leftRes) ++ rightReads
-//
-//        updateEdges(leftInput -> leftNeeds, rightInput -> rightNeeds)
-//      }
-//
-//      case MapNode(udf, input) => {
-//
-//        val reads = udf.inputFields.toIndexSet
-//        val writes = udf.outputFields.toIndexSet
-//
-//        val needs = parentNeeds -- writes ++ reads
-//
-//        updateEdges(input -> needs)
-//      }
-//
-//      case ReduceNode(udf, key, input) => {
-//
-//        val reads = udf.inputFields.toIndexSet ++ key.selectedFields.toIndexSet
-//        val writes = udf.outputFields.toIndexSet
-//
-//        val needs = parentNeeds -- writes ++ reads
-//
-//        updateEdges(input -> needs)
-//      }
-//
-//      case _: SinkJoiner | _: BinaryUnionNode => {
-//        updateEdges(node.getIncomingConnections.map(_ -> parentNeeds): _*)
-//      }
-//    }
-//  }
-//}
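
The recurring expression in the commented-out cases above is the needs propagation: what an operator needs from its input is whatever its parents need, minus what the operator writes itself, plus what it reads. A one-function sketch with invented field numbers:

object NeedsSketch {
  def inputNeeds(parentNeeds: Set[Int], writes: Set[Int], reads: Set[Int]): Set[Int] =
    (parentNeeds -- writes) ++ reads

  def main(args: Array[String]): Unit = {
    // a map that reads field 0 and writes fields 2 and 3, while its parent needs 2 and 5
    println(inputNeeds(parentNeeds = Set(2, 5), writes = Set(2, 3), reads = Set(0)))
    // contains 5 (still needed upstream) and 0 (read here); 2 is recreated by the map itself
  }
}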

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala
deleted file mode 100644
index 09e32d5..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/Extractors.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.language.implicitConversions
-
-import scala.Some
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.analysis.FieldSelector
-import org.apache.flink.api.scala.analysis.UDF
-import org.apache.flink.api.scala.analysis.UDF0
-import org.apache.flink.api.scala.analysis.UDF1
-import org.apache.flink.api.scala.analysis.UDF2
-
-import org.apache.flink.api.java.record.operators.CoGroupOperator
-import org.apache.flink.api.java.record.operators.CrossOperator
-import org.apache.flink.api.java.record.operators.MapOperator
-import org.apache.flink.api.java.record.operators.JoinOperator
-import org.apache.flink.api.java.record.operators.ReduceOperator
-import org.apache.flink.compiler.dag.BinaryUnionNode
-import org.apache.flink.compiler.dag.BulkIterationNode
-import org.apache.flink.compiler.dag.CoGroupNode
-import org.apache.flink.compiler.dag.CrossNode
-import org.apache.flink.compiler.dag.DataSinkNode
-import org.apache.flink.compiler.dag.DataSourceNode
-import org.apache.flink.compiler.dag.CollectorMapNode
-import org.apache.flink.compiler.dag.MatchNode
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.compiler.dag.PactConnection
-import org.apache.flink.compiler.dag.GroupReduceNode
-import org.apache.flink.compiler.dag.SinkJoiner
-import org.apache.flink.compiler.dag.WorksetIterationNode
-import org.apache.flink.api.common.operators.Union
-import org.apache.flink.api.common.operators.base.{BulkIterationBase => BulkIteration, DeltaIterationBase => DeltaIteration, GenericDataSinkBase, GenericDataSourceBase}
-
-object Extractors {
-
-  implicit def nodeToGetUDF(node: OptimizerNode) = new {
-    def getUDF: Option[UDF[_]] = node match {
-      case _: SinkJoiner | _: BinaryUnionNode => None
-      case _ => {
-        Some(node.getPactContract.asInstanceOf[ScalaOperator[_, _]].getUDF)
-      }
-    }
-  }
-
-  object DataSinkNode {
-    def unapply(node: OptimizerNode): Option[(UDF1[_, _], PactConnection)] = node match {
-      case node: DataSinkNode => node.getPactContract match {
-        case contract: GenericDataSinkBase[_] with ScalaOutputOperator[_] => {
-          Some((contract.getUDF, node.getInputConnection))
-        }
-        case _  => None
-      }
-      case _ => None
-    }
-  }
-
-  object DataSourceNode {
-    def unapply(node: OptimizerNode): Option[(UDF0[_])] = node match {
-      case node: DataSourceNode => node.getPactContract() match {
-        case contract: GenericDataSourceBase[_, _] with ScalaOperator[_, _] => Some(contract.getUDF.asInstanceOf[UDF0[_]])
-        case _ => None
-      }
-      case _ => None
-    }
-  }
-
-  object CoGroupNode {
-    def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, PactConnection, PactConnection)] = node match {
-      case node: CoGroupNode => node.getPactContract() match {
-        case contract: CoGroupOperator with TwoInputKeyedScalaOperator[_, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, node.getFirstIncomingConnection, node.getSecondIncomingConnection))
-        case _ => None
-      }
-      case _ => None
-    }
-  }
-
-  object CrossNode {
-    def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], PactConnection, PactConnection)] = node match {
-      case node: CrossNode => node.getPactContract match {
-        case contract: CrossOperator with TwoInputScalaOperator[_, _, _] => Some((contract.getUDF, node.getFirstIncomingConnection, node.getSecondIncomingConnection))
-        case _ => None
-      }
-      case _ => None
-    }
-  }
-
-  object JoinNode {
-    def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], FieldSelector, FieldSelector, PactConnection, PactConnection)] = node match {
-      case node: MatchNode => node.getPactContract match {
-        case contract: JoinOperator with TwoInputKeyedScalaOperator[_, _, _] => Some((contract.getUDF, contract.leftKey, contract.rightKey, node.getFirstIncomingConnection, node.getSecondIncomingConnection))
-        case _ => None
-      }
-      case _ => None
-    }
-  }
-
-  object MapNode {
-    def unapply(node: OptimizerNode): Option[(UDF1[_, _], PactConnection)] = node match {
-      case node: CollectorMapNode => node.getPactContract match {
-        case contract: MapOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, node.getIncomingConnection))
-        case _ => None
-      }
-      case _ => None
-    }
-  }
-  
-  object UnionNode {
-    def unapply(node: OptimizerNode): Option[(UDF2[_, _, _], PactConnection, PactConnection)] = node match {
-      case node: BinaryUnionNode => node.getPactContract match {
-        case contract: Union[_] with UnionScalaOperator[_] => Some((contract.getUDF, node.getFirstIncomingConnection(), node.getSecondIncomingConnection()))
-        case _ => None
-      }
-      case _ => None
-    }
-  }
-
-  object ReduceNode {
-    def unapply(node: OptimizerNode): Option[(UDF1[_, _], FieldSelector, PactConnection)] = node match {
-      case node: GroupReduceNode => node.getPactContract match {
-        case contract: ReduceOperator with OneInputKeyedScalaOperator[_, _] => Some((contract.getUDF, contract.key, node.getIncomingConnection))
-        case contract: ReduceOperator with OneInputScalaOperator[_, _] => Some((contract.getUDF, new FieldSelector(contract.getUDF.inputUDT, Nil), node.getIncomingConnection))
-        case _ => None
-      }
-      case _ => None
-    }
-  }
-  object DeltaIterationNode {
-    def unapply(node: OptimizerNode): Option[(UDF0[_], FieldSelector, PactConnection, PactConnection)] = node match {
-      case node: WorksetIterationNode => node.getPactContract match {
-        case contract: DeltaIteration[_, _] with DeltaIterationScalaOperator[_] => Some((contract.getUDF, contract.key, node.getFirstIncomingConnection(), node.getSecondIncomingConnection()))
-        case _                                  => None
-      }
-      case _ => None
-    }
-  }
-  
-  object BulkIterationNode {
-    def unapply(node: OptimizerNode): Option[(UDF0[_], PactConnection)] = node match {
-      case node: BulkIterationNode => node.getPactContract match {
-        case contract: BulkIteration[_] with BulkIterationScalaOperator[_] => Some((contract.getUDF, node.getIncomingConnection()))
-        case _                                  => None
-      }
-      case _ => None
-    }
-  }
-
-}

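[Editor's note] Extractors.scala wraps each optimizer node type in a Scala extractor object so the post-pass code can pattern match on nodes and pull out the UDF, keys and incoming connections. A minimal sketch of that extractor-object pattern, with hypothetical stand-in types instead of the real OptimizerNode/operator classes:

    // Illustrative sketch of the unapply-based extractor pattern; all types here
    // are hypothetical stand-ins, not Flink's.
    object ExtractorSketch {

      sealed trait Node { def contract: AnyRef }
      final case class GenericNode(contract: AnyRef) extends Node

      trait HasUdf { def udf: String }
      final case class MapContract(udf: String)               extends HasUdf
      final case class ReduceContract(udf: String, key: Int)  extends HasUdf

      // Extractors: inspect the node's contract and expose the parts callers need.
      object MapNode {
        def unapply(node: Node): Option[String] = node.contract match {
          case c: MapContract => Some(c.udf)
          case _              => None
        }
      }

      object ReduceNode {
        def unapply(node: Node): Option[(String, Int)] = node.contract match {
          case c: ReduceContract => Some((c.udf, c.key))
          case _                 => None
        }
      }

      def describe(node: Node): String = node match {
        case MapNode(udf)         => s"Map with udf $udf"
        case ReduceNode(udf, key) => s"Reduce on key $key with udf $udf"
        case _                    => "unknown node"
      }

      def main(args: Array[String]): Unit = {
        println(describe(GenericNode(MapContract("toUpper"))))
        println(describe(GenericNode(ReduceContract("sum", key = 0))))
      }
    }
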
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala
deleted file mode 100644
index e74695f..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaCompactor.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-package org.apache.flink.api.scala.analysis.postPass;
-// Commented out because this is not working right now
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-//
-//package org.apache.flink.api.scala.analysis.postPass
-//
-//import scala.collection.mutable
-//import scala.collection.JavaConversions._
-//
-//import org.apache.flink.api.scala.analysis._
-//import org.apache.flink.api.scala.contracts._
-//
-//import org.apache.flink.pact.compiler.plan._
-//import org.apache.flink.pact.compiler.plan.candidate.OptimizedPlan
-//
-//object GlobalSchemaCompactor {
-//
-//  import Extractors._
-//
-//  def compactSchema(plan: OptimizedPlan): Unit = {
-//
-//    val (_, conflicts) = plan.getDataSinks.map(_.getSinkNode).foldLeft((Set[OptimizerNode](), Map[GlobalPos, Set[GlobalPos]]())) {
-//      case ((visited, conflicts), node) => findConflicts(node, visited, conflicts)
-//    }
-//
-//    // Reset all position indexes before reassigning them
-//    conflicts.keys.foreach { _.setIndex(Int.MinValue) }
-//
-//    plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(compactSchema(conflicts))
-//  }
-//
-//  /**
-//   * Two fields are in conflict when they exist in the same place (record) at the same time (plan node).
-//   * If two fields are in conflict, then they must be assigned different indexes.
-//   *
-//   * p1 conflictsWith p2 =
-//   *   Exists(n in Nodes):
-//   *     p1 != p2 &&
-//   *     (
-//   *       (p1 in n.forwards && p2 in n.forwards) ||
-//   *       (p1 in n.forwards && p2 in n.outputs) ||
-//   *       (p2 in n.forwards && p1 in n.outputs)
-//   *     )
-//   */
-//  private def findConflicts(node: OptimizerNode, visited: Set[OptimizerNode], conflicts: Map[GlobalPos, Set[GlobalPos]]): (Set[OptimizerNode], Map[GlobalPos, Set[GlobalPos]]) = {
-//
-//    visited.contains(node) match {
-//
-//      case true => (visited, conflicts)
-//
-//      case false => {
-//
-//        val (forwardPos, outputFields) = node.getUDF match {
-//          case None                     => (Set[GlobalPos](), Set[OutputField]())
-//          case Some(udf: UDF0[_])       => (Set[GlobalPos](), udf.outputFields.toSet)
-//          case Some(udf: UDF1[_, _])    => (udf.forwardSet, udf.outputFields.toSet)
-//          case Some(udf: UDF2[_, _, _]) => (udf.leftForwardSet ++ udf.rightForwardSet, udf.outputFields.toSet)
-//          case _        => (Set[GlobalPos](), Set[OutputField]())
-//        }
-//
-//        // resolve GlobalPos references to the instance that holds the actual index
-//        val forwards = forwardPos map { _.resolve }
-//        val outputs = outputFields filter { _.isUsed } map { _.globalPos.resolve }
-//
-//        val newConflictsF = forwards.foldLeft(conflicts) {
-//          case (conflicts, fPos) => {
-//            // add all other forwards and all outputs to this forward's conflict set
-//            val fConflicts = conflicts.getOrElse(fPos, Set()) ++ (forwards filterNot { _ == fPos }) ++ outputs
-//            conflicts.updated(fPos, fConflicts)
-//          }
-//        }
-//
-//        val newConflictsO = outputs.foldLeft(newConflictsF) {
-//          case (conflicts, oPos) => {
-//            // add all forwards to this output's conflict set
-//            val oConflicts = conflicts.getOrElse(oPos, Set()) ++ forwards
-//            conflicts.updated(oPos, oConflicts)
-//          }
-//        }
-//
-//        node.getIncomingConnections.map(_.getSource).foldLeft((visited + node, newConflictsO)) {
-//          case ((visited, conflicts), node) => findConflicts(node, visited, conflicts)
-//        }
-//      }
-//    }
-//  }
-//
-//  /**
-//   * Assign indexes bottom-up, giving lower values to fields with larger conflict sets.
-//   * This ordering should do a decent job of minimizing the number of gaps between fields.
-//   */
-//  private def compactSchema(conflicts: Map[GlobalPos, Set[GlobalPos]])(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-//
-//    visited.contains(node) match {
-//
-//      case true => visited
-//
-//      case false => {
-//
-//        val newVisited = node.getIncomingConnections.map(_.getSource).foldLeft(visited + node)(compactSchema(conflicts))
-//
-//        val outputFields = node.getUDF match {
-//          case None      => Seq[OutputField]()
-//          case Some(udf) => udf.outputFields filter { _.isUsed }
-//        }
-//
-//        val outputs = outputFields map {
-//          case field => {
-//            val pos = field.globalPos.resolve
-//            (pos, field.localPos, conflicts(pos) map { _.getValue })
-//          }
-//        } sortBy {
-//          case (_, localPos, posConflicts) => (Int.MaxValue - posConflicts.size, localPos)
-//        }
-//
-//        val initUsed = outputs map { _._1.getValue } filter { _ >= 0 } toSet
-//
-//        val used = outputs.filter(_._1.getValue < 0).foldLeft(initUsed) {
-//          case (used, (pos, _, conflicts)) => {
-//            val index = chooseIndexValue(used ++ conflicts)
-//            pos.setIndex(index)
-//            used + index
-//          }
-//        }
-//
-//        node.getUDF match {
-//          case Some(udf: UDF1[_, _])    => updateDiscards(used, udf.discardSet)
-//          case Some(udf: UDF2[_, _, _]) => updateDiscards(used, udf.leftDiscardSet, udf.rightDiscardSet)
-//          case _                        =>
-//        }
-//
-//        newVisited
-//      }
-//    }
-//  }
-//
-//  private def chooseIndexValue(used: Set[Int]): Int = {
-//    var index = 0
-//    while (used(index)) {
-//      index = index + 1
-//    }
-//    index
-//  }
-//
-//  private def updateDiscards(outputs: Set[Int], discardSets: mutable.Set[GlobalPos]*): Unit = {
-//    for (discardSet <- discardSets) {
-//
-//      val overwrites = discardSet filter { pos => outputs.contains(pos.getValue) } toList
-//
-//      for (pos <- overwrites)
-//        discardSet.remove(pos)
-//    }
-//  }
-//}

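[Editor's note] The compactor above assigns indexes greedily: fields that conflict must get distinct indexes, fields with larger conflict sets are handled first, and each field takes the smallest index not used by an already-assigned conflicting field (chooseIndexValue). A standalone sketch of that idea with simplified, non-Flink types (field names are illustrative only):

    // Greedy index compaction sketch; not the Flink implementation.
    object CompactionSketch {

      // smallest non-negative index not contained in `used`
      def chooseIndexValue(used: Set[Int]): Int =
        Iterator.from(0).find(i => !used(i)).get

      // conflicts: for each field, the fields it must not share an index with
      def compact(conflicts: Map[String, Set[String]]): Map[String, Int] = {
        val order = conflicts.keys.toList.sortBy(f => -conflicts(f).size)
        order.foldLeft(Map.empty[String, Int]) { (assigned, field) =>
          val taken = conflicts(field).flatMap(f => assigned.get(f).toSet)
          assigned + (field -> chooseIndexValue(taken))
        }
      }

      def main(args: Array[String]): Unit = {
        val conflicts = Map(
          "a" -> Set("b", "c"),
          "b" -> Set("a"),
          "c" -> Set("a")
        )
        // "a" conflicts with both others, so it is assigned first;
        // "b" and "c" never co-exist, so they may share an index.
        println(compact(conflicts)) // e.g. Map(a -> 0, b -> 1, c -> 1)
      }
    }
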
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala
deleted file mode 100644
index 607a41d..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaOptimizer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.collection.JavaConversions.asScalaBuffer
-import scala.collection.JavaConversions.collectionAsScalaIterable
-
-import org.apache.flink.api.scala.ScalaOperator
-
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.compiler.plan.OptimizedPlan
-
-
-trait GlobalSchemaOptimizer {
-
-  import Extractors._
-
-  def optimizeSchema(plan: OptimizedPlan, compactSchema: Boolean): Unit = {
-
-//    val (outputSets, outputPositions) = OutputSets.computeOutputSets(plan)
-//    val edgeSchemas = EdgeDependencySets.computeEdgeDependencySets(plan, outputSets)
-//
-//    AmbientFieldDetector.updateAmbientFields(plan, edgeSchemas, outputPositions)
-//
-//    if (compactSchema) {
-//      GlobalSchemaCompactor.compactSchema(plan)
-//    }
-
-    GlobalSchemaPrinter.printSchema(plan)
-    
-    plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(persistConfiguration)
-  }
-
-  private def persistConfiguration(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-
-    visited.contains(node) match {
-
-      case true => visited
-
-      case false => {
-
-        val children = node.getIncomingConnections.map(_.getSource).toSet
-        val newVisited = children.foldLeft(visited + node)(persistConfiguration)
-
-        node.getPactContract match {
-
-          case c: ScalaOperator[_, _] => c.persistConfiguration(Some(node))
-          case _                    =>
-        }
-
-        newVisited
-      }
-    }
-  }
-}
\ No newline at end of file

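[Editor's note] persistConfiguration above walks the plan from the sinks with a visited set, recursing into each node's inputs before acting on the node itself, so shared inputs are configured exactly once. A minimal sketch of that traversal pattern with an assumed, simplified Node type:

    // Visited-set, post-order DAG traversal sketch; types are toy stand-ins.
    object TraversalSketch {

      final case class Node(name: String, inputs: List[Node])

      def visit(action: Node => Unit)(visited: Set[Node], node: Node): Set[Node] =
        if (visited.contains(node)) visited
        else {
          // recurse into the inputs first, then handle this node (post-order)
          val newVisited = node.inputs.foldLeft(visited + node)(visit(action))
          action(node)
          newVisited
        }

      def main(args: Array[String]): Unit = {
        val source = Node("source", Nil)
        val map    = Node("map", List(source))
        val sink1  = Node("sink1", List(map))
        val sink2  = Node("sink2", List(map)) // shares `map` with sink1
        List(sink1, sink2).foldLeft(Set.empty[Node])(
          visit(n => println(s"configuring ${n.name}")))
      }
    }
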
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala
deleted file mode 100644
index 9e79613..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/GlobalSchemaPrinter.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.Array.canBuildFrom
-import scala.collection.JavaConversions.asScalaBuffer
-import scala.collection.JavaConversions.collectionAsScalaIterable
-
-import org.apache.commons.logging.{LogFactory, Log}
-
-import Extractors.CoGroupNode
-import Extractors.CrossNode
-import Extractors.DataSinkNode
-import Extractors.DataSourceNode
-import Extractors.JoinNode
-import Extractors.MapNode
-import Extractors.ReduceNode
-
-import org.apache.flink.api.scala.analysis.FieldSet
-import org.apache.flink.api.scala.analysis.FieldSelector
-
-import org.apache.flink.compiler.dag.BinaryUnionNode
-import org.apache.flink.compiler.dag.OptimizerNode
-import org.apache.flink.compiler.dag.SinkJoiner
-import org.apache.flink.compiler.plan.OptimizedPlan
-import org.apache.flink.api.common.Plan
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.common.operators.SingleInputOperator
-import org.apache.flink.api.common.operators.DualInputOperator
-import org.apache.flink.api.java.record.operators.GenericDataSink
-
-object GlobalSchemaPrinter {
-
-  import Extractors._
-
-  private final val LOG: Log = LogFactory.getLog(classOf[GlobalSchemaOptimizer])
-
-  def printSchema(plan: OptimizedPlan): Unit = {
-
-    LOG.debug("### " + plan.getJobName + " ###")
-    plan.getDataSinks.map(_.getSinkNode).foldLeft(Set[OptimizerNode]())(printSchema)
-    LOG.debug("####" + ("#" * plan.getJobName.length) + "####")
-  }
-
-  private def printSchema(visited: Set[OptimizerNode], node: OptimizerNode): Set[OptimizerNode] = {
-
-    visited.contains(node) match {
-
-      case true => visited
-
-      case false => {
-
-        val children = node.getIncomingConnections.map(_.getSource).toSet
-        val newVisited = children.foldLeft(visited + node)(printSchema)
-
-        node match {
-
-          case _: SinkJoiner | _: BinaryUnionNode =>
-
-          case DataSinkNode(udf, input) => {
-            printInfo(node, "Sink",
-              Seq(),
-              Seq(("", udf.inputFields)),
-              Seq(("", udf.getForwardIndexArrayFrom)),
-              Seq(("", udf.getDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case DataSourceNode(udf) => {
-            printInfo(node, "Source",
-              Seq(),
-              Seq(),
-              Seq(),
-              Seq(),
-              udf.outputFields
-            )
-          }
-
-          case CoGroupNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-            printInfo(node, "CoGroup",
-              Seq(("L", leftKey), ("R", rightKey)),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case CrossNode(udf, leftInput, rightInput) => {
-            printInfo(node, "Cross",
-              Seq(),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case JoinNode(udf, leftKey, rightKey, leftInput, rightInput) => {
-            printInfo(node, "Join",
-              Seq(("L", leftKey), ("R", rightKey)),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case MapNode(udf, input) => {
-            printInfo(node, "Map",
-              Seq(),
-              Seq(("", udf.inputFields)),
-              Seq(("", udf.getForwardIndexArrayFrom)),
-              Seq(("", udf.getDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case UnionNode(udf, leftInput, rightInput) => {
-            printInfo(node, "Union",
-              Seq(),
-              Seq(("L", udf.leftInputFields), ("R", udf.rightInputFields)),
-              Seq(("L", udf.getLeftForwardIndexArrayFrom), ("R", udf.getRightForwardIndexArrayFrom)),
-              Seq(("L", udf.getLeftDiscardIndexArray), ("R", udf.getRightDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-
-          case ReduceNode(udf, key, input) => {
-
-//            val contract = node.asInstanceOf[Reduce4sContract[_, _, _]]
-//            contract.userCombineCode map { _ =>
-//              printInfo(node, "Combine",
-//                Seq(("", key)),
-//                Seq(("", udf.inputFields)),
-//                Seq(("", contract.combineForwardSet.toArray)),
-//                Seq(("", contract.combineDiscardSet.toArray)),
-//                udf.inputFields
-//              )
-//            }
-
-            printInfo(node, "Reduce",
-              Seq(("", key)),
-              Seq(("", udf.inputFields)),
-              Seq(("", udf.getForwardIndexArrayFrom)),
-              Seq(("", udf.getDiscardIndexArray)),
-              udf.outputFields
-            )
-          }
-          case DeltaIterationNode(udf, key, input1, input2) => {
-
-            printInfo(node, "WorksetIterate",
-              Seq(("", key)),
-              Seq(),
-              Seq(),
-              Seq(),
-              udf.outputFields)
-          }
-
-          case BulkIterationNode(udf, input1) => {
-
-            printInfo(node, "BulkIterate",
-              Seq(),
-              Seq(),
-              Seq(),
-              Seq(),
-              udf.outputFields)
-          }
-
-        }
-
-        newVisited
-      }
-    }
-  }
-
-  private def printInfo(node: OptimizerNode, kind: String, keys: Seq[(String, FieldSelector)], reads: Seq[(String, FieldSet[_])], forwards: Seq[(String, Array[Int])], discards: Seq[(String, Array[Int])], writes: FieldSet[_]): Unit = {
-
-    def indexesToStrings(pre: String, indexes: Array[Int]) = indexes map {
-      case -1 => "_"
-      case i  => pre + i
-    }
-
-    val formatString = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
-
-    val name = node.getName
-
-    val sKeys = keys flatMap { case (pre, value) => value.selectedFields.toSerializerIndexArray.map(pre + _) } mkString ", "
-    val sReads = reads flatMap { case (pre, value) => indexesToStrings(pre, value.toSerializerIndexArray) } mkString ", "
-    val sForwards = forwards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
-    val sDiscards = discards flatMap { case (pre, value) => value.sorted.map(pre + _) } mkString ", "
-    val sWrites = indexesToStrings("", writes.toSerializerIndexArray) mkString ", "
-
-    LOG.debug(formatString.format(name, kind, sKeys, sReads, sForwards, sDiscards, sWrites))
-  }
-}

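[Editor's note] printInfo above renders each node's key, read, forward, discard and write indexes into a single debug line, mapping -1 to "_" for unresolved positions. A small sketch of that formatting with assumed, simplified inputs (the operator name below is made up):

    // Log-line formatting sketch; simplified inputs, not the Flink printer.
    object SchemaLineSketch {

      def indexesToStrings(pre: String, indexes: Array[Int]): Array[String] =
        indexes.map {
          case -1 => "_"
          case i  => pre + i
        }

      def formatLine(name: String, kind: String,
                     keys: Array[Int], reads: Array[Int],
                     forwards: Array[Int], discards: Array[Int],
                     writes: Array[Int]): String = {
        val fmt = "%s (%s): K{%s}: R[%s] => F[%s] - D[%s] + W[%s]"
        fmt.format(
          name, kind,
          keys.mkString(", "),
          indexesToStrings("", reads).mkString(", "),
          forwards.sorted.mkString(", "),
          discards.sorted.mkString(", "),
          indexesToStrings("", writes).mkString(", ")
        )
      }

      def main(args: Array[String]): Unit =
        println(formatLine("WordCount.reduce", "Reduce",
          keys = Array(0), reads = Array(0, 1), forwards = Array(0),
          discards = Array.empty[Int], writes = Array(-1, 2)))
    }
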
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala
deleted file mode 100644
index 50ece34..0000000
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/analysis/postPass/OutputSets.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.scala.analysis.postPass
-
-import scala.language.reflectiveCalls
-import scala.collection.JavaConversions._
-
-import org.apache.flink.api.scala.analysis._
-
-import org.apache.flink.compiler.dag._
-import org.apache.flink.compiler.plan.OptimizedPlan
-
-
-object OutputSets {
-
-  import Extractors._
-
-  def computeOutputSets(plan: OptimizedPlan): (Map[OptimizerNode, Set[Int]], Map[Int, GlobalPos]) = {
-    
-    val root = plan.getDataSinks.map(s => s.getSinkNode: OptimizerNode).reduceLeft((n1, n2) => new SinkJoiner(n1, n2))
-    val outputSets = computeOutputSets(Map[OptimizerNode, Set[GlobalPos]](), root)
-    val outputPositions = outputSets(root).map(pos => (pos.getValue, pos)).toMap
-    
-    (outputSets.mapValues(_.map(_.getValue)), outputPositions)
-  }
-
-  private def computeOutputSets(outputSets: Map[OptimizerNode, Set[GlobalPos]], node: OptimizerNode): Map[OptimizerNode, Set[GlobalPos]] = {
-
-    outputSets.contains(node) match {
-
-      case true => outputSets
-
-      case false => {
-
-        val children = node.getIncomingConnections.map(_.getSource).toSet
-        val newOutputSets = children.foldLeft(outputSets)(computeOutputSets)
-        
-        val childOutputs = children.map(newOutputSets(_)).flatten
-        val nodeOutputs = node.getUDF map { _.outputFields.filter(_.isUsed).map(_.globalPos).toSet } getOrElse Set()
-        
-        newOutputSets.updated(node, childOutputs ++ nodeOutputs)
-      }
-    }
-  }
-}
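
[Editor's note] OutputSets above computes, bottom-up and memoized in a Map, the set of fields visible at each node: the union of the node's own used output fields and the output sets of all its inputs. A standalone sketch of that accumulation with simplified, non-Flink types:

    // Bottom-up output-set accumulation sketch; toy types, not Flink's.
    object OutputSetsSketch {

      final case class Node(name: String, produces: Set[Int], inputs: List[Node])

      def computeOutputSets(sets: Map[Node, Set[Int]], node: Node): Map[Node, Set[Int]] =
        if (sets.contains(node)) sets
        else {
          // compute the inputs first, then union their outputs with this node's
          val withChildren = node.inputs.foldLeft(sets)(computeOutputSets)
          val childOutputs = node.inputs.flatMap(withChildren).toSet
          withChildren.updated(node, childOutputs ++ node.produces)
        }

      def main(args: Array[String]): Unit = {
        val source = Node("source", produces = Set(0, 1),      inputs = Nil)
        val map    = Node("map",    produces = Set(2),         inputs = List(source))
        val sink   = Node("sink",   produces = Set.empty[Int], inputs = List(map))
        computeOutputSets(Map.empty, sink).foreach {
          case (n, s) => println(s"${n.name}: $s")
        }
      }
    }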