You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/26 16:54:00 UTC

flink git commit: [FLINK-4590] [table] Some Table API tests are failing when the log level is set to DEBUG

Repository: flink
Updated Branches:
  refs/heads/master 70e71c161 -> 7eb45c133


[FLINK-4590] [table] Some Table API tests are failing when the log level is set to DEBUG

This closes #2504.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7eb45c13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7eb45c13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7eb45c13

Branch: refs/heads/master
Commit: 7eb45c133c49933b14719f06bf68ccf162a3e0b2
Parents: 70e71c1
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 16 10:52:28 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Sep 26 18:48:47 2016 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/dataset/BatchScan.scala    | 10 ++---
 .../nodes/dataset/BatchTableSourceScan.scala    |  6 +--
 .../plan/nodes/dataset/DataSetAggregate.scala   | 28 +++++++-------
 .../table/plan/nodes/dataset/DataSetCalc.scala  | 16 ++++----
 .../plan/nodes/dataset/DataSetIntersect.scala   | 16 ++++----
 .../table/plan/nodes/dataset/DataSetJoin.scala  | 20 +++++-----
 .../table/plan/nodes/dataset/DataSetMinus.scala | 39 +++++++++++++-------
 .../table/plan/nodes/dataset/DataSetScan.scala  |  6 +--
 .../table/plan/nodes/dataset/DataSetSort.scala  | 22 ++++++-----
 .../table/plan/nodes/dataset/DataSetUnion.scala | 19 ++++++----
 .../plan/nodes/dataset/DataSetValues.scala      | 16 ++++----
 .../plan/nodes/datastream/DataStreamCalc.scala  | 16 ++++----
 .../plan/nodes/datastream/DataStreamScan.scala  |  6 +--
 .../plan/nodes/datastream/DataStreamUnion.scala | 16 ++++----
 .../nodes/datastream/DataStreamValues.scala     | 12 +++---
 .../plan/nodes/datastream/StreamScan.scala      |  8 ++--
 .../src/test/resources/log4j-test.properties    | 27 ++++++++++++++
 .../src/test/resources/logback-test.xml         | 29 +++++++++++++++
 18 files changed, 193 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
index 85ed6ef..15b2081 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
@@ -36,14 +36,14 @@ abstract class BatchScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType)
+    rowRelDataType: RelDataType)
   extends TableScan(cluster, traitSet, table)
   with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def toString: String = {
-    s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
+    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -81,14 +81,14 @@ abstract class BatchScan(
 
           val mapFunc = getConversionMapper(
             config,
-            false,
+            nullableInput = false,
             inputType,
             determinedType,
             "DataSetSourceConversion",
             getRowType.getFieldNames,
             Some(flinkTable.fieldIndexes))
 
-          val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
           input.map(mapFunc).name(opName)
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 027a5be..10d9534 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -35,15 +35,15 @@ class BatchTableSourceScan(
     rowType: RelDataType)
   extends BatchScan(cluster, traitSet, table, rowType) {
 
-  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
+  val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
   val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]]
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new BatchTableSourceScan(
       cluster,
       traitSet,
-      table,
-      rowType
+      getTable,
+      getRowType
     )
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 8aa18ca..c826d83 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -40,13 +40,13 @@ class DataSetAggregate(
     traitSet: RelTraitSet,
     input: RelNode,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    rowType: RelDataType,
+    rowRelDataType: RelDataType,
     inputType: RelDataType,
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetAggregate(
@@ -54,7 +54,7 @@ class DataSetAggregate(
       traitSet,
       inputs.get(0),
       namedAggregates,
-      rowType,
+      getRowType,
       inputType,
       grouping)
   }
@@ -91,15 +91,15 @@ class DataSetAggregate(
     val groupingKeys = grouping.indices.toArray
     // add grouping fields, position keys in the input, and input type
     val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
-      inputType, rowType, grouping, config)
+      inputType, getRowType, grouping, config)
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
       tableEnv,
       // tell the input operator that this operator currently only supports Rows as input
       Some(TypeConverter.DEFAULT_ROW_TYPE))
 
     // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
     .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
     .toArray
 
