You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/24 10:46:48 UTC

[1/2] flink git commit: [FLINK-6892] [table] Add LPAD/RPAD to Table API

Repository: flink
Updated Branches:
  refs/heads/master f1e4d25c1 -> 2e11a404c


[FLINK-6892] [table] Add LPAD/RPAD to Table API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e11a404
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e11a404
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e11a404

Branch: refs/heads/master
Commit: 2e11a404c876f64df042637175351534320f0512
Parents: a11dd2c
Author: twalthr <tw...@apache.org>
Authored: Wed Jan 24 11:38:19 2018 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 24 11:40:23 2018 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  4 +--
 docs/dev/table/tableApi.md                      | 25 ++++++++++++++
 .../flink/table/api/scala/expressionDsl.scala   | 16 +++++++++
 .../table/expressions/stringExpressions.scala   | 34 ++++++++++++++++++++
 .../runtime/functions/ScalarFunctions.scala     | 31 +++++++++---------
 .../flink/table/validate/FunctionCatalog.scala  |  2 ++
 .../table/expressions/ScalarFunctionsTest.scala | 13 ++++++++
 7 files changed, 107 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e11a404/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 9003ec6..eb7204a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1765,7 +1765,7 @@ LPAD(text string, len integer, pad string)
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns the string text, left-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>LPAD('hi',4,'??')</code> returns <code>??hi</code> <code>LPAD('hi',1,'??')</code> returns <code>h</code></p>
+        <p>Returns the string text left-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>LPAD('hi',4,'??')</code> returns <code>??hi</code>, <code>LPAD('hi',1,'??')</code> returns <code>h</code>.</p>
       </td>
     </tr>
     <tr>
@@ -1775,7 +1775,7 @@ RPAD(text string, len integer, pad string)
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns the string text, right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code> <code>RPAD('hi',1,'??')</code> returns <code>h</code></p>
+        <p>Returns the string text right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code>, <code>RPAD('hi',1,'??')</code> returns <code>h</code>.</p>
       </td>
     </tr>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2e11a404/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a200c7e..0408c3c 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2390,6 +2390,31 @@ STRING.initCap()
         <p>Converts the initial letter of each word in a string to uppercase. Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace.</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+STRING.lpad(len INT, pad STRING)
+{% endhighlight %}
+      </td>
+
+      <td>
+        <p>Returns a string left-padded with the given pad string to a length of len characters. If the string is longer than len, the return value is shortened to len characters. E.g. "hi".lpad(4, '??') returns "??hi",  "hi".lpad(1, '??') returns "h".</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+STRING.rpad(len INT, pad STRING)
+{% endhighlight %}
+      </td>
+
+      <td>
+        <p>Returns a string right-padded with the given pad string to a length of len characters. If the string is longer than len, the return value is shortened to len characters. E.g. "hi".rpad(4, '??') returns "hi??",  "hi".rpad(1, '??') returns "h".</p>
+      </td>
+    </tr>
+
     <tr>
       <td>
         {% highlight text %}

