You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/06/23 09:44:11 UTC

[1/2] flink git commit: [FLINK-6237] [table] Performance improvements and code clean up

Repository: flink
Updated Branches:
  refs/heads/master 44ae21d7e -> a0b781461


[FLINK-6237] [table] Performance improvements and code clean up


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0b78146
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0b78146
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0b78146

Branch: refs/heads/master
Commit: a0b781461bcf8c2f1d00b93464995f03eda589f1
Parents: 6230cc5
Author: twalthr <tw...@apache.org>
Authored: Fri Jun 23 11:22:30 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Fri Jun 23 11:43:31 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  44 +++
 docs/dev/table/tableApi.md                      | 332 +++++--------------
 .../flink/table/api/scala/expressionDsl.scala   |  68 ++--
 .../flink/table/codegen/CodeGenerator.scala     |  39 ++-
 .../table/codegen/calls/FunctionGenerator.scala |  40 +--
 .../flink/table/codegen/calls/RandCallGen.scala |  88 ++---
 .../table/codegen/calls/ScalarOperators.scala   |  66 +---
 .../apache/flink/table/codegen/generated.scala  |  12 +-
 .../table/expressions/mathExpressions.scala     |  63 +++-
 .../apache/flink/table/expressions/random.scala |  87 -----
 .../flink/table/validate/FunctionCatalog.scala  |  14 +-
 .../table/api/java/batch/sql/SqlITCase.java     | 125 -------
 .../table/api/java/stream/sql/SqlITCase.java    |  46 ---
 .../table/expressions/ScalarFunctionsTest.scala | 125 ++-----
 14 files changed, 332 insertions(+), 817 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 26f4f1b..931a7ca 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -1386,6 +1386,50 @@ PI()
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight text %}
+RAND()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+RAND(seed integer)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two RAND functions will return identical sequences of numbers if they have the same initial seed.</p>
+      </td>
+    </tr>
+
+    <tr>
+     <td>
+       {% highlight text %}
+RAND_INTEGER(bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive).</p>
+    </td>
+   </tr>
+
+    <tr>
+     <td>
+       {% highlight text %}
+RAND_INTEGER(seed integer, bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two RAND_INTEGER functions will return identical sequences of numbers if they have the same initial seed and the same bound.</p>
+    </td>
+   </tr>
+
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index e379078..34cee24 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1411,12 +1411,8 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 // Bounded Event-time over window (assuming an event-time attribute "rowtime")
 .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
 
-<<<<<<< HEAD:docs/dev/table/tableApi.md
 // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
 .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
-=======
-A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUN
 T, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, F
 REE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, O
 N, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE
 , SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION,
  TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE, RAND, RAND_INTEGER
->>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation:docs/dev/table_api.md
 
 // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
 .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
@@ -2071,6 +2067,50 @@ pi()
         <p>Returns a value that is closer than any other value to pi.</p>
       </td>
     </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+rand()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+rand(seed integer)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two rand functions will return identical sequences of numbers if they have the same initial seed.</p>
+      </td>
+    </tr>
+
+    <tr>
+     <td>
+       {% highlight java %}
+randInteger(bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive).</p>
+    </td>
+   </tr>
+
+    <tr>
+     <td>
+       {% highlight java %}
+randInteger(seed integer, bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two randInteger functions will return identical sequences of numbers if they have the same initial seed and the same bound.</p>
+    </td>
+   </tr>
     
   </tbody>
 </table>
@@ -3280,6 +3320,50 @@ pi()
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+rand()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+rand(seed integer)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two rand functions will return identical sequences of numbers if they have the same initial seed.</p>
+      </td>
+    </tr>
+
+    <tr>
+     <td>
+       {% highlight scala %}
+randInteger(bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive).</p>
+    </td>
+   </tr>
+
+    <tr>
+     <td>
+       {% highlight scala %}
+randInteger(seed integer, bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two randInteger functions will return identical sequences of numbers if they have the same initial seed and the same bound.</p>
+    </td>
+   </tr>
+
   </tbody>
 </table>
 
@@ -3938,246 +4022,6 @@ ARRAY.element()
   </tbody>
 </table>
 
-<<<<<<< HEAD:docs/dev/table/tableApi.md
-=======
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 40%">Random functions</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-
-  <tbody>
-    <tr>
-      <td>
-        {% highlight text %}
-RAND()
-{% endhighlight %}
-      </td>
-      <td>
-        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
-      </td>
-    </tr>
-
-    <tr>
-      <td>
-        {% highlight text %}
-RAND(seed integer)
-{% endhighlight %}
-      </td>
-      <td>
-        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with a initial seed. Two RAND functions will get identical sequences of numbers if they have same initial seed.</p>
-      </td>
-    </tr>
-
-    <tr>
-     <td>
-       {% highlight text %}
-RAND_INTEGER(bound integer)
-{% endhighlight %}
-     </td>
-    <td>
-      <p>Returns a pseudorandom int value between 0.0 (inclusive) and the specified value (exclusive).</p>
-    </td>
-   </tr>
-
-    <tr>
-     <td>
-       {% highlight text %}
-RAND_INTEGER(seed integer, bound integer)
-{% endhighlight %}
-     </td>
-    <td>
-      <p>Returns a pseudorandom int value between 0.0 (inclusive) and the specified value (exclusive) with a initial seed. Two RAND_INTEGER functions will get identical sequences of numbers if they have same initial seed and same bound.</p>
-    </td>
-   </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-{% top %}
-
-User-defined Functions
-----------------
-
-### User-defined Scalar Functions
-
-If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar values to a new scalar value.
-
-In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`.
-
-The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class HashCode extends ScalarFunction {
-  private int factor = 12;
-  
-  public HashCode(int factor) {
-      this.factor = factor;
-  }
-  
-  public int eval(String s) {
-      return s.hashCode() * factor;
-  }
-}
-
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-// register the function
-tableEnv.registerFunction("hashCode", new HashCode(10))
-
-// use the function in Java Table API
-myTable.select("string, string.hashCode(), hashCode(string)");
-
-// use the function in SQL API
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// must be defined in static/object context
-class HashCode(factor: Int) extends ScalarFunction {
-  def eval(s: String): Int = {
-    s.hashCode() * factor
-  }
-}
-
-val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-// use the function in Scala Table API
-val hashCode = new HashCode(10)
-myTable.select('string, hashCode('string))
-
-// register and use the function in SQL
-tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
-{% endhighlight %}
-</div>
-</div>
-
-By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
-
-Internally, the Table API and SQL code generation works with primitive values as much as possible. If a user-defined scalar function should not introduce much overhead through object creation/casting during runtime, it is recommended to declare parameters and result types as primitive types instead of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented as `int`. `Types.TIMESTAMP` can be represented as `long`.
-
-The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public static class TimestampModifier extends ScalarFunction {
-  public long eval(long t) {
-    return t % 1000;
-  }
-
-  public TypeInformation<?> getResultType(signature: Class<?>[]) {
-    return Types.TIMESTAMP;
-  }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-object TimestampModifier extends ScalarFunction {
-  def eval(t: Long): Long = {
-    t % 1000
-  }
-
-  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-    Types.TIMESTAMP
-  }
-}
-{% endhighlight %}
-</div>
-</div>
-
-### User-defined Table Functions
-
-Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns. 
-
-In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
-
-In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoin` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table. In SQL use `LATERAL TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).
-
-The following example shows how to define table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered: 
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
-public class Split extends TableFunction<Tuple2<String, Integer>> {
-    private String separator = " ";
-    
-    public Split(String separator) {
-        this.separator = separator;
-    }
-    
-    public void eval(String str) {
-        for (String s : str.split(separator)) {
-            // use collect(...) to emit a row
-            collect(new Tuple2<String, Integer>(s, s.length()));
-        }
-    }
-}
-
-BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-Table myTable = ...         // table schema: [a: String]
-
-// Register the function.
-tableEnv.registerFunction("split", new Split("#"));
-
-// Use the table function in the Java Table API. "as" specifies the field names of the table.
-myTable.join("split(a) as (word, length)").select("a, word, length");
-myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer).
-class Split(separator: String) extends TableFunction[(String, Int)] {
-  def eval(str: String): Unit = {
-    // use collect(...) to emit a row.
-    str.split(separator).foreach(x -> collect((x, x.length))
-  }
-}
-
-val tableEnv = TableEnvironment.getTableEnvironment(env)
-val myTable = ...         // table schema: [a: String]
-
-// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
-val split = new Split("#")
-// "as" specifies the field names of the generated table.
-myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length);
-myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length);
-
-// Register the table function to use it in SQL queries.
-tableEnv.registerFunction("split", new Split("#"))
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
-{% endhighlight %}
-**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
-</div>
-</div>
-
-Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
->>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation:docs/dev/table_api.md
 
 
 <table class="table table-bordered">

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 1e5a579..0fb940f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -22,18 +22,11 @@ import java.sql.{Date, Time, Timestamp}
 
 import org.apache.calcite.avatica.util.DateTimeUtils._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-<<<<<<< HEAD
 import org.apache.flink.table.api.{TableException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange}
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.AggregateFunction
-=======
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
-import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
-import org.apache.flink.table.expressions._
->>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
 
 import scala.language.implicitConversions
 
@@ -917,7 +910,6 @@ object array {
   }
 }
 
-<<<<<<< HEAD
 /**
   * Returns a value that is closer than any other value to pi.
   */
@@ -928,44 +920,52 @@ object pi {
     */
   def apply(): Expression = {
     Pi()
-=======
-object rand {
-
-  def apply(expr: Expression*): Expression = {
-    expr.size match {
-      case 0 => new Rand()
-      case 1 => Rand(expr.head)
-      case _ => throw new ValidationException("Invalid arguments for rand([seed]).")
-    }
   }
 }
 
-object rand_integer {
+/**
+  * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
+  */
+object rand {
 
-  def apply(expr: Expression*): Expression = {
-    expr.size match {
-      case 1 => new RandInteger(expr.head)
-      case 2 => RandInteger(expr.head, expr(1))
-      case _ =>
-        throw new ValidationException("Invalid arguments for rand_integer([seed, ] bound).")
-    }
-<<<<<<< HEAD
->>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
-=======
+  /**
+    * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
+    */
+  def apply(): Expression = {
+    new Rand()
+  }
+
+  /**
+    * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an
+    * initial seed. Two rand() functions will return identical sequences of numbers if they
+    * have the same initial seed.
+    */
+  def apply(seed: Expression): Expression = {
+    Rand(seed)
   }
 }
 
 /**
-  * Returns a value that is closer than any other value to pi.
+  * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
+  * value (exclusive).
   */
-object pi {
+object randInteger {
 
   /**
-    * Returns a value that is closer than any other value to pi.
+    * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
+    * value (exclusive).
     */
-  def apply(): Expression = {
-    Pi()
->>>>>>> fix merging error
+  def apply(bound: Expression): Expression = {
+   new RandInteger(bound)
+  }
+
+  /**
+    * Returns a pseudorandom integer value between 0 (inclusive) and the specified value
+    * (exclusive) with an initial seed. Two randInteger() functions will return identical sequences
+    * of numbers if they have the same initial seed and the same bound.
+    */
+  def apply(seed: Expression, bound: Expression): Expression = {
+    RandInteger(seed, bound)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 343a3c4..e7dc033 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -43,15 +43,11 @@ import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.table.codegen.calls.FunctionGenerator
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-<<<<<<< HEAD
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
 import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
-=======
-import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
->>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
 import org.apache.flink.table.runtime.TableFunctionCollector
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.types.Row
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -1730,7 +1726,7 @@ class CodeGenerator(
         |$resultTypeTerm $resultTerm = $defaultValue;
         |boolean $nullTerm = true;
         |""".stripMargin
-      GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType)
+      GeneratedExpression(resultTerm, nullTerm, wrappedCode, resultType, literal = true)
     } else {
       throw new CodeGenException("Null literals are not allowed if nullCheck is disabled.")
     }
@@ -1755,7 +1751,7 @@ class CodeGenerator(
         |""".stripMargin
     }
 
-    GeneratedExpression(resultTerm, nullTerm, resultCode, literalType)
+    GeneratedExpression(resultTerm, nullTerm, resultCode, literalType, literal = true)
   }
 
   private[flink] def generateSymbol(enum: Enum[_]): GeneratedExpression = {
@@ -1965,34 +1961,37 @@ class CodeGenerator(
   /**
     * Adds a reusable [[java.util.Random]] to the member area of the generated [[Function]].
     *
+    * The seed parameter must be a literal/constant expression.
+    *
     * @return member variable term
     */
-  def addReusableRandom(seedExpr: GeneratedExpression): String = {
+  def addReusableRandom(seedExpr: Option[GeneratedExpression]): String = {
     val fieldTerm = newName("random")
 
     val field =
       s"""
-         |final java.util.Random $fieldTerm;
+         |transient java.util.Random $fieldTerm;
          |""".stripMargin
     reusableMemberStatements.add(field)
 
-    val fieldInit = if (seedExpr != null && nullCheck) {
-      s"""
-         |${seedExpr.code}
-         |if(!${seedExpr.nullTerm}) {
-         |  $fieldTerm = new java.util.Random(${seedExpr.resultTerm});
+    val fieldInit = seedExpr match {
+      case Some(s) if nullCheck =>
+        s"""
+         |${s.code}
+         |if(!${s.nullTerm}) {
+         |  $fieldTerm = new java.util.Random(${s.resultTerm});
          |}
          |else {
          |  $fieldTerm = new java.util.Random();
          |}
          |""".stripMargin
-    } else if (seedExpr != null) {
-      s"""
-         |${seedExpr.code}
-         |$fieldTerm = new java.util.Random(${seedExpr.resultTerm});
+      case Some(s) =>
+        s"""
+         |${s.code}
+         |$fieldTerm = new java.util.Random(${s.resultTerm});
          |""".stripMargin
-    } else {
-      s"""
+      case _ =>
+        s"""
          |$fieldTerm = new java.util.Random();
          |""".stripMargin
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 55e43c3..ed18811 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -375,6 +375,26 @@ object FunctionGenerator {
     Seq(),
     new ConstantCallGen(DOUBLE_TYPE_INFO, Math.PI.toString))
 
+  addSqlFunction(
+    RAND,
+    Seq(),
+    new RandCallGen(isRandInteger = false, hasSeed = false))
+
+  addSqlFunction(
+    RAND,
+    Seq(INT_TYPE_INFO),
+    new RandCallGen(isRandInteger = false, hasSeed = true))
+
+  addSqlFunction(
+    RAND_INTEGER,
+    Seq(INT_TYPE_INFO),
+    new RandCallGen(isRandInteger = true, hasSeed = false))
+
+  addSqlFunction(
+    RAND_INTEGER,
+    Seq(INT_TYPE_INFO, INT_TYPE_INFO),
+    new RandCallGen(isRandInteger = true, hasSeed = true))
+
   // ----------------------------------------------------------------------------------------------
   // Temporal functions
   // ----------------------------------------------------------------------------------------------
@@ -458,26 +478,6 @@ object FunctionGenerator {
     Seq(),
     new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
 
-  addSqlFunction(
-    RAND,
-    Seq(),
-    new RandCallGen(BuiltInMethod.RAND))
-
-  addSqlFunction(
-    RAND,
-    Seq(INT_TYPE_INFO),
-    new RandCallGen(BuiltInMethod.RAND_SEED))
-
-  addSqlFunction(
-    RAND_INTEGER,
-    Seq(INT_TYPE_INFO),
-    new RandCallGen(BuiltInMethod.RAND_INTEGER))
-
-  addSqlFunction(
-    RAND_INTEGER,
-    Seq(INT_TYPE_INFO, INT_TYPE_INFO),
-    new RandCallGen(BuiltInMethod.RAND_INTEGER_SEED))
-
   // ----------------------------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala
index 7fbfa4d..f69ba91 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala
@@ -18,82 +18,40 @@
 
 package org.apache.flink.table.codegen.calls
 
-import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
 
 /**
-  * Generates a rand/rand_integer function call.
-  * Supports: RAND([seed]) and  RAND_INTEGER([seed, ] bound)
+  * Generates a random function call.
+  * Supports: RAND([seed]) and RAND_INTEGER([seed, ] bound)
   */
-class RandCallGen(method: BuiltInMethod) extends CallGenerator {
+class RandCallGen(isRandInteger: Boolean, hasSeed: Boolean) extends CallGenerator {
 
   override def generate(
-    codeGenerator: CodeGenerator,
-    operands: Seq[GeneratedExpression]): GeneratedExpression = {
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
 
-    method match {
-      case BuiltInMethod.RAND =>
-        assert(operands.isEmpty)
-        val randField = codeGenerator.addReusableRandom(null)
-        generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, operands) {
-          _ => s"""$randField.nextDouble()""".stripMargin
-        }
-      case BuiltInMethod.RAND_SEED =>
-        assert(operands.size == 1)
-        val (randField, newOperands) = if (operands.head.code.isEmpty) {
-          val (randField, initRandomExpr) = genInitRandomExpression(operands.head)
-          (randField, Seq(initRandomExpr))
-        } else {
-          val randField = codeGenerator.addReusableRandom(operands.head)
-          (randField, Seq.empty)
-        }
-        generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, newOperands) {
-          _ => s"""$randField.nextDouble()""".stripMargin
-        }
-      case BuiltInMethod.RAND_INTEGER =>
-        assert(operands.size == 1)
-        val randField = codeGenerator.addReusableRandom(null)
-        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
-          (terms) => s"""$randField.nextInt(${terms.head})""".stripMargin
-        }
-      case BuiltInMethod.RAND_INTEGER_SEED =>
-        assert(operands.size == 2)
-        val (randField, newOperands) = if (operands.head.code.isEmpty) {
-          val (randField, initRandomExpr) = genInitRandomExpression(operands.head)
-          (randField, Seq(initRandomExpr, operands(1)))
-        } else {
-          val randField = codeGenerator.addReusableRandom(operands.head)
-          (randField, Seq(operands(1)))
-        }
-        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, newOperands) {
-          (terms) => s"""$randField.nextInt(${terms.last})""".stripMargin
-        }
+    val randField = if (hasSeed) {
+      if (operands.head.literal) {
+        codeGenerator.addReusableRandom(Some(operands.head))
+      } else {
+        s"(new java.util.Random(${operands.head.resultTerm}))"
+      }
+    } else {
+      codeGenerator.addReusableRandom(None)
     }
-  }
 
-  private def genInitRandomExpression(
-    seedExpr: GeneratedExpression): (String, GeneratedExpression) = {
-    val randField = newName("random")
-    val initRandomCode =
-      s"""
-         |java.util.Random $randField;
-         |if(!${seedExpr.nullTerm}) {
-         |  $randField = new java.util.Random(${seedExpr.resultTerm});
-         |}
-         |else {
-         |  $randField = new java.util.Random();
-         |}
-         |""".stripMargin
-    val initRandomExpr = GeneratedExpression(
-      randField,
-      GeneratedExpression.NEVER_NULL,
-      initRandomCode,
-      TypeInformation.of(classOf[java.util.Random]))
-    (randField, initRandomExpr)
+    if (isRandInteger) {
+      generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) { terms =>
+        s"$randField.nextInt(${terms.last})"
+      }
+    } else {
+      generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, operands) { _ =>
+        s"$randField.nextDouble()"
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index fa9d3d9..f34b0d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -934,8 +934,6 @@ object ScalarOperators {
     }
   }
 
-<<<<<<< HEAD
-<<<<<<< HEAD
   def generateMapGet(
       codeGenerator: CodeGenerator,
       map: GeneratedExpression,
@@ -964,68 +962,8 @@ object ScalarOperators {
              |""".stripMargin
         }
     GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
-=======
-  def generateRand(
-    randField: String,
-    seedExpr: GeneratedExpression,
-    resultType: TypeInformation[_])
-  : GeneratedExpression = {
-    val resultTerm = newName("result")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val randCode = if (seedExpr != null) {
-      s"""
-         |if ($randField == null) {
-         |  ${seedExpr.code}
-         |  $randField = new java.util.Random(${seedExpr.resultTerm});
-         |}
-         |$resultTypeTerm $resultTerm = $randField.nextDouble();
-       """.stripMargin
-    } else {
-      s"""
-         |if ($randField == null) {
-         |  $randField = new java.util.Random();
-         |}
-         |$resultTypeTerm $resultTerm = $randField.nextDouble();
-       """.stripMargin
-    }
-
-    GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, randCode, resultType)
-  }
-
-  def generateRandInteger(
-    randField: String,
-    seedExpr: GeneratedExpression,
-    boundExpr: GeneratedExpression,
-    resultType: TypeInformation[_])
-  : GeneratedExpression = {
-    assert(boundExpr != null)
-    val resultTerm = newName("result")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-    val randCode = if (seedExpr != null) {
-      s"""
-         |if ($randField == null) {
-         |  ${seedExpr.code}
-         |  $randField = new java.util.Random(${seedExpr.resultTerm});
-         |}
-         |${boundExpr.code}
-         |$resultTypeTerm $resultTerm = $randField.nextInt(${boundExpr.resultTerm});
-       """.stripMargin
-    } else {
-      s"""
-         |if ($randField == null) {
-         |  $randField = new java.util.Random();
-         |}
-         |${boundExpr.code}
-         |$resultTypeTerm $resultTerm = $randField.nextInt(${boundExpr.resultTerm});
-       """.stripMargin
-    }
-
-    GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, randCode, resultType)
->>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
   }
 
-=======
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
   // ----------------------------------------------------------------------------------------------
 
   private def generateUnaryOperatorIfNotNull(
@@ -1114,7 +1052,7 @@ object ScalarOperators {
     case "*" => "multiply"
     case "/" => "divide"
     case "%" => "remainder"
-    case _ => throw new CodeGenException(s"Unsupported decimal arithmetic operator: '${operator}'")
+    case _ => throw new CodeGenException(s"Unsupported decimal arithmetic operator: '$operator'")
   }
 
   private def numericCasting(
@@ -1129,7 +1067,7 @@ object ScalarOperators {
       case LONG_TYPE_INFO => "longValueExact"
       case FLOAT_TYPE_INFO => "floatValue"
       case DOUBLE_TYPE_INFO => "doubleValue"
-      case _ => throw new CodeGenException(s"Unsupported decimal casting type: '${targetType}'")
+      case _ => throw new CodeGenException(s"Unsupported decimal casting type: '$targetType'")
     }
 
     val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
index e26fb64..c6d722a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
@@ -30,12 +30,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
   * @param nullTerm boolean term that indicates if expression is null
   * @param code code necessary to produce resultTerm and nullTerm
   * @param resultType type of the resultTerm
+  * @param literal flag indicating that the expression is constant, i.e. does not reference
+  *                 input, and can thus be used in the member area (e.g. as constructor
+  *                 parameter of a reusable instance)
   */
 case class GeneratedExpression(
-    resultTerm: String,
-    nullTerm: String,
-    code: String,
-    resultType: TypeInformation[_])
+  resultTerm: String,
+  nullTerm: String,
+  code: String,
+  resultType: TypeInformation[_],
+  literal: Boolean = false)
 
 object GeneratedExpression {
   val ALWAYS_NULL = "true"

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index fae4470..f78d917 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -20,12 +20,13 @@ package org.apache.flink.table.expressions
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.table.typeutils.TypeCheckUtils
 import org.apache.flink.table.validate._
 
+import scala.collection.JavaConversions._
+
 case class Abs(child: Expression) extends UnaryExpression {
   override private[flink] def resultType: TypeInformation[_] = child.resultType
 
@@ -285,3 +286,61 @@ case class Pi() extends LeafExpression {
     relBuilder.call(SqlStdOperatorTable.PI)
   }
 }
+
+case class Rand(seed: Expression) extends Expression with InputTypeSpec {
+
+  def this() = this(null)
+
+  override private[flink] def children: Seq[Expression] = if (seed != null) {
+    seed :: Nil
+  } else {
+    Nil
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
+    INT_TYPE_INFO :: Nil
+  } else {
+    Nil
+  }
+
+  override def toString: String = if (seed != null) {
+    s"rand($seed)"
+  } else {
+    s"rand()"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.RAND, children.map(_.toRexNode))
+  }
+}
+
+case class RandInteger(seed: Expression, bound: Expression) extends Expression with InputTypeSpec {
+
+  def this(bound: Expression) = this(null, bound)
+
+  override private[flink] def children: Seq[Expression] = if (seed != null) {
+    seed :: bound :: Nil
+  } else {
+    bound :: Nil
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
+    INT_TYPE_INFO :: INT_TYPE_INFO :: Nil
+  } else {
+    INT_TYPE_INFO :: Nil
+  }
+
+  override def toString: String = if (seed != null) {
+    s"randInteger($seed, $bound)"
+  } else {
+    s"randInteger($bound)"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.RAND_INTEGER, children.map(_.toRexNode))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala
deleted file mode 100644
index ad6a914..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-
-import scala.collection.JavaConversions._
-
-case class Rand(seed: Expression) extends Expression with InputTypeSpec {
-
-  def this() = this(null)
-
-  override private[flink] def children: Seq[Expression] = if (seed != null) {
-    seed :: Nil
-  } else {
-    Nil
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
-    INT_TYPE_INFO :: Nil
-  } else {
-    Nil
-  }
-
-  override def toString: String = if (seed != null) {
-    s"rand($seed)"
-  } else {
-    s"rand()"
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.RAND, children.map(_.toRexNode))
-  }
-
-}
-
-case class RandInteger(seed: Expression, bound: Expression) extends Expression with InputTypeSpec {
-
-  def this(bound: Expression) = this(null, bound)
-
-  override private[flink] def children: Seq[Expression] = if (seed != null) {
-    seed :: bound :: Nil
-  } else {
-    bound :: Nil
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.INT_TYPE_INFO
-
-  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
-    INT_TYPE_INFO :: INT_TYPE_INFO :: Nil
-  } else {
-    INT_TYPE_INFO :: Nil
-  }
-
-  override def toString: String = if (seed != null) {
-    s"rand($seed, $bound)"
-  } else {
-    s"rand($bound)"
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.call(SqlStdOperatorTable.RAND_INTEGER, children.map(_.toRexNode))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 9d04543..a280bc8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -222,6 +222,8 @@ object FunctionCatalog {
     "sign" -> classOf[Sign],
     "round" -> classOf[Round],
     "pi" -> classOf[Pi],
+    "rand" -> classOf[Rand],
+    "randInteger" -> classOf[RandInteger],
 
     // temporal functions
     "extract" -> classOf[Extract],
@@ -244,18 +246,9 @@ object FunctionCatalog {
     "start" -> classOf[WindowStart],
     "end" -> classOf[WindowEnd],
 
-<<<<<<< HEAD
     // ordering
     "asc" -> classOf[Asc],
     "desc" -> classOf[Desc]
-=======
-    // extensions to support streaming query
-    "rowtime" -> classOf[RowTime],
-    "proctime" -> classOf[ProcTime],
-
-    "rand" -> classOf[Rand],
-    "rand_integer" -> classOf[RandInteger]
->>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
   )
 
   /**
@@ -384,7 +377,6 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.QUARTER,
     SqlStdOperatorTable.SCALAR_QUERY,
     SqlStdOperatorTable.EXISTS,
-<<<<<<< HEAD
     SqlStdOperatorTable.SIN,
     SqlStdOperatorTable.COS,
     SqlStdOperatorTable.TAN,
@@ -397,10 +389,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.SIGN,
     SqlStdOperatorTable.ROUND,
     SqlStdOperatorTable.PI,
-=======
     SqlStdOperatorTable.RAND,
     SqlStdOperatorTable.RAND_INTEGER,
->>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
     // EXTENSIONS
     SqlStdOperatorTable.TUMBLE,
     SqlStdOperatorTable.TUMBLE_START,

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index ff38130..f4e5daf 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -18,39 +18,22 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-<<<<<<< HEAD
-<<<<<<< HEAD
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-=======
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
-=======
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
->>>>>>> support non-constant field arguments for rand and rand_integer
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.types.Row;
-<<<<<<< HEAD
 
-=======
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -58,14 +41,7 @@ import org.junit.runners.Parameterized;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-<<<<<<< HEAD
 import java.util.Map;
-=======
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 
 /**
  * Integration tests for batch SQL.
@@ -176,106 +152,6 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 	}
 
 	@Test
-<<<<<<< HEAD
-	public void testMap() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
-		rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
-		rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
-
-		TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
-			BasicTypeInfo.INT_TYPE_INFO,
-			new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-
-		DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
-		tableEnv.registerDataSet("t1", ds1, "a, b");
-
-		String sqlQuery = "SELECT b['foo'] FROM t1";
-=======
-	public void testRandAndRandInteger() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		String sqlQuery = "SELECT i, RAND() AS r1, RAND(1) AS r2, " +
-			"RAND_INTEGER(10) AS r3, RAND_INTEGER(3, 10) AS r4 " +
-			"FROM (VALUES 1, 2, 3, 4, 5) AS t(i)";
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-<<<<<<< HEAD
-		String expected = "bar\n" + "spam\n";
-		compareResultAsText(results, expected);
-=======
-		Random expectedRandom1 = new Random(1);
-		Random expectedRandom2 = new Random(3);
-		assertEquals(5, results.size());
-		for (int i = 0; i < 5; ++i) {
-			Row row = results.get(i);
-			assertEquals(i + 1, row.getField(0));
-			double r1 = (double) row.getField(1);
-			double r2 = (double) row.getField(2);
-			int r3 = (int) row.getField(3);
-			int r4 = (int) row.getField(4);
-
-			assertTrue(0 <= r1 && r1 < 1);
-			assertEquals("" + expectedRandom1.nextDouble(), "" + r2);
-			assertTrue(0 <= r3 && r3 < 10);
-			assertEquals("" + expectedRandom2.nextInt(10), "" + r4);
-		}
-<<<<<<< HEAD
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
-=======
-	}
-
-	@Test
-	public void testRandAndRandIntegerWithField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		tableEnv.registerDataSet("t", ds, "a, b, c");
-
-		String sqlQuery = "SELECT a, b, RAND(a) AS r1, " +
-			"RAND_INTEGER(a) AS r2, RAND_INTEGER(a, 10) AS r3, RAND_INTEGER(4, a) AS r4, " +
-			"RAND_INTEGER(a, CAST(b AS INT)) AS r5 FROM t ORDER BY a";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-
-		List<Integer> aValues = Lists.newArrayList(1, 2, 3);
-		List<Long> bValues = Lists.newArrayList(1L, 2L, 2L);
-		Random expectedRandom4 = new Random(4);
-		assertEquals(3, results.size());
-		for (int i = 0; i < 3; ++i) {
-			Row row = results.get(i);
-			int a = aValues.get(i);
-			Long b = bValues.get(i);
-			assertEquals(a, row.getField(0));
-			assertEquals(b, row.getField(1));
-			double expectedR1 = new Random(a).nextDouble();
-			double r1 = (double) row.getField(2);
-			int r2 = (int) row.getField(3);
-			int expectedR3 = new Random(a).nextInt(10);
-			int r3 = (int) row.getField(4);
-			int expectedR4 = expectedRandom4.nextInt(a);
-			int r4 = (int) row.getField(5);
-			int expectedR5 = new Random(a).nextInt(b.intValue());
-			int r5 = (int) row.getField(6);
-
-			assertEquals("" + expectedR1, "" + r1);
-			assertTrue(0 <= r2 && r2 < a);
-			assertEquals(expectedR3, r3);
-			assertEquals(expectedR4, r4);
-			assertEquals(expectedR5, r5);
-		}
-	}
-
-	@Test
 	public void testMap() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -298,6 +174,5 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 		List<Row> results = resultSet.collect();
 		String expected = "bar\n" + "spam\n";
 		compareResultAsText(results, expected);
->>>>>>> support non-constant field arguments for rand and rand_integer
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index b52ccbc..4f32382 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -18,16 +18,11 @@
 
 package org.apache.flink.table.api.java.stream.sql;
 
-<<<<<<< HEAD
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-=======
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -37,18 +32,11 @@ import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.api.java.stream.utils.StreamTestData;
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
 import org.apache.flink.types.Row;
-<<<<<<< HEAD
 
-=======
->>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Integration tests for streaming SQL.
@@ -176,38 +164,4 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 
 		StreamITCase.compareWithList(expected);
 	}
-
-	@Test
-	public void testRandAndRandInteger() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		StreamITCase.clear();
-
-		String sqlQuery = "SELECT i, RAND() AS r1, RAND(1) AS r2, " +
-			"RAND_INTEGER(10) AS r3, RAND_INTEGER(3, 10) AS r4 " +
-			"FROM (VALUES 1, 2, 3, 4, 5) AS t(i)";
-		Table result = tableEnv.sql(sqlQuery);
-
-		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink());
-		env.execute();
-
-		Random expectedRandom1 = new Random(1);
-		Random expectedRandom2 = new Random(3);
-		assertEquals(5, StreamITCase.testResults().size());
-		for (int i = 0; i < 5; ++i) {
-			String row = StreamITCase.testResults().get(i).get();
-			String[] values = row.split(",");
-			assertEquals(Integer.valueOf(i + 1), Integer.valueOf(values[0]));
-			double r1 = Double.valueOf(values[1]);
-			double r2 = Double.valueOf(values[2]);
-			int r3 = Integer.valueOf(values[3]);
-			int r4 = Integer.valueOf(values[4]);
-
-			assertTrue(0 <= r1 && r1 < 1);
-			assertEquals("" + expectedRandom1.nextDouble(), "" + r2);
-			assertTrue(0 <= r3 && r3 < 10);
-			assertEquals("" + expectedRandom2.nextInt(10), "" + r4);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a0b78146/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 2f85957..5f3baa3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1116,6 +1116,37 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       math.Pi.toString)
   }
 
+  @Test
+  def testRandAndRandInteger(): Unit = {
+    val random1 = new java.util.Random(1)
+    testAllApis(
+      rand(1),
+      "rand(1)",
+      "RAND(1)",
+      random1.nextDouble().toString)
+
+    val random2 = new java.util.Random(3)
+    testAllApis(
+      rand('f7),
+      "rand(f7)",
+      "RAND(f7)",
+      random2.nextDouble().toString)
+
+    val random3 = new java.util.Random(1)
+    testAllApis(
+      randInteger(1, 10),
+      "randInteger(1, 10)",
+      "RAND_INTEGER(1, 10)",
+      random3.nextInt(10).toString)
+
+    val random4 = new java.util.Random(3)
+    testAllApis(
+      randInteger('f7, 'f4.cast(Types.INT)),
+      "randInteger(f7, f4.cast(INT))",
+      "RAND_INTEGER(f7, CAST(f4 AS INT))",
+      random4.nextInt(44).toString)
+  }
+
   // ----------------------------------------------------------------------------------------------
   // Temporal functions
   // ----------------------------------------------------------------------------------------------
@@ -1520,100 +1551,6 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "true")
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiRand1(): Unit = {
-    // Must fail. Parameter of rand must be an Integer not a Double.
-    testTableApi(rand(2.0.toExpr), "FAIL", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiRand2(): Unit = {
-    // Must fail. The parameter number of rand must be less than 2.
-    testTableApi(rand(1, 10), "FAIL", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiStringExpressionRand1(): Unit = {
-    // Must fail. Parameter of rand must be an Integer not a Double.
-    testTableApi("FAIL", "rand(2.0)", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiStringExpressionRand2(): Unit = {
-    // Must fail. The parameter number of rand must be less than 2.
-    testTableApi("FAIL", "rand(1, 10)", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSqlRand(): Unit = {
-    // Must fail. The parameter number of rand must be less than 2.
-    testSqlApi("RAND(1, 10)", "FAIL")
-  }
-
-  @Test
-  def testRand(): Unit = {
-    val random1 = new java.util.Random(1)
-    testAllApis(
-      rand(1),
-      "rand(1)",
-      "RAND(1)",
-      "" + random1.nextDouble())
-
-    val random2 = new java.util.Random(3)
-    testAllApis(
-      rand('f7),
-      "rand(f7)",
-      "RAND(f7)",
-      "" + random2.nextDouble())
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiRandInteger1(): Unit = {
-    // Must fail. Parameter of rand_integer must be an Integer not a Double.
-    testTableApi(rand_integer(2.0.toExpr), "FAIL", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiRandInteger2(): Unit = {
-    // Must fail. The parameter number of rand_integer must be less than 3 and greater than 0.
-    testTableApi(rand_integer(), "FAIL", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiStringExpressionRandInteger1(): Unit = {
-    // Must fail. Parameter of rand_integer must be an Integer not a Double.
-    testTableApi("FAIL", "rand_integer(2.0)", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTableApiStringExpressionRandInteger2(): Unit = {
-    // Must fail. The parameter number of rand_integer must be less than 3 and greater than 0.
-    testTableApi("FAIL", "rand_integer()", "FAIL")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSqlRandInteger(): Unit = {
-    // Must fail. The parameter number of rand_integer must be less than 3 and greater than 0.
-    testSqlApi("RAND_INTEGER()", "FAIL")
-  }
-
-  @Test
-  def testRandInteger(): Unit = {
-    val random1 = new java.util.Random(1)
-    testAllApis(
-      rand_integer(1, 10),
-      "rand_integer(1, 10)",
-      "RAND_INTEGER(1, 10)",
-      "" + random1.nextInt(10))
-
-    val random2 = new java.util.Random(3)
-    testAllApis(
-      rand_integer('f7, 'f4.cast(Types.INT)),
-      "rand_integer(f7, f4.cast(INT))",
-      "RAND_INTEGER(f7, CAST(f4 AS INT))",
-      "" + random2.nextInt(44))
-  }
-
   // ----------------------------------------------------------------------------------------------
 
   def testData = {


[2/2] flink git commit: [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL

Posted by tw...@apache.org.
[FLINK-6237] [table] support RAND and RAND_INTEGER on SQL

This closes #3660.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6230cc55
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6230cc55
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6230cc55

Branch: refs/heads/master
Commit: 6230cc55048b21ac5488e2a42372ba1a14500a8a
Parents: 44ae21d
Author: godfreyhe <go...@163.com>
Authored: Sat Apr 1 14:28:49 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Fri Jun 23 11:43:31 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/tableApi.md                      | 244 +++++++++++++++++++
 .../flink/table/api/scala/expressionDsl.scala   |  46 ++++
 .../flink/table/codegen/CodeGenerator.scala     |  43 ++++
 .../table/codegen/calls/FunctionGenerator.scala |  20 ++
 .../flink/table/codegen/calls/RandCallGen.scala |  99 ++++++++
 .../table/codegen/calls/ScalarOperators.scala   |  62 +++++
 .../apache/flink/table/expressions/random.scala |  87 +++++++
 .../flink/table/validate/FunctionCatalog.scala  |  14 ++
 .../table/api/java/batch/sql/SqlITCase.java     | 125 ++++++++++
 .../table/api/java/stream/sql/SqlITCase.java    |  46 ++++
 .../table/expressions/ScalarFunctionsTest.scala |  94 +++++++
 11 files changed, 880 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 0811232..e379078 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1411,8 +1411,12 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 // Bounded Event-time over window (assuming an event-time attribute "rowtime")
 .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
 
+<<<<<<< HEAD:docs/dev/table/tableApi.md
 // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
 .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
+=======
+A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUN
 T, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, F
 REE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, O
 N, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE
 , SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION,
  TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE, RAND, RAND_INTEGER
+>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation:docs/dev/table_api.md
 
 // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
 .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
@@ -3934,6 +3938,246 @@ ARRAY.element()
   </tbody>
 </table>
 
+<<<<<<< HEAD:docs/dev/table/tableApi.md
+=======
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Random functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td>
+        {% highlight text %}
+RAND()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+RAND(seed integer)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two RAND functions will return identical sequences of numbers if they have the same initial seed.</p>
+      </td>
+    </tr>
+
+    <tr>
+     <td>
+       {% highlight text %}
+RAND_INTEGER(bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive).</p>
+    </td>
+   </tr>
+
+    <tr>
+     <td>
+       {% highlight text %}
+RAND_INTEGER(seed integer, bound integer)
+{% endhighlight %}
+     </td>
+    <td>
+      <p>Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two RAND_INTEGER functions will return identical sequences of numbers if they have the same initial seed and the same bound.</p>
+    </td>
+   </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+{% top %}
+
+User-defined Functions
+----------------
+
+### User-defined Scalar Functions
+
+If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value.
+
+In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`.
+
+The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class HashCode extends ScalarFunction {
+  private int factor = 12;
+  
+  public HashCode(int factor) {
+      this.factor = factor;
+  }
+  
+  public int eval(String s) {
+      return s.hashCode() * factor;
+  }
+}
+
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// register the function under the name "hashCode", configured with factor 10
+tableEnv.registerFunction("hashCode", new HashCode(10));
+
+// use the function in Java Table API
+myTable.select("string, string.hashCode(), hashCode(string)");
+
+// use the function in SQL API
+tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// must be defined in static/object context
+class HashCode(factor: Int) extends ScalarFunction {
+  def eval(s: String): Int = {
+    s.hashCode() * factor
+  }
+}
+
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// use the function in Scala Table API
+val hashCode = new HashCode(10)
+myTable.select('string, hashCode('string))
+
+// register and use the function in SQL
+tableEnv.registerFunction("hashCode", new HashCode(10))
+tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+{% endhighlight %}
+</div>
+</div>
+
+By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
+
+Internally, the Table API and SQL code generation works with primitive values as much as possible. If a user-defined scalar function should not introduce much overhead through object creation/casting during runtime, it is recommended to declare parameters and result types as primitive types instead of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented as `int`. `Types.TIMESTAMP` can be represented as `long`.
+
+The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public static class TimestampModifier extends ScalarFunction {
+  public long eval(long t) { // takes and returns the internal (long) timestamp representation
+    return t % 1000;
+  }
+
+  public TypeInformation<?> getResultType(Class<?>[] signature) { // returned long is interpreted as TIMESTAMP
+    return Types.TIMESTAMP;
+  }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+object TimestampModifier extends ScalarFunction {
+  def eval(t: Long): Long = {
+    t % 1000
+  }
+
+  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+    Types.TIMESTAMP
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+### User-defined Table Functions
+
+Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns. 
+
+In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
+
+In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoin` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table. In SQL use `LATERAL TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).
+
+The following example shows how to define a table-valued function, register it in the TableEnvironment, and call it in a query. Note that you can configure your table function via a constructor before it is registered:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+    private String separator = " ";
+    
+    public Split(String separator) {
+        this.separator = separator;
+    }
+    
+    public void eval(String str) {
+        for (String s : str.split(separator)) {
+            // use collect(...) to emit a row
+            collect(new Tuple2<String, Integer>(s, s.length()));
+        }
+    }
+}
+
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+Table myTable = ...         // table schema: [a: String]
+
+// Register the function.
+tableEnv.registerFunction("split", new Split("#"));
+
+// Use the table function in the Java Table API. "as" specifies the field names of the table.
+myTable.join("split(a) as (word, length)").select("a, word, length");
+myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
+
+// Use the table function in SQL with LATERAL and TABLE keywords.
+// CROSS JOIN a table function (equivalent to "join" in Table API).
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// The generic type "(String, Int)" determines the schema of the returned table as (String, Integer).
+class Split(separator: String) extends TableFunction[(String, Int)] {
+  def eval(str: String): Unit = {
+    // use collect(...) to emit one (word, length) row per token.
+    str.split(separator).foreach(x => collect((x, x.length)))
+  }
+}
+
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+val myTable = ...         // table schema: [a: String]
+
+// Use the table function in the Scala Table API (Note: No registration required in Scala Table API).
+val split = new Split("#")
+// "as" specifies the field names of the generated table.
+myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length);
+myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length);
+
+// Register the table function to use it in SQL queries.
+tableEnv.registerFunction("split", new Split("#"))
+
+// Use the table function in SQL with LATERAL and TABLE keywords.
+// CROSS JOIN a table function (equivalent to "join" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API); requires LATERAL and ON TRUE
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+**IMPORTANT:** Do not implement TableFunction as a Scala object. A Scala object is a singleton and will cause concurrency issues.
+</div>
+</div>
+
+Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of POJO returned by a table function using `AS`.
+>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation:docs/dev/table_api.md
 
 
 <table class="table table-bordered">

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 7b424b2..1e5a579 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -22,11 +22,18 @@ import java.sql.{Date, Time, Timestamp}
 
 import org.apache.calcite.avatica.util.DateTimeUtils._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+<<<<<<< HEAD
 import org.apache.flink.table.api.{TableException, CurrentRow, CurrentRange, UnboundedRow, UnboundedRange}
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.AggregateFunction
+=======
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRowInterval}
+import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions._
+>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
 
 import scala.language.implicitConversions
 
