You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/04/17 19:52:45 UTC
[1/2] flink git commit: [FLINK-8366] [table] Fix UpsertTableSink tests.
Repository: flink
Updated Branches:
refs/heads/master 518adb043 -> d38695b8e
[FLINK-8366] [table] Fix UpsertTableSink tests.
This closes #5244.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3adc21d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3adc21d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3adc21d4
Branch: refs/heads/master
Commit: 3adc21d489d78cd34748f2132e4e7659f65a33e4
Parents: 518adb0
Author: 军长 <he...@alibaba-inc.com>
Authored: Fri Jan 5 16:53:31 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 17 17:03:18 2018 +0200
----------------------------------------------------------------------
.../table/runtime/stream/table/TableSinkITCase.scala | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3adc21d4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index bda823e..3085c36 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -63,7 +63,7 @@ class TableSinkITCase extends AbstractTestBase {
val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
- val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+ input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
.where('a < 3 || 'a > 19)
.select('c, 't, 'b)
.insertInto("targetTable")
@@ -96,7 +96,7 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._2)
.map(x => x).setParallelism(4) // increase DOP to 4
- val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
+ input.toTable(tEnv, 'a, 'b.rowtime, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
.writeToSink(new CsvTableSink(path))
@@ -677,11 +677,10 @@ object RowCollector {
/** Converts a list of upsert messages into a list of final results. */
def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
- def getKeys(r: Row): List[String] =
- keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k)
+ def getKeys(r: Row): Row = Row.project(r, keys)
- val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
- val key = getKeys(r.f1).mkString("")
+ val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], r) =>
+ val key = getKeys(r.f1)
if (r.f0) {
o + (key -> r.f1.toString)
} else {
[2/2] flink git commit: [FLINK-6924] [table] Add Table API log() function.
Posted by fh...@apache.org.
[FLINK-6924] [table] Add Table API log() function.
This closes #5638.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d38695b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d38695b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d38695b8
Branch: refs/heads/master
Commit: d38695b8e99d62777b2bca964a5c487a67e42331
Parents: 3adc21d
Author: Jiayi Liao <bu...@163.com>
Authored: Sun Mar 4 17:15:10 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 17 17:03:19 2018 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 24 +++++++-------
docs/dev/table/tableApi.md | 26 +++++++++++++++
.../flink/table/api/scala/expressionDsl.scala | 14 ++++++--
.../table/expressions/ExpressionParser.scala | 9 ++++++
.../table/expressions/mathExpressions.scala | 22 +++++++++++++
.../flink/table/validate/FunctionCatalog.scala | 1 +
.../table/expressions/ScalarFunctionsTest.scala | 34 ++++++++++++++++----
7 files changed, 111 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 60ce03a..27d79af 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1363,6 +1363,19 @@ LOG10(numeric)
<tr>
<td>
+ {% highlight text %}
+LOG(x numeric)
+LOG(b numeric, x numeric)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the logarithm of a <i>numeric</i>.</p>
+ <p>If called with one parameter, this function returns the natural logarithm of <code>x</code>. If called with two parameters, this function returns the logarithm of <code>x</code> to the base <code>b</code>. <code>x</code> must be greater than 0. <code>b</code> must be greater than 1.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
{% highlight text %}
EXP(numeric)
{% endhighlight %}
@@ -1582,17 +1595,6 @@ RAND_INTEGER(seed integer, bound integer)
</tr>
<tr>
- <td>
- {% highlight text %}
-LOG(x numeric), LOG(base numeric, x numeric)
-{% endhighlight %}
- </td>
- <td>
- <p>Returns the natural logarithm of a specified number of a specified base. If called with one parameter, this function returns the natural logarithm of <code>x</code>. If called with two parameters, this function returns the logarithm of <code>x</code> to the base <code>b</code>. <code>x</code> must be greater than 0. <code>b</code> must be greater than 1.</p>
- </td>
- </tr>
-
- <tr>
<td>
{% highlight text %}
BIN(numeric)
http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 4e7c544..69aaeea 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2030,6 +2030,19 @@ NUMERIC.log10()
<p>Calculates the base 10 logarithm of given value.</p>
</td>
</tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+numeric1.log()
+numeric1.log(numeric2)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Calculates the logarithm of a given numeric value.</p>
+ <p>If called without a parameter, this function returns the natural logarithm of <code>numeric1</code>. If called with a parameter <code>numeric2</code>, this function returns the logarithm of <code>numeric1</code> to the base <code>numeric2</code>. <code>numeric1</code> must be greater than 0. <code>numeric2</code> must be greater than 1.</p>
+ </td>
+ </tr>
<tr>
<td>
@@ -3515,6 +3528,19 @@ NUMERIC.log10()
<tr>
<td>
{% highlight scala %}
+numeric1.log()
+numeric1.log(numeric2)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Calculates the logarithm of a given numeric value.</p>
+ <p>If called without a parameter, this function returns the natural logarithm of <code>numeric1</code>. If called with a parameter <code>numeric2</code>, this function returns the logarithm of <code>numeric1</code> to the base <code>numeric2</code>. <code>numeric1</code> must be greater than 0. <code>numeric2</code> must be greater than 1.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
NUMERIC.exp()
{% endhighlight %}
</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index f73442b..09e7ef9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -296,16 +296,26 @@ trait ImplicitExpressionOperations {
def exp() = Exp(expr)
/**
- * Calculates the base 10 logarithm of given value.
+ * Calculates the base 10 logarithm of the given value.
*/
def log10() = Log10(expr)
/**
- * Calculates the natural logarithm of given value.
+ * Calculates the natural logarithm of the given value.
*/
def ln() = Ln(expr)
/**
+ * Calculates the natural logarithm of the given value.
+ */
+ def log() = Log(null, expr)
+
+ /**
+ * Calculates the logarithm of the given value to the given base.
+ */
+ def log(base: Expression) = Log(base, expr)
+
+ /**
* Calculates the given number raised to the power of the other value.
*/
def power(other: Expression) = Power(expr, other)
http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index aa82464..faf6268 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -57,6 +57,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val EXTRACT: Keyword = Keyword("extract")
lazy val FLOOR: Keyword = Keyword("floor")
lazy val CEIL: Keyword = Keyword("ceil")
+ lazy val LOG: Keyword = Keyword("log")
lazy val YEARS: Keyword = Keyword("years")
lazy val YEAR: Keyword = Keyword("year")
lazy val MONTHS: Keyword = Keyword("months")
@@ -246,6 +247,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
}
+ // required because op.log(base) changes the order of the parameters
+ lazy val suffixLog: PackratParser[Expression] =
+ composite ~ "." ~ LOG ~ "(" ~ expression ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ base ~ _ => Log(base, operand)
+ }
+
lazy val suffixFunctionCall: PackratParser[Expression] =
composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
@@ -307,6 +314,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
// expressions that need special expression conversion
suffixAs | suffixTimeInterval | suffixRowInterval | suffixToTimestamp | suffixToTime |
suffixToDate |
+ // expression for log
+ suffixLog |
// expressions that take enumerations
suffixCast | suffixTrim | suffixTrimWithoutArgs | suffixExtract | suffixFloor | suffixCeil |
// expressions that take literals
http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index 9867dee..0378ce5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -92,6 +92,28 @@ case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
}
}
+case class Log(base: Expression, antilogarithm: Expression) extends Expression with InputTypeSpec {
+ def this(antilogarithm: Expression) = this(null, antilogarithm)
+
+ override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+ override private[flink] def children: Seq[Expression] =
+ if (base == null) Seq(antilogarithm) else Seq(base, antilogarithm)
+
+ override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+ Seq.fill(children.length)(DOUBLE_TYPE_INFO)
+
+ override def toString: String = s"log(${children.mkString(",")})"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder.call(ScalarSqlFunctions.LOG, children.map(_.toRexNode))
+ }
+}
+
+object Log {
+ def apply(antilogarithm: Expression): Log = Log(null, antilogarithm)
+}
+
case class Ln(child: Expression) extends UnaryExpression with InputTypeSpec {
override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 055bd13..51aac27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -212,6 +212,7 @@ object FunctionCatalog {
"floor" -> classOf[Floor],
"log10" -> classOf[Log10],
"ln" -> classOf[Ln],
+ "log" -> classOf[Log],
"power" -> classOf[Power],
"mod" -> classOf[Mod],
"sqrt" -> classOf[Sqrt],
http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index d449fba..f3c48ac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1286,30 +1286,52 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
@Test
def testLog(): Unit = {
- testSqlApi(
+ testAllApis(
+ 'f6.log(),
+ "f6.log",
"LOG(f6)",
"1.5260563034950492"
)
- testSqlApi(
- "LOG(f6-f6 + 10, f6-f6+100)",
+ testTableApi(
+ Log('f6),
+ "Log(f6)",
+ "1.5260563034950492"
+ )
+
+ testAllApis(
+ ('f6 - 'f6 + 100).log('f6 - 'f6 + 10),
+ "(f6 - f6 + 100).log(f6 - f6 + 10)",
+ "LOG(f6 - f6 + 10, f6 - f6 + 100)",
"2.0"
)
- testSqlApi(
+ testAllApis(
+ ('f6 + 20).log(),
+ "(f6+20).log",
"LOG(f6+20)",
"3.202746442938317"
)
- testSqlApi(
+ testAllApis(
+ 10.log(),
+ "10.log",
"LOG(10)",
"2.302585092994046"
)
- testSqlApi(
+ testAllApis(
+ 100.log(10),
+ "100.log(10)",
"LOG(10, 100)",
"2.0"
)
+
+ testTableApi(
+ Log(10, 100),
+ "Log(10, 100)",
+ "2.0"
+ )
}
// ----------------------------------------------------------------------------------------------