Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/26 16:54:00 UTC
flink git commit: [FLINK-4590] [table] Some Table API tests are failing when debug lvl is set to DEBUG
Repository: flink
Updated Branches:
refs/heads/master 70e71c161 -> 7eb45c133
[FLINK-4590] [table] Some Table API tests are failing when debug lvl is set to DEBUG
This closes #2504.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7eb45c13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7eb45c13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7eb45c13
Branch: refs/heads/master
Commit: 7eb45c133c49933b14719f06bf68ccf162a3e0b2
Parents: 70e71c1
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 16 10:52:28 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Sep 26 18:48:47 2016 +0200
----------------------------------------------------------------------
.../table/plan/nodes/dataset/BatchScan.scala | 10 ++---
.../nodes/dataset/BatchTableSourceScan.scala | 6 +--
.../plan/nodes/dataset/DataSetAggregate.scala | 28 +++++++-------
.../table/plan/nodes/dataset/DataSetCalc.scala | 16 ++++----
.../plan/nodes/dataset/DataSetIntersect.scala | 16 ++++----
.../table/plan/nodes/dataset/DataSetJoin.scala | 20 +++++-----
.../table/plan/nodes/dataset/DataSetMinus.scala | 39 +++++++++++++-------
.../table/plan/nodes/dataset/DataSetScan.scala | 6 +--
.../table/plan/nodes/dataset/DataSetSort.scala | 22 ++++++-----
.../table/plan/nodes/dataset/DataSetUnion.scala | 19 ++++++----
.../plan/nodes/dataset/DataSetValues.scala | 16 ++++----
.../plan/nodes/datastream/DataStreamCalc.scala | 16 ++++----
.../plan/nodes/datastream/DataStreamScan.scala | 6 +--
.../plan/nodes/datastream/DataStreamUnion.scala | 16 ++++----
.../nodes/datastream/DataStreamValues.scala | 12 +++---
.../plan/nodes/datastream/StreamScan.scala | 8 ++--
.../src/test/resources/log4j-test.properties | 27 ++++++++++++++
.../src/test/resources/logback-test.xml | 29 +++++++++++++++
18 files changed, 193 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
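The diff below follows a few recurring patterns: constructor parameters named rowType, and the left/right/input/table parameters of the relational nodes, are renamed, presumably so they no longer shadow the fields of the same name that Calcite's AbstractRelNode, SingleRel, BiRel and TableScan base classes already carry, and reads now go through the getRowType/getInput/getTable accessors; several toString implementations gain the `s` interpolator prefix they were missing; DataSetMinus and DataSetUnion get estimateRowCount overrides adapted from Calcite's RelMdUtil; and log4j/logback test configurations are added so the Table API tests no longer inherit whatever log level the surrounding build uses. The sketch below only illustrates the shadowing pattern being removed, with placeholder types instead of the real Calcite and Flink classes; it is not taken from the commit.

abstract class AbstractNode {
  // Stands in for the row type that Calcite's AbstractRelNode derives lazily and caches.
  private var cachedRowType: String = _

  def deriveRowType(): String

  final def getRowType: String = {
    if (cachedRowType == null) cachedRowType = deriveRowType()
    cachedRowType
  }
}

// Before: the constructor parameter is also called rowType, so reads inside the class
// refer to the raw parameter rather than the lazily derived and cached value.
class ScanBefore(rowType: String) extends AbstractNode {
  override def deriveRowType(): String = rowType
  override def toString: String = s"Source(from: ($rowType))"
}

// After: the parameter is renamed and every other read goes through the accessor.
class ScanAfter(rowRelDataType: String) extends AbstractNode {
  override def deriveRowType(): String = rowRelDataType
  override def toString: String = s"Source(from: (${getRowType}))"
}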
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
index 85ed6ef..15b2081 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
@@ -36,14 +36,14 @@ abstract class BatchScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
- rowType: RelDataType)
+ rowRelDataType: RelDataType)
extends TableScan(cluster, traitSet, table)
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def toString: String = {
- s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
+ s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -81,14 +81,14 @@ abstract class BatchScan(
val mapFunc = getConversionMapper(
config,
- false,
+ nullableInput = false,
inputType,
determinedType,
"DataSetSourceConversion",
getRowType.getFieldNames,
Some(flinkTable.fieldIndexes))
- val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+ val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
input.map(mapFunc).name(opName)
}
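Several call sites in this commit (here and in the files that follow) also switch from positional arguments such as a bare `false` to named arguments like nullableInput = false. A minimal sketch of the difference, using a hypothetical helper rather than the real getConversionMapper signature:

object NamedArgumentSketch extends App {
  // Hypothetical helper, loosely modelled on the first two parameters seen above.
  def conversionName(nullableInput: Boolean, operatorName: String): String =
    s"$operatorName(nullableInput=$nullableInput)"

  // Positional: the bare `false` gives the reader no hint what it controls.
  println(conversionName(false, "DataSetSourceConversion"))

  // Named, as in the diff above: self-documenting and robust against parameter reordering.
  println(conversionName(nullableInput = false, operatorName = "DataSetSourceConversion"))
}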
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 027a5be..10d9534 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -35,15 +35,15 @@ class BatchTableSourceScan(
rowType: RelDataType)
extends BatchScan(cluster, traitSet, table, rowType) {
- val tableSourceTable = table.unwrap(classOf[TableSourceTable])
+ val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]]
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new BatchTableSourceScan(
cluster,
traitSet,
- table,
- rowType
+ getTable,
+ getRowType
)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index 8aa18ca..c826d83 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -40,13 +40,13 @@ class DataSetAggregate(
traitSet: RelTraitSet,
input: RelNode,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- rowType: RelDataType,
+ rowRelDataType: RelDataType,
inputType: RelDataType,
grouping: Array[Int])
extends SingleRel(cluster, traitSet, input)
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetAggregate(
@@ -54,7 +54,7 @@ class DataSetAggregate(
traitSet,
inputs.get(0),
namedAggregates,
- rowType,
+ getRowType,
inputType,
grouping)
}
@@ -91,15 +91,15 @@ class DataSetAggregate(
val groupingKeys = grouping.indices.toArray
// add grouping fields, position keys in the input, and input type
val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
- inputType, rowType, grouping, config)
+ inputType, getRowType, grouping, config)
- val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
tableEnv,
// tell the input operator that this operator currently only supports Rows as input
Some(TypeConverter.DEFAULT_ROW_TYPE))
// get the output types
- val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+ val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
.map(field => FlinkTypeFactory.toTypeInfo(field.getType))
.toArray
@@ -138,14 +138,14 @@ class DataSetAggregate(
// if the expected type is not a Row, inject a mapper to convert to the expected type
expectedType match {
case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+ val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
result.map(getConversionMapper(
- config,
- false,
- rowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType.get,
- "DataSetAggregateConversion",
- rowType.getFieldNames.asScala
+ config = config,
+ nullableInput = false,
+ inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+ expectedType = expectedType.get,
+ conversionOperatorName = "DataSetAggregateConversion",
+ fieldNames = getRowType.getFieldNames.asScala
))
.name(mapName)
case _ => result
@@ -161,7 +161,7 @@ class DataSetAggregate(
private def aggregationToString: String = {
val inFields = inputType.getFieldNames.asScala
- val outFields = rowType.getFieldNames.asScala
+ val outFields = getRowType.getFieldNames.asScala
val groupStrings = grouping.map( inFields(_) )
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index 75e4fd2..6d10089 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -40,32 +40,32 @@ class DataSetCalc(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode,
- rowType: RelDataType,
+ rowRelDataType: RelDataType,
calcProgram: RexProgram,
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
with FlinkCalc
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetCalc(
cluster,
traitSet,
inputs.get(0),
- rowType,
+ getRowType,
calcProgram,
ruleDescription)
}
- override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _))
+ override def toString: String = calcToString(calcProgram, getExpressionString)
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
- .item("select", selectionToString(calcProgram, getExpressionString(_, _, _)))
+ .item("select", selectionToString(calcProgram, getExpressionString))
.itemIf("where",
- conditionToString(calcProgram, getExpressionString(_, _, _)),
+ conditionToString(calcProgram, getExpressionString),
calcProgram.getCondition != null)
}
@@ -95,7 +95,7 @@ class DataSetCalc(
val config = tableEnv.getConfig
- val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val returnType = determineReturnType(
getRowType,
@@ -120,7 +120,7 @@ class DataSetCalc(
returnType)
val mapFunc = calcMapFunction(genFunction)
- inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _)))
+ inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
index 042c28b..d2203d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -38,14 +38,14 @@ import scala.collection.JavaConverters._
class DataSetIntersect(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- left: RelNode,
- right: RelNode,
- rowType: RelDataType,
+ leftNode: RelNode,
+ rightNode: RelNode,
+ rowRelDataType: RelDataType,
all: Boolean)
- extends BiRel(cluster, traitSet, left, right)
+ extends BiRel(cluster, traitSet, leftNode, rightNode)
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetIntersect(
@@ -53,7 +53,7 @@ class DataSetIntersect(
traitSet,
inputs.get(0),
inputs.get(1),
- rowType,
+ getRowType,
all
)
}
@@ -115,7 +115,7 @@ class DataSetIntersect(
"DataSetIntersectConversion",
getRowType.getFieldNames)
- val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+ val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
intersectDs.map(mapFunc).name(opName)
}
@@ -127,7 +127,7 @@ class DataSetIntersect(
}
private def intersectSelectionToString: String = {
- rowType.getFieldNames.asScala.toList.mkString(", ")
+ getRowType.getFieldNames.asScala.toList.mkString(", ")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 50a9b2d..bbb6325 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -44,9 +44,9 @@ import scala.collection.mutable.ArrayBuffer
class DataSetJoin(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- left: RelNode,
- right: RelNode,
- rowType: RelDataType,
+ leftNode: RelNode,
+ rightNode: RelNode,
+ rowRelDataType: RelDataType,
joinCondition: RexNode,
joinRowType: RelDataType,
joinInfo: JoinInfo,
@@ -54,10 +54,10 @@ class DataSetJoin(
joinType: JoinRelType,
joinHint: JoinHint,
ruleDescription: String)
- extends BiRel(cluster, traitSet, left, right)
+ extends BiRel(cluster, traitSet, leftNode, rightNode)
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetJoin(
@@ -65,7 +65,7 @@ class DataSetJoin(
traitSet,
inputs.get(0),
inputs.get(1),
- rowType,
+ getRowType,
joinCondition,
joinRowType,
joinInfo,
@@ -113,7 +113,7 @@ class DataSetJoin(
val rightKeys = ArrayBuffer.empty[Int]
if (keyPairs.isEmpty) {
// if no equality keys => not supported
- throw new TableException(
+ throw TableException(
"Joins should have at least one equality condition.\n" +
s"\tLeft: ${left.toString},\n" +
s"\tRight: ${right.toString},\n" +
@@ -135,7 +135,7 @@ class DataSetJoin(
leftKeys.add(pair.source)
rightKeys.add(pair.target)
} else {
- throw new TableException(
+ throw TableException(
"Equality join predicate on incompatible types.\n" +
s"\tLeft: ${left.toString},\n" +
s"\tRight: ${right.toString},\n" +
@@ -156,7 +156,7 @@ class DataSetJoin(
}
if (nullCheck && !config.getNullCheck) {
- throw new TableException("Null check in TableConfig must be enabled for outer joins.")
+ throw TableException("Null check in TableConfig must be enabled for outer joins.")
}
val generator = new CodeGenerator(
@@ -205,7 +205,7 @@ class DataSetJoin(
}
private def joinSelectionToString: String = {
- rowType.getFieldNames.asScala.toList.mkString(", ")
+ getRowType.getFieldNames.asScala.toList.mkString(", ")
}
private def joinConditionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
index d3a2fe7..6a5cbd1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
@@ -38,14 +38,14 @@ import scala.collection.JavaConverters._
class DataSetMinus(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- left: RelNode,
- right: RelNode,
- rowType: RelDataType,
+ leftNode: RelNode,
+ rightNode: RelNode,
+ rowRelDataType: RelDataType,
all: Boolean)
- extends BiRel(cluster, traitSet, left, right)
+ extends BiRel(cluster, traitSet, leftNode, rightNode)
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetMinus(
@@ -53,7 +53,7 @@ class DataSetMinus(
traitSet,
inputs.get(0),
inputs.get(1),
- rowType,
+ getRowType,
all
)
}
@@ -75,6 +75,17 @@ class DataSetMinus(
}
}
+ override def estimateRowCount(mq: RelMetadataQuery): Double = {
+ // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount
+ val children = this.getInputs
+ var rowCnt = mq.getRowCount(children.head)
+ getInputs.tail.foreach(rowCnt -= 0.5 * mq.getRowCount(_))
+ if (rowCnt < 0) {
+ rowCnt = 0.0
+ }
+ rowCnt
+ }
+
override def translateToPlan(
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
@@ -108,14 +119,14 @@ class DataSetMinus(
// conversion
if (determinedType != leftType) {
val mapFunc = getConversionMapper(
- config,
- false,
- leftType,
- determinedType,
- "DataSetMinusConversion",
- getRowType.getFieldNames)
+ config = config,
+ nullableInput = false,
+ inputType = leftType,
+ expectedType = determinedType,
+ conversionOperatorName = "DataSetMinusConversion",
+ fieldNames = getRowType.getFieldNames)
- val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+ val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
minusDs.map(mapFunc).name(opName)
}
@@ -127,7 +138,7 @@ class DataSetMinus(
}
private def minusSelectionToString: String = {
- rowType.getFieldNames.asScala.toList.mkString(", ")
+ getRowType.getFieldNames.asScala.toList.mkString(", ")
}
}
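The estimateRowCount added above mirrors, per its own comment, Calcite's RelMdUtil.getMinusRowCount: the first input's row count minus half of each remaining input's count, floored at zero. A quick worked example with made-up cardinalities:

object MinusRowCountSketch extends App {
  // Same arithmetic as the override above, on plain doubles instead of a RelMetadataQuery.
  val inputCounts = Seq(100.0, 40.0) // e.g. 100 rows on the left, 40 on the right
  val estimate = math.max(inputCounts.head - inputCounts.tail.map(_ * 0.5).sum, 0.0)
  println(estimate) // 100 - 0.5 * 40 = 80.0
}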
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
index 17a7db2..3c34bc3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
@@ -38,14 +38,14 @@ class DataSetScan(
rowType: RelDataType)
extends BatchScan(cluster, traitSet, table, rowType) {
- val dataSetTable: DataSetTable[Any] = table.unwrap(classOf[DataSetTable[Any]])
+ val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetScan(
cluster,
traitSet,
- table,
- rowType
+ getTable,
+ getRowType
)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index 22930e7..661aeef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -39,7 +39,7 @@ class DataSetSort(
traitSet: RelTraitSet,
inp: RelNode,
collations: RelCollation,
- rowType2: RelDataType,
+ rowRelDataType: RelDataType,
offset: RexNode,
fetch: RexNode)
extends SingleRel(cluster, traitSet, inp)
@@ -57,13 +57,15 @@ class DataSetSort(
Long.MaxValue
}
+ override def deriveRowType(): RelDataType = rowRelDataType
+
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new DataSetSort(
cluster,
traitSet,
inputs.get(0),
collations,
- rowType2,
+ getRowType,
offset,
fetch
)
@@ -138,15 +140,15 @@ class DataSetSort(
if (determinedType != inputType) {
val mapFunc = getConversionMapper(
- config,
- false,
- partitionedDs.getType,
- determinedType,
- "DataSetSortConversion",
- getRowType.getFieldNames.asScala
+ config = config,
+ nullableInput = false,
+ inputType = partitionedDs.getType,
+ expectedType = determinedType,
+ conversionOperatorName = "DataSetSortConversion",
+ fieldNames = getRowType.getFieldNames.asScala
)
- val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+ val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
limitedDs.map(mapFunc).name(opName)
}
@@ -170,7 +172,7 @@ class DataSetSort(
.map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
private val sortFieldsToString = fieldCollations
- .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
+ .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
private val offsetToString = s"$offset"
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index ff1ff29..6e43fae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -36,13 +36,13 @@ import scala.collection.JavaConverters._
class DataSetUnion(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- left: RelNode,
- right: RelNode,
- rowType: RelDataType)
- extends BiRel(cluster, traitSet, left, right)
+ leftNode: RelNode,
+ rightNode: RelNode,
+ rowRelDataType: RelDataType)
+ extends BiRel(cluster, traitSet, leftNode, rightNode)
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetUnion(
@@ -50,7 +50,7 @@ class DataSetUnion(
traitSet,
inputs.get(0),
inputs.get(1),
- rowType
+ rowRelDataType
)
}
@@ -72,6 +72,11 @@ class DataSetUnion(
planner.getCostFactory.makeCost(rowCnt, 0, 0)
}
+ override def estimateRowCount(mq: RelMetadataQuery): Double = {
+ // adopted from org.apache.calcite.rel.metadata.RelMdUtil.getUnionAllRowCount
+ getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
+ }
+
override def translateToPlan(
tableEnv: BatchTableEnvironment,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
@@ -93,7 +98,7 @@ class DataSetUnion(
}
private def unionSelectionToString: String = {
- rowType.getFieldNames.asScala.toList.mkString(", ")
+ rowRelDataType.getFieldNames.asScala.toList.mkString(", ")
}
}
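DataSetUnion's new estimateRowCount is the UNION ALL estimate referenced in its comment (RelMdUtil.getUnionAllRowCount): the plain sum of the inputs' row counts. The same fold on plain doubles, with made-up numbers:

object UnionRowCountSketch extends App {
  // UNION ALL does not deduplicate, so the estimate is simply the sum of the input counts.
  val inputCounts = Seq(100.0, 40.0)
  println(inputCounts.foldLeft(0.0)(_ + _)) // 140.0
}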
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
index a31f199..1b637c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
@@ -41,24 +41,24 @@ import scala.collection.JavaConversions._
class DataSetValues(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- rowType: RelDataType,
+ rowRelDataType: RelDataType,
tuples: ImmutableList[ImmutableList[RexLiteral]])
- extends Values(cluster, rowType, tuples, traitSet)
+ extends Values(cluster, rowRelDataType, tuples, traitSet)
with DataSetRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataSetValues(
cluster,
traitSet,
- rowType,
- tuples
+ getRowType,
+ getTuples
)
}
override def toString: String = {
- "Values(values: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
+ s"Values(values: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
override def explainTerms(pw: RelWriter): RelWriter = {
@@ -78,7 +78,7 @@ class DataSetValues(
config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
// convert List[RexLiteral] to Row
- val rows: Seq[Row] = tuples.asList.map { t =>
+ val rows: Seq[Row] = getTuples.asList.map { t =>
val row = new Row(t.size())
t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
row
@@ -89,7 +89,7 @@ class DataSetValues(
}
private def valuesFieldsToString: String = {
- rowType.getFieldNames.asScala.toList.mkString(", ")
+ getRowType.getFieldNames.asScala.toList.mkString(", ")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
index 334c0aa..5312a5f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -38,33 +38,33 @@ class DataStreamCalc(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode,
- rowType: RelDataType,
+ rowRelDataType: RelDataType,
calcProgram: RexProgram,
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
with FlinkCalc
with DataStreamRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamCalc(
cluster,
traitSet,
inputs.get(0),
- rowType,
+ getRowType,
calcProgram,
ruleDescription
)
}
- override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _))
+ override def toString: String = calcToString(calcProgram, getExpressionString)
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
- .item("select", selectionToString(calcProgram, getExpressionString(_, _, _)))
+ .item("select", selectionToString(calcProgram, getExpressionString))
.itemIf("where",
- conditionToString(calcProgram, getExpressionString(_, _, _)),
+ conditionToString(calcProgram, getExpressionString),
calcProgram.getCondition != null)
}
@@ -74,7 +74,7 @@ class DataStreamCalc(
val config = tableEnv.getConfig
- val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val returnType = determineReturnType(
getRowType,
@@ -99,6 +99,6 @@ class DataStreamCalc(
returnType)
val mapFunc = calcMapFunction(genFunction)
- inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _)))
+ inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
index cfd04b0..463e1bc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala
@@ -38,14 +38,14 @@ class DataStreamScan(
rowType: RelDataType)
extends StreamScan(cluster, traitSet, table, rowType) {
- val dataStreamTable: DataStreamTable[Any] = table.unwrap(classOf[DataStreamTable[Any]])
+ val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamScan(
cluster,
traitSet,
- table,
- rowType
+ getTable,
+ getRowType
)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
index e72e9a8..f490d31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -34,13 +34,13 @@ import scala.collection.JavaConverters._
class DataStreamUnion(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- left: RelNode,
- right: RelNode,
- rowType: RelDataType)
- extends BiRel(cluster, traitSet, left, right)
+ leftNode: RelNode,
+ rightNode: RelNode,
+ rowRelDataType: RelDataType)
+ extends BiRel(cluster, traitSet, leftNode, rightNode)
with DataStreamRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamUnion(
@@ -48,7 +48,7 @@ class DataStreamUnion(
traitSet,
inputs.get(0),
inputs.get(1),
- rowType
+ getRowType
)
}
@@ -57,7 +57,7 @@ class DataStreamUnion(
}
override def toString = {
- "Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))"
+ s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
override def translateToPlan(
@@ -70,6 +70,6 @@ class DataStreamUnion(
}
private def unionSelectionToString: String = {
- rowType.getFieldNames.asScala.toList.mkString(", ")
+ getRowType.getFieldNames.asScala.toList.mkString(", ")
}
}
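The toString changes in DataSetValues and DataStreamUnion above do more than switch to getRowType: they add the `s` interpolator prefix the old string literals were missing, so the ${...} expressions are now actually evaluated instead of being printed verbatim. A two-line illustration:

object InterpolatorSketch extends App {
  val fields = Seq("a", "b", "c")
  // Without the `s` prefix (the old code), ${...} is not evaluated and prints literally.
  println("Union(union: (${fields.mkString(\", \")}))")
  // With the `s` prefix (the fix), the field names are interpolated: Union(union: (a, b, c))
  println(s"Union(union: (${fields.mkString(", ")}))")
}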
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
index 3ae19ac..44130e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
@@ -39,19 +39,19 @@ import scala.collection.JavaConversions._
class DataStreamValues(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- rowType: RelDataType,
+ rowRelDataType: RelDataType,
tuples: ImmutableList[ImmutableList[RexLiteral]])
- extends Values(cluster, rowType, tuples, traitSet)
+ extends Values(cluster, rowRelDataType, tuples, traitSet)
with DataStreamRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamValues(
cluster,
traitSet,
- rowType,
- tuples
+ getRowType,
+ getTuples
)
}
@@ -68,7 +68,7 @@ class DataStreamValues(
config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo]
// convert List[RexLiteral] to Row
- val rows: Seq[Row] = tuples.asList.map { t =>
+ val rows: Seq[Row] = getTuples.asList.map { t =>
val row = new Row(t.size())
t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) )
row
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
index 1f5fc6a..17620d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala
@@ -38,11 +38,11 @@ abstract class StreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
- rowType: RelDataType)
+ rowRelDataType: RelDataType)
extends TableScan(cluster, traitSet, table)
with DataStreamRel {
- override def deriveRowType() = rowType
+ override def deriveRowType() = rowRelDataType
protected def convertToExpectedType(
input: DataStream[Any],
@@ -72,7 +72,7 @@ abstract class StreamScan(
if (determinedType != inputType) {
val generator = new CodeGenerator(
config,
- false,
+ nullableInput = false,
input.getType,
flinkTable.fieldIndexes)
@@ -97,7 +97,7 @@ abstract class StreamScan(
genFunction.code,
genFunction.returnType)
- val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+ val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
input.map(mapFunc).name(opName)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/log4j-test.properties b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..4c74d85
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/logback-test.xml b/flink-libraries/flink-table/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..b99489e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>