@@ -910,6 +917,44 @@ object array {
   }
 }
 
+<<<<<<< HEAD
+/**
+  * Returns a value that is closer than any other value to pi.
+  */
+object pi {
+
+  /**
+    * Returns a value that is closer than any other value to pi.
+    */
+  def apply(): Expression = {
+    Pi()
+=======
+object rand {
+
+  def apply(expr: Expression*): Expression = {
+    expr.size match {
+      case 0 => new Rand()
+      case 1 => Rand(expr.head)
+      case _ => throw new ValidationException("Invalid arguments for rand([seed]).")
+    }
+  }
+}
+
+object rand_integer {
+
+  def apply(expr: Expression*): Expression = {
+    expr.size match {
+      case 1 => new RandInteger(expr.head)
+      case 2 => RandInteger(expr.head, expr(1))
+      case _ =>
+        throw new ValidationException("Invalid arguments for rand_integer([seed, ] bound).")
+    }
+<<<<<<< HEAD
+>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
+=======
+  }
+}
+
 /**
   * Returns a value that is closer than any other value to pi.
   */
@@ -920,6 +965,7 @@ object pi {
     */
   def apply(): Expression = {
     Pi()
+>>>>>>> fix merging error
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 73abb70..343a3c4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -43,7 +43,11 @@ import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.table.codegen.calls.FunctionGenerator
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+<<<<<<< HEAD
 import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction}
+=======
+import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
+>>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
 import org.apache.flink.table.runtime.TableFunctionCollector
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 import org.apache.flink.types.Row
@@ -1959,6 +1963,45 @@ class CodeGenerator(
   }
 
   /**
+    * Adds a reusable [[java.util.Random]] to the member area of the generated [[Function]].
+    *
+    * @return member variable term
+    */
+  def addReusableRandom(seedExpr: GeneratedExpression): String = {
+    val fieldTerm = newName("random")
+
+    val field =
+      s"""
+         |final java.util.Random $fieldTerm;
+         |""".stripMargin
+    reusableMemberStatements.add(field)
+
+    val fieldInit = if (seedExpr != null && nullCheck) {
+      s"""
+         |${seedExpr.code}
+         |if(!${seedExpr.nullTerm}) {
+         |  $fieldTerm = new java.util.Random(${seedExpr.resultTerm});
+         |}
+         |else {
+         |  $fieldTerm = new java.util.Random();
+         |}
+         |""".stripMargin
+    } else if (seedExpr != null) {
+      s"""
+         |${seedExpr.code}
+         |$fieldTerm = new java.util.Random(${seedExpr.resultTerm});
+         |""".stripMargin
+    } else {
+      s"""
+         |$fieldTerm = new java.util.Random();
+         |""".stripMargin
+    }
+
+    reusableInitStatements.add(fieldInit)
+    fieldTerm
+  }
+
+  /**
     * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
     *
     * @param function [[UserDefinedFunction]] object to be instantiated during runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 64280c8..55e43c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -458,6 +458,26 @@ object FunctionGenerator {
     Seq(),
     new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
 
+  addSqlFunction(
+    RAND,
+    Seq(),
+    new RandCallGen(BuiltInMethod.RAND))
+
+  addSqlFunction(
+    RAND,
+    Seq(INT_TYPE_INFO),
+    new RandCallGen(BuiltInMethod.RAND_SEED))
+
+  addSqlFunction(
+    RAND_INTEGER,
+    Seq(INT_TYPE_INFO),
+    new RandCallGen(BuiltInMethod.RAND_INTEGER))
+
+  addSqlFunction(
+    RAND_INTEGER,
+    Seq(INT_TYPE_INFO, INT_TYPE_INFO),
+    new RandCallGen(BuiltInMethod.RAND_INTEGER_SEED))
+
   // ----------------------------------------------------------------------------------------------
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala
new file mode 100644
index 0000000..7fbfa4d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/RandCallGen.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates a rand/rand_integer function call.
+  * Supports: RAND([seed]) and RAND_INTEGER([seed, ] bound)
+  */
+class RandCallGen(method: BuiltInMethod) extends CallGenerator {
+
+  override def generate(
+    codeGenerator: CodeGenerator,
+    operands: Seq[GeneratedExpression]): GeneratedExpression = {
+
+    method match {
+      case BuiltInMethod.RAND =>
+        assert(operands.isEmpty)
+        val randField = codeGenerator.addReusableRandom(null)
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, operands) {
+          _ => s"""$randField.nextDouble()""".stripMargin
+        }
+      case BuiltInMethod.RAND_SEED =>
+        assert(operands.size == 1)
+        val (randField, newOperands) = if (operands.head.code.isEmpty) {
+          val (randField, initRandomExpr) = genInitRandomExpression(operands.head)
+          (randField, Seq(initRandomExpr))
+        } else {
+          val randField = codeGenerator.addReusableRandom(operands.head)
+          (randField, Seq.empty)
+        }
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, DOUBLE_TYPE_INFO, newOperands) {
+          _ => s"""$randField.nextDouble()""".stripMargin
+        }
+      case BuiltInMethod.RAND_INTEGER =>
+        assert(operands.size == 1)
+        val randField = codeGenerator.addReusableRandom(null)
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, operands) {
+          (terms) => s"""$randField.nextInt(${terms.head})""".stripMargin
+        }
+      case BuiltInMethod.RAND_INTEGER_SEED =>
+        assert(operands.size == 2)
+        val (randField, newOperands) = if (operands.head.code.isEmpty) {
+          val (randField, initRandomExpr) = genInitRandomExpression(operands.head)
+          (randField, Seq(initRandomExpr, operands(1)))
+        } else {
+          val randField = codeGenerator.addReusableRandom(operands.head)
+          (randField, Seq(operands(1)))
+        }
+        generateCallIfArgsNotNull(codeGenerator.nullCheck, INT_TYPE_INFO, newOperands) {
+          (terms) => s"""$randField.nextInt(${terms.last})""".stripMargin
+        }
+    }
+  }
+
+  private def genInitRandomExpression(
+    seedExpr: GeneratedExpression): (String, GeneratedExpression) = {
+    val randField = newName("random")
+    val initRandomCode =
+      s"""
+         |java.util.Random $randField;
+         |if(!${seedExpr.nullTerm}) {
+         |  $randField = new java.util.Random(${seedExpr.resultTerm});
+         |}
+         |else {
+         |  $randField = new java.util.Random();
+         |}
+         |""".stripMargin
+    val initRandomExpr = GeneratedExpression(
+      randField,
+      GeneratedExpression.NEVER_NULL,
+      initRandomCode,
+      TypeInformation.of(classOf[java.util.Random]))
+    (randField, initRandomExpr)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 1af4a34..fa9d3d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -934,6 +934,8 @@ object ScalarOperators {
     }
   }
 
+<<<<<<< HEAD
+<<<<<<< HEAD
   def generateMapGet(
       codeGenerator: CodeGenerator,
       map: GeneratedExpression,
@@ -962,8 +964,68 @@ object ScalarOperators {
              |""".stripMargin
         }
     GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
+=======
+  def generateRand(
+    randField: String,
+    seedExpr: GeneratedExpression,
+    resultType: TypeInformation[_])
+  : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val randCode = if (seedExpr != null) {
+      s"""
+         |if ($randField == null) {
+         |  ${seedExpr.code}
+         |  $randField = new java.util.Random(${seedExpr.resultTerm});
+         |}
+         |$resultTypeTerm $resultTerm = $randField.nextDouble();
+       """.stripMargin
+    } else {
+      s"""
+         |if ($randField == null) {
+         |  $randField = new java.util.Random();
+         |}
+         |$resultTypeTerm $resultTerm = $randField.nextDouble();
+       """.stripMargin
+    }
+
+    GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, randCode, resultType)
+  }
+
+  def generateRandInteger(
+    randField: String,
+    seedExpr: GeneratedExpression,
+    boundExpr: GeneratedExpression,
+    resultType: TypeInformation[_])
+  : GeneratedExpression = {
+    assert(boundExpr != null)
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val randCode = if (seedExpr != null) {
+      s"""
+         |if ($randField == null) {
+         |  ${seedExpr.code}
+         |  $randField = new java.util.Random(${seedExpr.resultTerm});
+         |}
+         |${boundExpr.code}
+         |$resultTypeTerm $resultTerm = $randField.nextInt(${boundExpr.resultTerm});
+       """.stripMargin
+    } else {
+      s"""
+         |if ($randField == null) {
+         |  $randField = new java.util.Random();
+         |}
+         |${boundExpr.code}
+         |$resultTypeTerm $resultTerm = $randField.nextInt(${boundExpr.resultTerm});
+       """.stripMargin
+    }
+
+    GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, randCode, resultType)
+>>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
   }
 
+=======
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
   // ----------------------------------------------------------------------------------------------
 
   private def generateUnaryOperatorIfNotNull(

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala
new file mode 100644
index 0000000..ad6a914
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/random.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+
+import scala.collection.JavaConversions._
+
+case class Rand(seed: Expression) extends Expression with InputTypeSpec {
+
+  def this() = this(null)
+
+  override private[flink] def children: Seq[Expression] = if (seed != null) {
+    seed :: Nil
+  } else {
+    Nil
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.DOUBLE_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
+    INT_TYPE_INFO :: Nil
+  } else {
+    Nil
+  }
+
+  override def toString: String = if (seed != null) {
+    s"rand($seed)"
+  } else {
+    s"rand()"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.RAND, children.map(_.toRexNode))
+  }
+
+}
+
+case class RandInteger(seed: Expression, bound: Expression) extends Expression with InputTypeSpec {
+
+  def this(bound: Expression) = this(null, bound)
+
+  override private[flink] def children: Seq[Expression] = if (seed != null) {
+    seed :: bound :: Nil
+  } else {
+    bound :: Nil
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = if (seed != null) {
+    INT_TYPE_INFO :: INT_TYPE_INFO :: Nil
+  } else {
+    INT_TYPE_INFO :: Nil
+  }
+
+  override def toString: String = if (seed != null) {
+    s"rand($seed, $bound)"
+  } else {
+    s"rand($bound)"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.RAND_INTEGER, children.map(_.toRexNode))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 4d945a7..9d04543 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -244,9 +244,18 @@ object FunctionCatalog {
     "start" -> classOf[WindowStart],
     "end" -> classOf[WindowEnd],
 
+<<<<<<< HEAD
     // ordering
     "asc" -> classOf[Asc],
     "desc" -> classOf[Desc]
+=======
+    // extensions to support streaming query
+    "rowtime" -> classOf[RowTime],
+    "proctime" -> classOf[ProcTime],
+
+    "rand" -> classOf[Rand],
+    "rand_integer" -> classOf[RandInteger]
+>>>>>>> support RAND and RAND_INTEGER on TableAPI, update the documentation
   )
 
   /**
@@ -375,6 +384,7 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.QUARTER,
     SqlStdOperatorTable.SCALAR_QUERY,
     SqlStdOperatorTable.EXISTS,
+<<<<<<< HEAD
     SqlStdOperatorTable.SIN,
     SqlStdOperatorTable.COS,
     SqlStdOperatorTable.TAN,
@@ -387,6 +397,10 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.SIGN,
     SqlStdOperatorTable.ROUND,
     SqlStdOperatorTable.PI,
+=======
+    SqlStdOperatorTable.RAND,
+    SqlStdOperatorTable.RAND_INTEGER,
+>>>>>>> [FLINK-6237] [table] support RAND and RAND_INTEGER on SQL
     // EXTENSIONS
     SqlStdOperatorTable.TUMBLE,
     SqlStdOperatorTable.TUMBLE_START,

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index f4e5daf..ff38130 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -18,22 +18,39 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
+import com.google.common.collect.Lists;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+<<<<<<< HEAD
+<<<<<<< HEAD
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+=======
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
+=======
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+>>>>>>> support non-constant field arguments for rand and rand_integer
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.types.Row;
+<<<<<<< HEAD
 
+=======
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -41,7 +58,14 @@ import org.junit.runners.Parameterized;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+<<<<<<< HEAD
 import java.util.Map;
+=======
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 
 /**
  * Integration tests for batch SQL.
@@ -152,6 +176,106 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 	}
 
 	@Test
+<<<<<<< HEAD
+	public void testMap() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
+		rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
+		rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
+
+		TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
+		DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
+		tableEnv.registerDataSet("t1", ds1, "a, b");
+
+		String sqlQuery = "SELECT b['foo'] FROM t1";
+=======
+	public void testRandAndRandInteger() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		String sqlQuery = "SELECT i, RAND() AS r1, RAND(1) AS r2, " +
+			"RAND_INTEGER(10) AS r3, RAND_INTEGER(3, 10) AS r4 " +
+			"FROM (VALUES 1, 2, 3, 4, 5) AS t(i)";
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+<<<<<<< HEAD
+		String expected = "bar\n" + "spam\n";
+		compareResultAsText(results, expected);
+=======
+		Random expectedRandom1 = new Random(1);
+		Random expectedRandom2 = new Random(3);
+		assertEquals(5, results.size());
+		for (int i = 0; i < 5; ++i) {
+			Row row = results.get(i);
+			assertEquals(i + 1, row.getField(0));
+			double r1 = (double) row.getField(1);
+			double r2 = (double) row.getField(2);
+			int r3 = (int) row.getField(3);
+			int r4 = (int) row.getField(4);
+
+			assertTrue(0 <= r1 && r1 < 1);
+			assertEquals("" + expectedRandom1.nextDouble(), "" + r2);
+			assertTrue(0 <= r3 && r3 < 10);
+			assertEquals("" + expectedRandom2.nextInt(10), "" + r4);
+		}
+<<<<<<< HEAD
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
+=======
+	}
+
+	@Test
+	public void testRandAndRandIntegerWithField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		tableEnv.registerDataSet("t", ds, "a, b, c");
+
+		String sqlQuery = "SELECT a, b, RAND(a) AS r1, " +
+			"RAND_INTEGER(a) AS r2, RAND_INTEGER(a, 10) AS r3, RAND_INTEGER(4, a) AS r4, " +
+			"RAND_INTEGER(a, CAST(b AS INT)) AS r5 FROM t ORDER BY a";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		List<Integer> aValues = Lists.newArrayList(1, 2, 3);
+		List<Long> bValues = Lists.newArrayList(1L, 2L, 2L);
+		Random expectedRandom4 = new Random(4);
+		assertEquals(3, results.size());
+		for (int i = 0; i < 3; ++i) {
+			Row row = results.get(i);
+			int a = aValues.get(i);
+			Long b = bValues.get(i);
+			assertEquals(a, row.getField(0));
+			assertEquals(b, row.getField(1));
+			double expectedR1 = new Random(a).nextDouble();
+			double r1 = (double) row.getField(2);
+			int r2 = (int) row.getField(3);
+			int expectedR3 = new Random(a).nextInt(10);
+			int r3 = (int) row.getField(4);
+			int expectedR4 = expectedRandom4.nextInt(a);
+			int r4 = (int) row.getField(5);
+			int expectedR5 = new Random(a).nextInt(b.intValue());
+			int r5 = (int) row.getField(6);
+
+			assertEquals("" + expectedR1, "" + r1);
+			assertTrue(0 <= r2 && r2 < a);
+			assertEquals(expectedR3, r3);
+			assertEquals(expectedR4, r4);
+			assertEquals(expectedR5, r5);
+		}
+	}
+
+	@Test
 	public void testMap() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -174,5 +298,6 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 		List<Row> results = resultSet.collect();
 		String expected = "bar\n" + "spam\n";
 		compareResultAsText(results, expected);
+>>>>>>> support non-constant field arguments for rand and rand_integer
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index 4f32382..b52ccbc 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -18,11 +18,16 @@
 
 package org.apache.flink.table.api.java.stream.sql;
 
+<<<<<<< HEAD
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+=======
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -32,11 +37,18 @@ import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.api.java.stream.utils.StreamTestData;
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
 import org.apache.flink.types.Row;
+<<<<<<< HEAD
 
+=======
+>>>>>>> add the rand functions to FunctionGenerator class, and init random field in constructor
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Integration tests for streaming SQL.
@@ -164,4 +176,38 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
 
 		StreamITCase.compareWithList(expected);
 	}
+
+	@Test
+	public void testRandAndRandInteger() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		String sqlQuery = "SELECT i, RAND() AS r1, RAND(1) AS r2, " +
+			"RAND_INTEGER(10) AS r3, RAND_INTEGER(3, 10) AS r4 " +
+			"FROM (VALUES 1, 2, 3, 4, 5) AS t(i)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		Random expectedRandom1 = new Random(1);
+		Random expectedRandom2 = new Random(3);
+		assertEquals(5, StreamITCase.testResults().size());
+		for (int i = 0; i < 5; ++i) {
+			String row = StreamITCase.testResults().get(i).get();
+			String[] values = row.split(",");
+			assertEquals(Integer.valueOf(i + 1), Integer.valueOf(values[0]));
+			double r1 = Double.valueOf(values[1]);
+			double r2 = Double.valueOf(values[2]);
+			int r3 = Integer.valueOf(values[3]);
+			int r4 = Integer.valueOf(values[4]);
+
+			assertTrue(0 <= r1 && r1 < 1);
+			assertEquals("" + expectedRandom1.nextDouble(), "" + r2);
+			assertTrue(0 <= r3 && r3 < 10);
+			assertEquals("" + expectedRandom2.nextInt(10), "" + r4);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6230cc55/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 76c02c0..2f85957 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1520,6 +1520,100 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "true")
   }
 
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiRand1(): Unit = {
+    // Must fail. Parameter of rand must be an Integer not a Double.
+    testTableApi(rand(2.0.toExpr), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiRand2(): Unit = {
+    // Must fail. rand accepts at most one parameter.
+    testTableApi(rand(1, 10), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiStringExpressionRand1(): Unit = {
+    // Must fail. Parameter of rand must be an Integer not a Double.
+    testTableApi("FAIL", "rand(2.0)", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiStringExpressionRand2(): Unit = {
+    // Must fail. rand accepts at most one parameter.
+    testTableApi("FAIL", "rand(1, 10)", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSqlRand(): Unit = {
+    // Must fail. RAND accepts at most one parameter.
+    testSqlApi("RAND(1, 10)", "FAIL")
+  }
+
+  @Test
+  def testRand(): Unit = {
+    val random1 = new java.util.Random(1)
+    testAllApis(
+      rand(1),
+      "rand(1)",
+      "RAND(1)",
+      "" + random1.nextDouble())
+
+    val random2 = new java.util.Random(3)
+    testAllApis(
+      rand('f7),
+      "rand(f7)",
+      "RAND(f7)",
+      "" + random2.nextDouble())
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiRandInteger1(): Unit = {
+    // Must fail. Parameter of rand_integer must be an Integer not a Double.
+    testTableApi(rand_integer(2.0.toExpr), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiRandInteger2(): Unit = {
+    // Must fail. rand_integer requires one or two parameters.
+    testTableApi(rand_integer(), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiStringExpressionRandInteger1(): Unit = {
+    // Must fail. Parameter of rand_integer must be an Integer not a Double.
+    testTableApi("FAIL", "rand_integer(2.0)", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTableApiStringExpressionRandInteger2(): Unit = {
+    // Must fail. rand_integer requires one or two parameters.
+    testTableApi("FAIL", "rand_integer()", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSqlRandInteger(): Unit = {
+    // Must fail. RAND_INTEGER requires one or two parameters.
+    testSqlApi("RAND_INTEGER()", "FAIL")
+  }
+
+  @Test
+  def testRandInteger(): Unit = {
+    val random1 = new java.util.Random(1)
+    testAllApis(
+      rand_integer(1, 10),
+      "rand_integer(1, 10)",
+      "RAND_INTEGER(1, 10)",
+      "" + random1.nextInt(10))
+
+    val random2 = new java.util.Random(3)
+    testAllApis(
+      rand_integer('f7, 'f4.cast(Types.INT)),
+      "rand_integer(f7, f4.cast(INT))",
+      "RAND_INTEGER(f7, CAST(f4 AS INT))",
+      "" + random2.nextInt(44))
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   def testData = {