@@ -138,14 +138,14 @@ class DataSetAggregate(
     // if the expected type is not a Row, inject a mapper to convert to the expected type
     expectedType match {
       case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
         result.map(getConversionMapper(
-          config,
-          false,
-          rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType.get,
-          "DataSetAggregateConversion",
-          rowType.getFieldNames.asScala
+          config = config,
+          nullableInput = false,
+          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType = expectedType.get,
+          conversionOperatorName = "DataSetAggregateConversion",
+          fieldNames = getRowType.getFieldNames.asScala
         ))
         .name(mapName)
       case _ => result
@@ -161,7 +161,7 @@ class DataSetAggregate(
   private def aggregationToString: String = {
 
     val inFields = inputType.getFieldNames.asScala
-    val outFields = rowType.getFieldNames.asScala
+    val outFields = getRowType.getFieldNames.asScala
 
     val groupStrings = grouping.map( inFields(_) )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index 75e4fd2..6d10089 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -40,32 +40,32 @@ class DataSetCalc(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
-    rowType: RelDataType,
+    rowRelDataType: RelDataType,
     calcProgram: RexProgram,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
   with FlinkCalc
   with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetCalc(
       cluster,
       traitSet,
       inputs.get(0),
-      rowType,
+      getRowType,
       calcProgram,
       ruleDescription)
   }
 
-  override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _))
+  override def toString: String = calcToString(calcProgram, getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("select", selectionToString(calcProgram, getExpressionString(_, _, _)))
+      .item("select", selectionToString(calcProgram, getExpressionString))
       .itemIf("where",
-        conditionToString(calcProgram, getExpressionString(_, _, _)),
+        conditionToString(calcProgram, getExpressionString),
         calcProgram.getCondition != null)
   }
 
@@ -95,7 +95,7 @@ class DataSetCalc(
 
     val config = tableEnv.getConfig
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val returnType = determineReturnType(
       getRowType,
@@ -120,7 +120,7 @@ class DataSetCalc(
       returnType)
 
     val mapFunc = calcMapFunction(genFunction)
-    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _)))
+    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
index 042c28b..d2203d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -38,14 +38,14 @@ import scala.collection.JavaConverters._
 class DataSetIntersect(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
     all: Boolean)
-  extends BiRel(cluster, traitSet, left, right)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
     with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetIntersect(
@@ -53,7 +53,7 @@ class DataSetIntersect(
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType,
+      getRowType,
       all
     )
   }
@@ -115,7 +115,7 @@ class DataSetIntersect(
             "DataSetIntersectConversion",
             getRowType.getFieldNames)
 
-          val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
           intersectDs.map(mapFunc).name(opName)
         }