http://git-wip-us.apache.org/repos/asf/flink/blob/2e11a404/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 820303f..c520433 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -476,6 +476,22 @@ trait ImplicitExpressionOperations {
   def position(haystack: Expression) = Position(expr, haystack)
 
   /**
+    * Returns a string left-padded with the given pad string to a length of len characters. If
+    * the string is longer than len, the return value is shortened to len characters.
+    *
+    * e.g. "hi".lpad(4, "??") returns "??hi", "hi".lpad(1, "??") returns "h"
+    */
+  def lpad(len: Expression, pad: Expression) = Lpad(expr, len, pad)
+
+  /**
+    * Returns a string right-padded with the given pad string to a length of len characters. If
+    * the string is longer than len, the return value is shortened to len characters.
+    *
+    * e.g. "hi".rpad(4, "??") returns "hi??", "hi".rpad(1, "??") returns "h"
+    */
+  def rpad(len: Expression, pad: Expression) = Rpad(expr, len, pad)
+
+  /**
     * For windowing function to config over window
     * e.g.:
     * table

http://git-wip-us.apache.org/repos/asf/flink/blob/2e11a404/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index 69c7058..e69485f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -323,3 +323,37 @@ case class ConcatWs(separator: Expression, strings: Seq[Expression])
     relBuilder.call(ScalarSqlFunctions.CONCAT_WS, children.map(_.toRexNode))
   }
 }
+
+case class Lpad(text: Expression, len: Expression, pad: Expression)
+  extends Expression with InputTypeSpec {
+
+  override private[flink] def children: Seq[Expression] = Seq(text, len, pad)
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+
+  override def toString: String = s"($text).lpad($len, $pad)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.LPAD, children.map(_.toRexNode))
+  }
+}
+
+case class Rpad(text: Expression, len: Expression, pad: Expression)
+  extends Expression with InputTypeSpec {
+
+  override private[flink] def children: Seq[Expression] = Seq(text, len, pad)
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.STRING_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
+    Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+
+  override def toString: String = s"($text).rpad($len, $pad)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(ScalarSqlFunctions.RPAD, children.map(_.toRexNode))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2e11a404/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 87e58a5..2e7c9f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -109,17 +109,17 @@ object ScalarFunctions {
   }
 
   /**
-    * Returns the string str, left-padded with the string pad to a length of len characters.
+    * Returns the string str left-padded with the string pad to a length of len characters.
     * If str is longer than len, the return value is shortened to len characters.
     */
   def lpad(base: String, len: Integer, pad: String): String = {
     if (len < 0) {
       return null
+    } else if (len == 0) {
+      return ""
     }
-    var data = "".toCharArray
-    if (data.length < len) {
-      data = new Array[Char](len)
-    }
+
+    val data = new Array[Char](len)
     val baseChars = base.toCharArray
     val padChars = pad.toCharArray
 
@@ -129,12 +129,10 @@ object ScalarFunctions {
     // Copy the padding
     var i = 0
     while (i < pos) {
-      {
-        var j = 0
-        while (j < pad.length && j < pos - i) {
-          data(i + j) = padChars(j)
-          j += 1
-        }
+      var j = 0
+      while (j < pad.length && j < pos - i) {
+        data(i + j) = padChars(j)
+        j += 1
       }
       i += pad.length
     }
@@ -145,21 +143,22 @@ object ScalarFunctions {
       data(pos + i) = baseChars(i)
       i += 1
     }
+
     new String(data)
   }
 
   /**
-    * Returns the string str, right-padded with the string pad to a length of len characters.
+    * Returns the string str right-padded with the string pad to a length of len characters.
     * If str is longer than len, the return value is shortened to len characters.
     */
   def rpad(base: String, len: Integer, pad: String): String = {
     if (len < 0) {
       return null
+    } else if (len == 0) {
+      return ""
     }
-    var data = "".toCharArray
-    if (data.length < len) {
-      data = new Array[Char](len)
-    }
+
+    val data = new Array[Char](len)
     val baseChars = base.toCharArray
     val padChars = pad.toCharArray
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2e11a404/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 37df23a..add393d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -193,6 +193,8 @@ object FunctionCatalog {
     "overlay" -> classOf[Overlay],
     "concat" -> classOf[Concat],
     "concat_ws" -> classOf[ConcatWs],
+    "lpad" -> classOf[Lpad],
+    "rpad" -> classOf[Rpad],
 
     // math functions
     "plus" -> classOf[Plus],

http://git-wip-us.apache.org/repos/asf/flink/blob/2e11a404/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index e4f88b5..281dd90 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -362,7 +362,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi("LPAD(f33,1,'??')", "null")
     testSqlApi("LPAD('\u0061\u0062',1,'??')", "a") // the unicode of ab is \u0061\u0062
     testSqlApi("LPAD('⎨⎨',1,'??')", "⎨")
+    testSqlApi("LPAD('äääääääää',2,'??')", "ää")
+    testSqlApi("LPAD('äääääääää',10,'??')", "?äääääääää")
 
+    testAllApis(
+      "äää".lpad(13, "12345"),
+      "'äää'.lpad(13, '12345')",
+      "LPAD('äää',13,'12345')",
+      "1234512345äää")
   }
 
   @Test
@@ -375,6 +382,12 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi("RPAD(f33,1,'??')", "null")
     testSqlApi("RPAD('\u0061\u0062',1,'??')", "a") // the unicode of ab is \u0061\u0062
     testSqlApi("RPAD('üö',1,'??')", "ü")
+
+    testAllApis(
+      "äää".rpad(13, "12345"),
+      "'äää'.rpad(13, '12345')",
+      "RPAD('äää',13,'12345')",
+      "äää1234512345")
   }
 
   @Test


[2/2] flink git commit: [FLINK-6892] [table] Add L/RPAD support in SQL

Posted by tw...@apache.org.
[FLINK-6892] [table] Add L/RPAD support in SQL

This closes #4127.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a11dd2c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a11dd2c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a11dd2c4

Branch: refs/heads/master
Commit: a11dd2c4cb297cd6677a7103a28e5d7888ec5711
Parents: f1e4d25
Author: sunjincheng121 <su...@gmail.com>
Authored: Thu Jun 15 13:23:32 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 24 11:40:23 2018 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 21 +++++
 .../table/codegen/calls/BuiltInMethods.scala    | 13 ++++
 .../table/codegen/calls/FunctionGenerator.scala | 11 +++
 .../functions/sql/ScalarSqlFunctions.scala      | 18 +++++
 .../runtime/functions/ScalarFunctions.scala     | 82 +++++++++++++++++++-
 .../flink/table/validate/FunctionCatalog.scala  |  3 +
 .../table/expressions/ScalarFunctionsTest.scala | 25 ++++++
 7 files changed, 169 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 42b3965..9003ec6 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1755,6 +1755,27 @@ CONCAT_WS(separator, string1, string2,...)
       </td>
       <td>
         <p>Returns the string that results from concatenating the arguments using a separator. The separator is added between the strings to be concatenated. Returns NULL If the separator is NULL. CONCAT_WS() does not skip empty strings. However, it does skip any NULL argument. E.g. <code>CONCAT_WS("~", "AA", "BB", "", "CC")</code> returns <code>AA~BB~~CC</code></p>
+  </td>
+    </tr>
+
+        <tr>
+      <td>
+        {% highlight text %}
+LPAD(text string, len integer, pad string)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the string text, left-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>LPAD('hi',4,'??')</code> returns <code>??hi</code> <code>LPAD('hi',1,'??')</code> returns <code>h</code></p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        {% highlight text %}
+RPAD(text string, len integer, pad string)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the string text, right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. <code>RPAD('hi',4,'??')</code> returns <code>hi??</code> <code>RPAD('hi',1,'??')</code> returns <code>h</code></p>
       </td>
     </tr>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 6791e1b..ea4f0fd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -96,5 +96,18 @@ object BuiltInMethods {
     Types.lookupMethod(
       classOf[ScalarFunctions], "concat_ws", classOf[String], classOf[Array[String]])
 
+  val LPAD = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "lpad",
+    classOf[String],
+    classOf[Integer],
+    classOf[String])
+  val RPAD = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "rpad",
+    classOf[String],
+    classOf[Integer],
+    classOf[String])
+
   val BIN = Types.lookupMethod(classOf[JLong], "toBinaryString", classOf[Long])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 0a2789f..412cdfc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -512,6 +512,17 @@ object FunctionGenerator {
     Seq(SqlTimeTypeInfo.TIMESTAMP, STRING_TYPE_INFO),
     new DateFormatCallGen
   )
+  addSqlFunctionMethod(
+    ScalarSqlFunctions.LPAD,
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.LPAD)
+
+  addSqlFunctionMethod(
+    ScalarSqlFunctions.RPAD,
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.RPAD)
 
   // ----------------------------------------------------------------------------------------------
   // Cryptographic Hash functions

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
index 05881b3..891aba9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ScalarSqlFunctions.scala
@@ -67,6 +67,24 @@ object ScalarSqlFunctions {
       OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)),
     SqlFunctionCategory.NUMERIC)
 
