You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/04/17 19:52:45 UTC

[1/2] flink git commit: [FLINK-8366] [table] Fix UpsertTableSink tests.

Repository: flink
Updated Branches:
  refs/heads/master 518adb043 -> d38695b8e


[FLINK-8366] [table] Fix UpsertTableSink tests.

This closes #5244.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3adc21d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3adc21d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3adc21d4

Branch: refs/heads/master
Commit: 3adc21d489d78cd34748f2132e4e7659f65a33e4
Parents: 518adb0
Author: 军长 <he...@alibaba-inc.com>
Authored: Fri Jan 5 16:53:31 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 17 17:03:18 2018 +0200

----------------------------------------------------------------------
 .../table/runtime/stream/table/TableSinkITCase.scala     | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3adc21d4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index bda823e..3085c36 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -63,7 +63,7 @@ class TableSinkITCase extends AbstractTestBase {
     val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
 
-    val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+    input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
       .where('a < 3 || 'a > 19)
       .select('c, 't, 'b)
       .insertInto("targetTable")
@@ -96,7 +96,7 @@ class TableSinkITCase extends AbstractTestBase {
       .assignAscendingTimestamps(_._2)
       .map(x => x).setParallelism(4) // increase DOP to 4
 
-    val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
+    input.toTable(tEnv, 'a, 'b.rowtime, 'c)
       .where('a < 5 || 'a > 17)
       .select('c, 'b)
       .writeToSink(new CsvTableSink(path))
@@ -677,11 +677,10 @@ object RowCollector {
   /** Converts a list of upsert messages into a list of final results. */
   def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
 
-    def getKeys(r: Row): List[String] =
-      keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k)
+    def getKeys(r: Row): Row = Row.project(r, keys)
 
-    val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
-      val key = getKeys(r.f1).mkString("")
+    val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], r) =>
+      val key = getKeys(r.f1)
       if (r.f0) {
         o + (key -> r.f1.toString)
       } else {


[2/2] flink git commit: [FLINK-6924] [table] Add Table API log() function.

Posted by fh...@apache.org.
[FLINK-6924] [table] Add Table API log() function.

This closes #5638.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d38695b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d38695b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d38695b8

Branch: refs/heads/master
Commit: d38695b8e99d62777b2bca964a5c487a67e42331
Parents: 3adc21d
Author: Jiayi Liao <bu...@163.com>
Authored: Sun Mar 4 17:15:10 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 17 17:03:19 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 24 +++++++-------
 docs/dev/table/tableApi.md                      | 26 +++++++++++++++
 .../flink/table/api/scala/expressionDsl.scala   | 14 ++++++--
 .../table/expressions/ExpressionParser.scala    |  9 ++++++
 .../table/expressions/mathExpressions.scala     | 22 +++++++++++++
 .../flink/table/validate/FunctionCatalog.scala  |  1 +
 .../table/expressions/ScalarFunctionsTest.scala | 34 ++++++++++++++++----
 7 files changed, 111 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 60ce03a..27d79af 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1363,6 +1363,19 @@ LOG10(numeric)
 
     <tr>
       <td>
+       {% highlight text %}
+LOG(x numeric)
+LOG(b numeric, x numeric)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the logarithm of a <i>numeric</i>.</p>
+        <p>If called with one parameter, this function returns the natural logarithm of <code>x</code>. If called with two parameters, this function returns the logarithm of <code>x</code> to the base <code>b</code>. <code>x</code> must be greater than 0. <code>b</code> must be greater than 1.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
         {% highlight text %}
 EXP(numeric)
 {% endhighlight %}
@@ -1582,17 +1595,6 @@ RAND_INTEGER(seed integer, bound integer)
    </tr>
 
     <tr>
-     <td>
-       {% highlight text %}
-LOG(x numeric), LOG(base numeric, x numeric)
-{% endhighlight %}
-     </td>
-    <td>
-      <p>Returns the natural logarithm of a specified number of a specified base. If called with one parameter, this function returns the natural logarithm of <code>x</code>. If called with two parameters, this function returns the logarithm of <code>x</code> to the base <code>b</code>. <code>x</code> must be greater than 0. <code>b</code> must be greater than 1.</p>
-    </td>
-   </tr>
-   
-    <tr>
       <td>
 {% highlight text %}
 BIN(numeric)

http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 4e7c544..69aaeea 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2030,6 +2030,19 @@ NUMERIC.log10()
         <p>Calculates the base 10 logarithm of given value.</p>
       </td>
     </tr>
+    
+    <tr>
+      <td>
+        {% highlight java %}
+numeric1.log()
+numeric1.log(numeric2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Calculates the logarithm of a given numeric value.</p>
+        <p>If called without a parameter, this function returns the natural logarithm of <code>numeric1</code>. If called with a parameter <code>numeric2</code>, this function returns the logarithm of <code>numeric1</code> to the base <code>numeric2</code>. <code>numeric1</code> must be greater than 0. <code>numeric2</code> must be greater than 1.</p>
+      </td>
+    </tr>
 
     <tr>
       <td>
@@ -3515,6 +3528,19 @@ NUMERIC.log10()
     <tr>
       <td>
         {% highlight scala %}
+numeric1.log()
+numeric1.log(numeric2)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Calculates the logarithm of a given numeric value.</p>
+        <p>If called without a parameter, this function returns the natural logarithm of <code>numeric1</code>. If called with a parameter <code>numeric2</code>, this function returns the logarithm of <code>numeric1</code> to the base <code>numeric2</code>. <code>numeric1</code> must be greater than 0. <code>numeric2</code> must be greater than 1.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
 NUMERIC.exp()
 {% endhighlight %}
       </td>

http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index f73442b..09e7ef9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -296,16 +296,26 @@ trait ImplicitExpressionOperations {
   def exp() = Exp(expr)
 
   /**
-    * Calculates the base 10 logarithm of given value.
+    * Calculates the base 10 logarithm of the given value.
     */
   def log10() = Log10(expr)
 
   /**
-    * Calculates the natural logarithm of given value.
+    * Calculates the natural logarithm of the given value.
     */
   def ln() = Ln(expr)
 
   /**
+    * Calculates the natural logarithm of the given value.
+    */
+  def log() = Log(null, expr)
+
+  /**
+    * Calculates the logarithm of the given value to the given base.
+    */
+  def log(base: Expression) = Log(base, expr)
+
+  /**
     * Calculates the given number raised to the power of the other value.
     */
   def power(other: Expression) = Power(expr, other)

http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index aa82464..faf6268 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -57,6 +57,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val EXTRACT: Keyword = Keyword("extract")
   lazy val FLOOR: Keyword = Keyword("floor")
   lazy val CEIL: Keyword = Keyword("ceil")
+  lazy val LOG: Keyword = Keyword("log")
   lazy val YEARS: Keyword = Keyword("years")
   lazy val YEAR: Keyword = Keyword("year")
   lazy val MONTHS: Keyword = Keyword("months")
@@ -246,6 +247,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
     }
 
+  // required because op.log(base) changes order of the parameters
+  lazy val suffixLog: PackratParser[Expression] =
+    composite ~ "." ~ LOG ~ "(" ~ expression ~ ")" ^^ {
+      case operand ~ _ ~ _ ~ _ ~ base ~ _ => Log(base, operand)
+    }
+
   lazy val suffixFunctionCall: PackratParser[Expression] =
     composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
     case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
@@ -307,6 +314,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     // expressions that need special expression conversion
     suffixAs | suffixTimeInterval | suffixRowInterval | suffixToTimestamp | suffixToTime |
     suffixToDate |
+    // expression for log
+    suffixLog |
     // expressions that take enumerations
     suffixCast | suffixTrim | suffixTrimWithoutArgs | suffixExtract | suffixFloor | suffixCeil |
     // expressions that take literals

http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index 9867dee..0378ce5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -92,6 +92,28 @@ case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
   }
 }
 
+case class Log(base: Expression, antilogarithm: Expression) extends Expression with InputTypeSpec {
+  def this(antilogarithm: Expression) = this(null, antilogarithm)
+
+  override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  override private[flink] def children: Seq[Expression] =
+    if (base == null) Seq(antilogarithm) else Seq(base, antilogarithm)
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq.fill(children.length)(DOUBLE_TYPE_INFO)
+
+  override def toString: String = s"log(${children.mkString(",")})"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.LOG, children.map(_.toRexNode))
+  }
+}
+
+object Log {
+  def apply(antilogarithm: Expression): Log = Log(null, antilogarithm)
+}
+
 case class Ln(child: Expression) extends UnaryExpression with InputTypeSpec {
   override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 055bd13..51aac27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -212,6 +212,7 @@ object FunctionCatalog {
     "floor" -> classOf[Floor],
     "log10" -> classOf[Log10],
     "ln" -> classOf[Ln],
+    "log" -> classOf[Log],
     "power" -> classOf[Power],
     "mod" -> classOf[Mod],
     "sqrt" -> classOf[Sqrt],

http://git-wip-us.apache.org/repos/asf/flink/blob/d38695b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index d449fba..f3c48ac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1286,30 +1286,52 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   @Test
   def testLog(): Unit = {
-    testSqlApi(
+    testAllApis(
+      'f6.log(),
+      "f6.log",
       "LOG(f6)",
       "1.5260563034950492"
     )
 
-    testSqlApi(
-      "LOG(f6-f6 + 10, f6-f6+100)",
+    testTableApi(
+      Log('f6),
+      "Log(f6)",
+      "1.5260563034950492"
+    )
+
+    testAllApis(
+      ('f6 - 'f6 + 100).log('f6 - 'f6 + 10),
+      "(f6 - f6 + 100).log(f6 - f6 + 10)",
+      "LOG(f6 - f6 + 10, f6 - f6 + 100)",
       "2.0"
     )
 
-    testSqlApi(
+    testAllApis(
+      ('f6 + 20).log(),
+      "(f6+20).log",
       "LOG(f6+20)",
       "3.202746442938317"
     )
 
-    testSqlApi(
+    testAllApis(
+      10.log(),
+      "10.log",
       "LOG(10)",
       "2.302585092994046"
     )
 
-    testSqlApi(
+    testAllApis(
+      100.log(10),
+      "100.log(10)",
       "LOG(10, 100)",
       "2.0"
     )
+
+    testTableApi(
+      Log(10, 100),
+      "Log(10, 100)",
+      "2.0"
+    )
   }
 
   // ----------------------------------------------------------------------------------------------