@@ -127,7 +127,7 @@ class DataSetIntersect(
   }
 
   private def intersectSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 50a9b2d..bbb6325 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -44,9 +44,9 @@ import scala.collection.mutable.ArrayBuffer
 class DataSetJoin(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
     joinCondition: RexNode,
     joinRowType: RelDataType,
     joinInfo: JoinInfo,
@@ -54,10 +54,10 @@ class DataSetJoin(
     joinType: JoinRelType,
     joinHint: JoinHint,
     ruleDescription: String)
-  extends BiRel(cluster, traitSet, left, right)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetJoin(
@@ -65,7 +65,7 @@ class DataSetJoin(
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType,
+      getRowType,
       joinCondition,
       joinRowType,
       joinInfo,
@@ -113,7 +113,7 @@ class DataSetJoin(
     val rightKeys = ArrayBuffer.empty[Int]
     if (keyPairs.isEmpty) {
       // if no equality keys => not supported
-      throw new TableException(
+      throw TableException(
         "Joins should have at least one equality condition.\n" +
           s"\tLeft: ${left.toString},\n" +
           s"\tRight: ${right.toString},\n" +
@@ -135,7 +135,7 @@ class DataSetJoin(
           leftKeys.add(pair.source)
           rightKeys.add(pair.target)
         } else {
-          throw new TableException(
+          throw TableException(
             "Equality join predicate on incompatible types.\n" +
               s"\tLeft: ${left.toString},\n" +
               s"\tRight: ${right.toString},\n" +
@@ -156,7 +156,7 @@ class DataSetJoin(
     }
 
     if (nullCheck && !config.getNullCheck) {
-      throw new TableException("Null check in TableConfig must be enabled for outer joins.")
+      throw TableException("Null check in TableConfig must be enabled for outer joins.")
     }
 
     val generator = new CodeGenerator(
@@ -205,7 +205,7 @@ class DataSetJoin(
   }
 
   private def joinSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }
 
   private def joinConditionToString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
index d3a2fe7..6a5cbd1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
@@ -38,14 +38,14 @@ import scala.collection.JavaConverters._
 class DataSetMinus(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
     all: Boolean)
-  extends BiRel(cluster, traitSet, left, right)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
     with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetMinus(
@@ -53,7 +53,7 @@ class DataSetMinus(
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType,
+      getRowType,
       all
     )
   }
@@ -75,6 +75,17 @@ class DataSetMinus(
     }
   }
 
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount
+    val children = this.getInputs
+    var rowCnt = mq.getRowCount(children.head)
+    getInputs.tail.foreach(rowCnt -= 0.5 * mq.getRowCount(_))
+    if (rowCnt < 0) {
+      rowCnt = 0.0
+    }
+    rowCnt
+  }
+
   override def translateToPlan(
       tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
@@ -108,14 +119,14 @@ class DataSetMinus(
         // conversion
         if (determinedType != leftType) {
           val mapFunc = getConversionMapper(
-            config,
-            false,
-            leftType,
-            determinedType,
-            "DataSetMinusConversion",
-            getRowType.getFieldNames)
+            config = config,
+            nullableInput = false,
+            inputType = leftType,
+            expectedType = determinedType,
+            conversionOperatorName = "DataSetMinusConversion",
+            fieldNames = getRowType.getFieldNames)
 
-          val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
           minusDs.map(mapFunc).name(opName)
         }
@@ -127,7 +138,7 @@ class DataSetMinus(
   }
 
   private def minusSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
index 17a7db2..3c34bc3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
@@ -38,14 +38,14 @@ class DataSetScan(
     rowType: RelDataType)
   extends BatchScan(cluster, traitSet, table, rowType) {
 
-  val dataSetTable: DataSetTable[Any] = table.unwrap(classOf[DataSetTable[Any]])
+  val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetScan(
       cluster,
       traitSet,
-      table,
-      rowType
+      getTable,
+      getRowType
     )
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index 22930e7..661aeef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -39,7 +39,7 @@ class DataSetSort(
     traitSet: RelTraitSet,
     inp: RelNode,
     collations: RelCollation,
-    rowType2: RelDataType,
+    rowRelDataType: RelDataType,
     offset: RexNode,
     fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
@@ -57,13 +57,15 @@ class DataSetSort(
     Long.MaxValue
   }
 
+  override def deriveRowType(): RelDataType = rowRelDataType
+
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new DataSetSort(
       cluster,
       traitSet,
       inputs.get(0),
       collations,
-      rowType2,
+      getRowType,
       offset,
       fetch
     )
@@ -138,15 +140,15 @@ class DataSetSort(
         if (determinedType != inputType) {
 
           val mapFunc = getConversionMapper(
-            config,
-            false,
-            partitionedDs.getType,
-            determinedType,
-            "DataSetSortConversion",
-            getRowType.getFieldNames.asScala
+            config = config,
+            nullableInput = false,
+            inputType = partitionedDs.getType,
+            expectedType = determinedType,
+            conversionOperatorName = "DataSetSortConversion",
+            fieldNames = getRowType.getFieldNames.asScala
           )
 
-          val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
           limitedDs.map(mapFunc).name(opName)
         }
@@ -170,7 +172,7 @@ class DataSetSort(
     .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
 
   private val sortFieldsToString = fieldCollations
-    .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
+    .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
 
   private val offsetToString = s"$offset"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index ff1ff29..6e43fae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -36,13 +36,13 @@ import scala.collection.JavaConverters._
 class DataSetUnion(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType)
-  extends BiRel(cluster, traitSet, left, right)
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetUnion(
@@ -50,7 +50,7 @@ class DataSetUnion(
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType
+      rowRelDataType
     )
   }
 
@@ -72,6 +72,11 @@ class DataSetUnion(
     planner.getCostFactory.makeCost(rowCnt, 0, 0)
   }
 
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    // adopted from org.apache.calcite.rel.metadata.RelMdUtil.getUnionAllRowCount
+    getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
+  }
+
   override def translateToPlan(
       tableEnv: BatchTableEnvironment,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
@@ -93,7 +98,7 @@ class DataSetUnion(
   }
 
   private def unionSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    rowRelDataType.getFieldNames.asScala.toList.mkString(", ")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
index a31f199..1b637c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
@@ -41,24 +41,24 @@ import scala.collection.JavaConversions._
 class DataSetValues(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    rowType: RelDataType,
+    rowRelDataType: RelDataType,
     tuples: ImmutableList[ImmutableList[RexLiteral]])
-  extends Values(cluster, rowType, tuples, traitSet)
+  extends Values(cluster, rowRelDataType, tuples, traitSet)
   with DataSetRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetValues(
       cluster,
       traitSet,
-      rowType,
-      tuples
+      getRowType,
+      getTuples
     )
   }
 
   override def toString: String = {
-    "Values(values: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
+    s"Values(values: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -78,7 +78,7 @@ class DataSetValues(
       config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
 
     // convert List[RexLiteral] to Row
-    val rows: Seq[Row] = tuples.asList.map { t =>
+    val rows: Seq[Row] = getTuples.asList.map { t =>
       val row = new Row(t.size())
       t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
       row
@@ -89,7 +89,7 @@ class DataSetValues(
   }
 
   private def valuesFieldsToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
index 334c0aa..5312a5f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -38,33 +38,33 @@ class DataStreamCalc(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
-    rowType: RelDataType,
+    rowRelDataType: RelDataType,
     calcProgram: RexProgram,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
   with FlinkCalc
   with DataStreamRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamCalc(
       cluster,
       traitSet,
       inputs.get(0),
-      rowType,
+      getRowType,
       calcProgram,
       ruleDescription
     )
   }
 
-  override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _))
+  override def toString: String = calcToString(calcProgram, getExpressionString)
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("select", selectionToString(calcProgram, getExpressionString(_, _, _)))
+      .item("select", selectionToString(calcProgram, getExpressionString))
       .itemIf("where",
-        conditionToString(calcProgram, getExpressionString(_, _, _)),
+        conditionToString(calcProgram, getExpressionString),
         calcProgram.getCondition != null)
   }
 
@@ -74,7 +74,7 @@ class DataStreamCalc(
 
     val config = tableEnv.getConfig
 
-    val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+    val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
     val returnType = determineReturnType(
       getRowType,
@@ -99,6 +99,6 @@ class DataStreamCalc(
       returnType)
 
     val mapFunc = calcMapFunction(genFunction)
-    inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _)))
+    inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
index cfd04b0..463e1bc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
@@ -38,14 +38,14 @@ class DataStreamScan(
     rowType: RelDataType)
   extends StreamScan(cluster, traitSet, table, rowType) {
 
-  val dataStreamTable: DataStreamTable[Any] = table.unwrap(classOf[DataStreamTable[Any]])
+  val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamScan(
       cluster,
       traitSet,
-      table,
-      rowType
+      getTable,
+      getRowType
     )
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
index e72e9a8..f490d31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -34,13 +34,13 @@ import scala.collection.JavaConverters._
 class DataStreamUnion(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType)
-  extends BiRel(cluster, traitSet, left, right)
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataStreamRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamUnion(
@@ -48,7 +48,7 @@ class DataStreamUnion(
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType
+      getRowType
     )
   }
 
@@ -57,7 +57,7 @@ class DataStreamUnion(
   }
 
   override def toString = {
-    "Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
+    s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }
 
   override def translateToPlan(
@@ -70,6 +70,6 @@ class DataStreamUnion(
   }
 
   private def unionSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
index 3ae19ac..44130e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
@@ -39,19 +39,19 @@ import scala.collection.JavaConversions._
 class DataStreamValues(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    rowType: RelDataType,
+    rowRelDataType: RelDataType,
     tuples: ImmutableList[ImmutableList[RexLiteral]])
-  extends Values(cluster, rowType, tuples, traitSet)
+  extends Values(cluster, rowRelDataType, tuples, traitSet)
   with DataStreamRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamValues(
       cluster,
       traitSet,
-      rowType,
-      tuples
+      getRowType,
+      getTuples
     )
   }
 
@@ -68,7 +68,7 @@ class DataStreamValues(
       config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
 
     // convert List[RexLiteral] to Row
-    val rows: Seq[Row] = tuples.asList.map { t =>
+    val rows: Seq[Row] = getTuples.asList.map { t =>
       val row = new Row(t.size())
       t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
       row

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
index 1f5fc6a..17620d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
@@ -38,11 +38,11 @@ abstract class StreamScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType)
+    rowRelDataType: RelDataType)
   extends TableScan(cluster, traitSet, table)
   with DataStreamRel {
 
-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType
 
   protected def convertToExpectedType(
       input: DataStream[Any],
@@ -72,7 +72,7 @@ abstract class StreamScan(
         if (determinedType != inputType) {
           val generator = new CodeGenerator(
             config,
-            false,
+            nullableInput = false,
             input.getType,
             flinkTable.fieldIndexes)
 
@@ -97,7 +97,7 @@ abstract class StreamScan(
             genFunction.code,
             genFunction.returnType)
 
-          val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
           input.map(mapFunc).name(opName)
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/log4j-test.properties b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4c74d85
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/logback-test.xml b/flink-libraries/flink-table/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..b99489e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>