+  val LPAD = new SqlFunction(
+    "LPAD",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
+    null,
+    OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER),
+    SqlFunctionCategory.STRING)
+
+  val RPAD = new SqlFunction(
+    "RPAD",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.cascade(
+      ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.FORCE_NULLABLE),
+    null,
+    OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER),
+    SqlFunctionCategory.STRING)
+
   val MD5 = new SqlFunction(
     "MD5",
     SqlKind.OTHER_FUNCTION,

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 36edba2..87e58a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -89,8 +89,7 @@ object ScalarFunctions {
   def log(x: Double): Double = {
     if (x <= 0.0) {
       throw new IllegalArgumentException(s"x of 'log(x)' must be > 0, but x = $x")
-    }
-    else {
+    } else {
       Math.log(x)
     }
   }
@@ -104,9 +103,84 @@ object ScalarFunctions {
     }
     if (base <= 1.0) {
       throw new IllegalArgumentException(s"base of 'log(base, x)' must be > 1, but base = $base")
-    }
-    else {
+    } else {
       Math.log(x) / Math.log(base)
     }
   }
+
+  /**
+    * Returns the string str, left-padded with the string pad to a length of len characters.
+    * If str is longer than len, the return value is shortened to len characters.
+    */
+  def lpad(base: String, len: Integer, pad: String): String = {
+    if (len < 0) {
+      return null
+    }
+    var data = "".toCharArray
+    if (data.length < len) {
+      data = new Array[Char](len)
+    }
+    val baseChars = base.toCharArray
+    val padChars = pad.toCharArray
+
+    // The length of the padding needed
+    val pos = Math.max(len - base.length, 0)
+
+    // Copy the padding
+    var i = 0
+    while (i < pos) {
+      {
+        var j = 0
+        while (j < pad.length && j < pos - i) {
+          data(i + j) = padChars(j)
+          j += 1
+        }
+      }
+      i += pad.length
+    }
+
+    // Copy the base
+    i = 0
+    while (pos + i < len && i < base.length) {
+      data(pos + i) = baseChars(i)
+      i += 1
+    }
+    new String(data)
+  }
+
+  /**
+    * Returns the string str, right-padded with the string pad to a length of len characters.
+    * If str is longer than len, the return value is shortened to len characters.
+    */
+  def rpad(base: String, len: Integer, pad: String): String = {
+    if (len < 0) {
+      return null
+    }
+    var data = "".toCharArray
+    if (data.length < len) {
+      data = new Array[Char](len)
+    }
+    val baseChars = base.toCharArray
+    val padChars = pad.toCharArray
+
+    var pos = 0
+
+    // Copy the base
+    while (pos < base.length && pos < len) {
+      data(pos) = baseChars(pos)
+      pos += 1
+    }
+
+    // Copy the padding
+    while (pos < len) {
+      var i = 0
+      while (i < pad.length && i < len - pos) {
+        data(pos + i) = padChars(i)
+        i += 1
+      }
+      pos += pad.length
+    }
+
+    new String(data)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index fb359c2..37df23a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -420,9 +420,12 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     ScalarSqlFunctions.BIN,
     SqlStdOperatorTable.TIMESTAMP_ADD,
     ScalarSqlFunctions.LOG,
+    ScalarSqlFunctions.LPAD,
+    ScalarSqlFunctions.RPAD,
     ScalarSqlFunctions.MD5,
     ScalarSqlFunctions.SHA1,
     ScalarSqlFunctions.SHA256,
+
     // EXTENSIONS
     BasicOperatorTable.TUMBLE,
     BasicOperatorTable.HOP,

http://git-wip-us.apache.org/repos/asf/flink/blob/a11dd2c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 79dc67c..e4f88b5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -353,6 +353,31 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   }
 
   @Test
+  def testLPad(): Unit = {
+    testSqlApi("LPAD('hi',4,'??')", "??hi")
+    testSqlApi("LPAD('hi',1,'??')", "h")
+    testSqlApi("LPAD('',1,'??')", "?")
+    testSqlApi("LPAD('',30,'??')", "??????????????????????????????")
+    testSqlApi("LPAD('111',-2,'??')", "null")
+    testSqlApi("LPAD(f33,1,'??')", "null")
+    testSqlApi("LPAD('\u0061\u0062',1,'??')", "a") // the unicode of ab is \u0061\u0062
+    testSqlApi("LPAD('⎨⎨',1,'??')", "⎨")
+
+  }
+
+  @Test
+  def testRPad(): Unit = {
+    testSqlApi("RPAD('hi',4,'??')", "hi??")
+    testSqlApi("RPAD('hi',1,'??')", "h")
+    testSqlApi("RPAD('',1,'??')", "?")
+    testSqlApi("RPAD('1',30,'??')", "1?????????????????????????????")
+    testSqlApi("RPAD('111',-2,'??')", "null")
+    testSqlApi("RPAD(f33,1,'??')", "null")
+    testSqlApi("RPAD('\u0061\u0062',1,'??')", "a") // the unicode of ab is \u0061\u0062
+    testSqlApi("RPAD('üö',1,'??')", "ü")
+  }
+
+  @Test
   def testBin(): Unit = {
 
     testAllApis(