Posted to commits@flink.apache.org by fh...@apache.org on 2017/06/30 20:58:26 UTC

flink git commit: [FLINK-6075] [table] Add ORDER BY support for streaming table.

Repository: flink
Updated Branches:
  refs/heads/master d7d10a130 -> b8c8f204d


[FLINK-6075] [table] Add ORDER BY support for streaming table.

This closes #3889.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8c8f204
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8c8f204
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8c8f204

Branch: refs/heads/master
Commit: b8c8f204de718e6d5b7c3df837deafaed7c375f5
Parents: d7d10a1
Author: rtudoran <tu...@ymail.com>
Authored: Fri May 12 20:41:30 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 30 22:55:51 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |   6 +-
 .../calcite/RelTimeIndicatorConverter.scala     |   7 +-
 .../flink/table/plan/nodes/CommonSort.scala     | 114 ++++++++
 .../table/plan/nodes/dataset/DataSetSort.scala  |  53 +---
 .../plan/nodes/datastream/DataStreamSort.scala  | 191 +++++++++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   1 +
 .../rules/datastream/DataStreamSortRule.scala   |  85 ++++++
 .../aggregate/ProcTimeSortProcessFunction.scala | 101 +++++++
 .../aggregate/RowTimeSortProcessFunction.scala  | 139 ++++++++++
 .../table/runtime/aggregate/SortUtil.scala      | 193 +++++++++++++
 .../api/scala/stream/sql/OverWindowITCase.scala |  18 +-
 .../table/api/scala/stream/sql/SortITCase.scala | 122 ++++++++
 .../table/api/scala/stream/sql/SortTest.scala   |  81 ++++++
 .../api/scala/stream/sql/TimeTestUtil.scala     |  39 +++
 .../aggregate/TimeSortProcessFunctionTest.scala | 276 +++++++++++++++++++
 15 files changed, 1367 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index a736477..be586f1 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -486,13 +486,15 @@ FROM (
   	<tr>
       <td>
         <strong>Order By</strong><br>
-        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
+<b>Note:</b> The result of streaming queries must be primarily sorted on an ascending <a href="streaming.html#time-attributes">time attribute</a>. Additional sorting attributes are supported. 
+
 {% highlight sql %}
 SELECT * 
 FROM Orders 
-ORDER BY users
+ORDER BY orderTime
 {% endhighlight %}
       </td>
     </tr>

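For context, the new capability can be exercised end-to-end as sketched below. This is an illustrative sketch only: the table name Orders and the fields orderTime/amount are assumptions, while the tEnv.sql(...) and toAppendStream calls match the API used by the new SortITCase further down.

// Sketch (assumptions: event-time environment, a registered table "Orders"
// whose field orderTime is a rowtime attribute, plus a numeric field amount).
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = TableEnvironment.getTableEnvironment(env)
// ... register "Orders" with a rowtime attribute named orderTime ...

// the primary sort must be ascending on the time attribute;
// additional attributes (here: amount) may follow in any direction
val result = tEnv
  .sql("SELECT * FROM Orders ORDER BY orderTime, amount DESC")
  .toAppendStream[Row]
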
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index b28e3f8..385cab2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -92,8 +92,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(minus: LogicalMinus): RelNode =
     throw new TableException("Logical minus in a stream environment is not supported yet.")
 
-  override def visit(sort: LogicalSort): RelNode =
-    throw new TableException("Logical sort in a stream environment is not supported yet.")
+  override def visit(sort: LogicalSort): RelNode = {
+
+    val input = sort.getInput.accept(this)
+    LogicalSort.create(input, sort.collation, sort.offset, sort.fetch)
+  }
 
   override def visit(`match`: LogicalMatch): RelNode =
     throw new TableException("Logical match in a stream environment is not supported yet.")

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
new file mode 100644
index 0000000..2ad4083
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonSort.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConverters._
+import org.apache.flink.api.common.operators.Order
+import org.apache.calcite.rel.{RelWriter, RelCollation}
+
+/**
+ * Common methods for Flink sort operators.
+ */
+trait CommonSort {
+  
+  private[flink] def offsetToString(offset: RexNode): String = s"$offset"
+  
+  private def sortFieldsToString(
+      collationSort: RelCollation, 
+      rowRelDataType: RelDataType): String = {
+    val fieldCollations = collationSort.getFieldCollations.asScala  
+      .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+    fieldCollations
+      .map(col => s"${rowRelDataType.getFieldNames.get(col._1)} ${col._2.getShortName}" )
+      .mkString(", ")
+  }
+  
+  private[flink] def directionToOrder(direction: Direction) = {
+    direction match {
+      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+      case _ => throw new IllegalArgumentException("Unsupported direction.")
+    }
+  }
+  
+  private[flink] def fetchToString(fetch: RexNode, offset: RexNode): String = {
+    val limitEnd = getFetchLimitEnd(fetch, offset)
+    
+    if (limitEnd == Long.MaxValue) {
+      "unlimited"
+    } else {
+      s"$limitEnd"
+    }
+  }
+  
+  private[flink] def getFetchLimitEnd (fetch: RexNode, offset: RexNode): Long = {
+    if (fetch != null) {
+      RexLiteral.intValue(fetch) + getFetchLimitStart(offset)
+    } else {
+      Long.MaxValue
+    }
+  }
+  
+  private[flink] def getFetchLimitStart (offset: RexNode): Long = {
+    if (offset != null) {
+      RexLiteral.intValue(offset)
+    } else {
+      0L
+    }
+  }
+  
+  private[flink] def sortToString(
+    rowRelDataType: RelDataType,
+    sortCollation: RelCollation,
+    sortOffset: RexNode,
+    sortFetch: RexNode): String = {
+      s"Sort(by: ($$sortFieldsToString(sortCollation, rowRelDataType))," +
+        (if (sortOffset != null) {
+          " offset: $offsetToString(sortOffset)," 
+        } else {
+          ""  
+        }) +
+        (if (sortFetch != null) {
+          " fetch: $fetchToString(sortFetch, sortOffset))"
+        } else {
+          ""  
+        })
+  }
+  
+  private[flink] def sortExplainTerms(
+      pw: RelWriter,
+      rowRelDataType: RelDataType,
+      sortCollation: RelCollation,
+      sortOffset: RexNode,
+      sortFetch: RexNode) : RelWriter = {
+    
+    pw
+      .item("orderBy", sortFieldsToString(sortCollation, rowRelDataType))
+      .itemIf("offset", offsetToString(sortOffset), sortOffset != null)
+      .itemIf("fetch", fetchToString(sortFetch, sortOffset), sortFetch != null)
+  }
+  
+}

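To make the limit helpers above concrete: OFFSET marks the first emitted row and FETCH bounds the half-open row range [offset, offset + fetch); without FETCH the end is Long.MaxValue ("unlimited"). A hedged sketch of the semantics, assuming code running where CommonSort's private[flink] helpers are visible and a Calcite RexBuilder named rexBuilder is available:

// Illustration of the helper semantics (rexBuilder is an assumed
// org.apache.calcite.rex.RexBuilder from the planner context).
val offset = rexBuilder.makeExactLiteral(java.math.BigDecimal.valueOf(5))
val fetch = rexBuilder.makeExactLiteral(java.math.BigDecimal.valueOf(10))

assert(getFetchLimitStart(offset) == 5L)                 // rows to skip
assert(getFetchLimitEnd(fetch, offset) == 15L)           // offset + fetch
assert(getFetchLimitStart(null) == 0L)                   // no OFFSET
assert(getFetchLimitEnd(null, offset) == Long.MaxValue)  // "unlimited"
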
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index 192237a..aa6620d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
 import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
 import org.apache.flink.types.Row
+import org.apache.flink.table.plan.nodes.CommonSort
 
 import scala.collection.JavaConverters._
 
@@ -43,19 +44,12 @@ class DataSetSort(
     offset: RexNode,
     fetch: RexNode)
   extends SingleRel(cluster, traitSet, inp)
+  with CommonSort
   with DataSetRel {
 
-  private val limitStart: Long = if (offset != null) {
-    RexLiteral.intValue(offset)
-  } else {
-    0L
-  }
+  private val limitStart: Long = getFetchLimitStart(offset)
 
-  private val limitEnd: Long = if (fetch != null) {
-    RexLiteral.intValue(fetch) + limitStart
-  } else {
-    Long.MaxValue
-  }
+  private val limitEnd: Long = getFetchLimitEnd(fetch, offset)
 
   override def deriveRowType(): RelDataType = rowRelDataType
 
@@ -127,7 +121,7 @@ class DataSetSort(
         limitEnd,
         broadcastName)
 
-      val limitName = s"offset: $offsetToString, fetch: $fetchToString"
+      val limitName = s"offset: $$offsetToString(offset), fetch: $$fetchToString(fetch, offset))"
 
       partitionedDs
         .filter(limitFunction)
@@ -136,36 +130,19 @@ class DataSetSort(
     }
   }
 
-  private def directionToOrder(direction: Direction) = {
-    direction match {
-      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
-      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
-      case _ => throw new IllegalArgumentException("Unsupported direction.")
-    }
-
-  }
-
   private val fieldCollations = collations.getFieldCollations.asScala
     .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
-
-  private val sortFieldsToString = fieldCollations
-    .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")
-
-  private val offsetToString = s"$offset"
-
-  private val fetchToString = if (limitEnd == Long.MaxValue) {
-    "unlimited"
-  } else {
-    s"$limitEnd"
+    
+  override def toString: String = {
+    sortToString(getRowType, collations, offset, fetch)
   }
-
-  override def toString: String =
-    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
-
+    
   override def explainTerms(pw: RelWriter) : RelWriter = {
-    super.explainTerms(pw)
-      .item("orderBy", sortFieldsToString)
-      .item("offset", offsetToString)
-      .item("fetch", fetchToString)
+    sortExplainTerms(
+      super.explainTerms(pw),
+      getRowType,
+      collations,
+      offset,
+      fetch)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
new file mode 100644
index 0000000..a11e6c1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{ RelNode, RelWriter }
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.CommonSort
+import org.apache.calcite.rel.core.Sort
+
+/**
+ * Flink RelNode for sorting a data stream on a time attribute and
+ * optional secondary attributes.
+ */
+class DataStreamSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    inputSchema: RowSchema,
+    schema: RowSchema,
+    sortCollation: RelCollation,
+    sortOffset: RexNode,
+    sortFetch: RexNode,
+    description: String)
+  extends Sort(cluster, traitSet, inputNode, sortCollation, sortOffset, sortFetch)
+  with CommonSort
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = schema.logicalType
+
+  override def copy(
+    traitSet: RelTraitSet,
+    input: RelNode,
+    newCollation: RelCollation,
+    offset: RexNode,
+    fetch: RexNode): Sort = {
+    
+    new DataStreamSort(
+      cluster,
+      traitSet,
+      input,
+      inputSchema,
+      schema,
+      newCollation,
+      offset,
+      fetch,
+      description)
+  }
+
+  override def toString: String = {
+    sortToString(schema.logicalType, sortCollation, sortOffset, sortFetch)
+  }
+  
+  override def explainTerms(pw: RelWriter) : RelWriter = {
+    sortExplainTerms(
+      pw.input("input", getInput()),
+      schema.logicalType,
+      sortCollation, 
+      sortOffset, 
+      sortFetch)
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
+    
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+    
+    // the time attribute must be the first sort field; extract its type
+    val timeType = SortUtil.getFirstSortField(sortCollation, schema.logicalType).getType
+    
+    // time ordering needs to be ascending
+    if (SortUtil.getFirstSortDirection(sortCollation) != Direction.ASCENDING) {
+      throw new TableException("Primary sort order of a streaming table must be ascending on time.")
+    }
+    
+    val execCfg = tableEnv.execEnv.getConfig
+      
+    // dispatch on the time indicator type (processing time vs. event time)
+    timeType match {
+        case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
+            (sortOffset, sortFetch) match {
+              case (_: RexNode, _: RexNode) => // offset and fetch need retraction
+                throw new TableException(
+                  "Streaming tables do not support sort with offset and fetch.")
+              case (_, _: RexNode) => // fetch needs retraction
+                throw new TableException("Streaming tables do not support sort with fetch.")
+              case (_: RexNode, _) => // offset needs retraction
+                throw new TableException("Streaming tables do not support sort with offset.")
+              case _ => createSortProcTime(inputDS, execCfg)
+            }
+        case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) =>
+            (sortOffset, sortFetch) match {
+              case (_: RexNode, _: RexNode) => // offset and fetch need retraction
+                throw new TableException(
+                  "Streaming tables do not support sort with offset and fetch.")
+              case (_, _: RexNode) => // fetch needs retraction
+                throw new TableException("Streaming tables do not support sort with fetch.")
+              case (_: RexNode, _) => // offset needs retraction
+                throw new TableException("Streaming tables do not support sort with offset.")
+              case _ => createSortRowTime(inputDS, execCfg)
+            }
+        case _ =>
+          throw new TableException(
+            "Primary sort order of a streaming table must be ascending on time.")
+    }
+    
+  }
+
+  /**
+   * Creates the sort logic based on processing time.
+   */
+  def createSortProcTime(
+    inputDS: DataStream[CRow],
+    execCfg: ExecutionConfig): DataStream[CRow] = {
+
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+
+    // if the sort has secondary fields in addition to proctime
+    if (sortCollation.getFieldCollations.size() > 1) {
+    
+      val processFunction = SortUtil.createProcTimeSortFunction(
+        sortCollation,
+        inputSchema.logicalType, 
+        inputSchema.physicalTypeInfo, 
+        execCfg)
+      
+      inputDS.keyBy(new NullByteKeySelector[CRow])
+        .process(processFunction).setParallelism(1).setMaxParallelism(1)
+        .returns(returnTypeInfo)
+        .asInstanceOf[DataStream[CRow]]
+    } else {
+      // if sorting on proctime only, forward the elements as they arrive
+      inputDS
+        .map(new IdentityCRowMap())
+        .setParallelism(1).setMaxParallelism(1)
+        .returns(returnTypeInfo)
+        .asInstanceOf[DataStream[CRow]]
+    }   
+  }
+  
+  /**
+   * Creates the sort logic based on row time (event time).
+   */
+  def createSortRowTime(
+    inputDS: DataStream[CRow],
+    execCfg: ExecutionConfig): DataStream[CRow] = {
+
+    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
+       
+    val processFunction = SortUtil.createRowTimeSortFunction(
+      sortCollation,
+      inputSchema.logicalType, 
+      inputSchema.physicalTypeInfo, 
+      execCfg)
+      
+    inputDS.keyBy(new NullByteKeySelector[CRow])
+      .process(processFunction).setParallelism(1).setMaxParallelism(1)
+      .returns(returnTypeInfo)
+      .asInstanceOf[DataStream[CRow]]
+       
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index bb3833a..ebfbeb9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -182,6 +182,7 @@ object FlinkRuleSets {
     */
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
     // translate to DataStream nodes
+    DataStreamSortRule.INSTANCE,
     DataStreamGroupAggregateRule.INSTANCE,
     DataStreamOverAggregateRule.INSTANCE,
     DataStreamGroupWindowAggregateRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
new file mode 100644
index 0000000..17a643a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.plan.nodes.datastream.DataStreamSort
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSort
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.flink.table.runtime.aggregate.SortUtil
+
+/**
+ * Rule to convert a LogicalSort into a DataStreamSort.
+ */
+class DataStreamSortRule
+    extends ConverterRule(
+      classOf[FlinkLogicalSort],
+      FlinkConventions.LOGICAL,
+      FlinkConventions.DATASTREAM,
+      "DataStreamSortRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val sort: FlinkLogicalSort = call.rel(0).asInstanceOf[FlinkLogicalSort]
+    // Check if first sort attribute is time attribute and order is ascending
+    checkTimeOrder(sort)
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val sort: FlinkLogicalSort = rel.asInstanceOf[FlinkLogicalSort]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(sort.getInput(0), FlinkConventions.DATASTREAM)
+    
+    val inputRowType = convInput.asInstanceOf[RelSubset].getOriginal.getRowType
+
+    new DataStreamSort(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      new RowSchema(inputRowType),
+      new RowSchema(rel.getRowType),
+      sort.collation,
+      sort.offset,
+      sort.fetch,
+      description)
+  }
+  
+   
+  /**
+   * Checks if first sort attribute is time attribute and order is ascending.
+   */
+  def checkTimeOrder(sort: FlinkLogicalSort): Boolean = {
+    
+    val sortCollation = sort.collation
+    // get type of first sort field
+    val firstSortType = SortUtil.getFirstSortField(sortCollation, sort.getRowType).getType
+    // get direction of first sort field
+    val firstSortDirection = SortUtil.getFirstSortDirection(sortCollation)
+
+    FlinkTypeFactory.isTimeIndicatorType(firstSortType) && firstSortDirection == Direction.ASCENDING
+  }
+}
+
+object DataStreamSortRule {
+  val INSTANCE: RelOptRule = new DataStreamSortRule
+}

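In effect the rule only fires when the primary sort key is a time indicator sorted ascending; all other ORDER BY shapes remain unsupported on streams. Illustrative queries, assuming a tEnv and the MyTable schema ('a, 'b, 'c plus proctime/rowtime attributes) set up as in SortTest below:

// translates: time attribute first, ascending
val ok = tEnv.sql("SELECT a FROM MyTable ORDER BY rowtime, c")

// rejected during translation: primary time order must be ascending
val descTime = tEnv.sql("SELECT a FROM MyTable ORDER BY proctime DESC, c")

// rejected during translation: time attribute is not the primary sort field
val timeNotFirst = tEnv.sql("SELECT a FROM MyTable ORDER BY c, proctime")
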
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
new file mode 100644
index 0000000..2d0b14b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.util.{Collector, Preconditions}
+
+import java.util.ArrayList
+import java.util.Collections
+
+
+/**
+ * ProcessFunction to sort on processing time and additional attributes.
+ *
+ * @param inputRowType The data type of the input data.
+ * @param rowComparator A comparator to sort rows.
+ */
+class ProcTimeSortProcessFunction(
+    private val inputRowType: CRowTypeInfo,
+    private val rowComparator: CollectionRowComparator)
+  extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  private var bufferedEvents: ListState[Row] = _
+  private val sortBuffer: ArrayList[Row] = new ArrayList[Row]
+  
+  private var outputC: CRow = _
+  
+  override def open(config: Configuration) {
+    val sortDescriptor = new ListStateDescriptor[Row](
+      "sortState",
+      inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+    bufferedEvents = getRuntimeContext.getListState(sortDescriptor)
+
+    outputC = new CRow()
+  }
+
+  override def processElement(
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
+    val currentTime = ctx.timerService.currentProcessingTime
+
+    // buffer the incoming event
+    bufferedEvents.add(input)
+
+    // register a timer for the next millisecond to sort and emit buffered data
+    ctx.timerService.registerProcessingTimeTimer(currentTime + 1)
+    
+  }
+  
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
+    
+    val iter =  bufferedEvents.get.iterator()
+
+    // insert all rows into the sort buffer
+    sortBuffer.clear()
+    while(iter.hasNext) {
+      sortBuffer.add(iter.next())
+    }
+    // sort the rows
+    Collections.sort(sortBuffer, rowComparator)
+    
+    // Emit the rows in order
+    var i = 0
+    while (i < sortBuffer.size) {
+      outputC.row = sortBuffer.get(i)
+      out.collect(outputC)
+      i += 1
+    }
+    
+    // remove all buffered rows
+    bufferedEvents.clear()
+  }
+  
+}

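The function above follows the classic buffer-and-timer pattern: each element is appended to keyed list state, and a processing-time timer registered one millisecond ahead flushes the buffer in sorted order. A stripped-down, hypothetical sketch of the same pattern, buffering boxed longs instead of Rows and leaving out the Table API types:

import java.lang.{Long => JLong}

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical, simplified variant of ProcTimeSortProcessFunction. Must run
// on a keyed stream, e.g. stream.keyBy(...).process(new BufferAndSortFunction).
class BufferAndSortFunction extends ProcessFunction[JLong, JLong] {

  private var buffer: ListState[JLong] = _

  override def open(config: Configuration): Unit = {
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[JLong]("buffer", classOf[JLong]))
  }

  override def processElement(
      value: JLong,
      ctx: ProcessFunction[JLong, JLong]#Context,
      out: Collector[JLong]): Unit = {
    // buffer the element and ask to be called back in the next millisecond
    buffer.add(value)
    ctx.timerService.registerProcessingTimeTimer(
      ctx.timerService.currentProcessingTime + 1)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[JLong, JLong]#OnTimerContext,
      out: Collector[JLong]): Unit = {
    // emit everything buffered so far in sorted order and clear the state
    buffer.get.asScala.toSeq.sortBy(_.longValue).foreach(out.collect)
    buffer.clear()
  }
}
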
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
new file mode 100644
index 0000000..737f32c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+import java.util.Collections
+import java.util.{List => JList, ArrayList => JArrayList}
+
+/**
+ * ProcessFunction to sort on event time and possibly additional secondary sort attributes.
+ *
+ * @param inputRowType The data type of the input data.
+ * @param rowComparator A comparator to sort rows.
+ */
+class RowTimeSortProcessFunction(
+    private val inputRowType: CRowTypeInfo,
+    private val rowComparator: Option[CollectionRowComparator])
+  extends ProcessFunction[CRow, CRow] {
+
+  Preconditions.checkNotNull(rowComparator)
+
+  // State to collect rows between watermarks.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  // State to keep the last triggering timestamp. Used to filter late events.
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  private var outputC: CRow = _
+  
+  override def open(config: Configuration) {
+     
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+    
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+    
+    outputC = new CRow()
+  }
+
+  
+  override def processElement(
+    inputC: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
+
+    val input = inputC.row
+    
+    // timestamp of the processed row
+    val rowtime = ctx.timestamp
+
+    val lastTriggeringTs = lastTriggeringTsState.value
+
+    // drop the row if it is late
+    if (rowtime > lastTriggeringTs) {
+      // get list for timestamp
+      val rows = dataState.get(rowtime)
+      if (null != rows) {
+        rows.add(input)
+        dataState.put(rowtime, rows)
+      } else {
+        val rows = new JArrayList[Row]
+        rows.add(input)
+        dataState.put(rowtime, rows)
+
+        // register event time timer
+        ctx.timerService.registerEventTimeTimer(rowtime)
+      }
+    }
+  }
+  
+  
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    out: Collector[CRow]): Unit = {
+    
+    // get all rows for the triggering timestamp
+    val inputs: JList[Row] = dataState.get(timestamp)
+
+    if (null != inputs) {
+
+      // sort rows on secondary fields if necessary
+      if (rowComparator.isDefined) {
+        Collections.sort(inputs, rowComparator.get)
+      }
+
+      // emit rows in order
+      var i = 0
+      while (i < inputs.size) {
+        outputC.row = inputs.get(i)
+        out.collect(outputC)
+        i += 1
+      }
+    
+      // remove emitted rows from state
+      dataState.remove(timestamp)
+      lastTriggeringTsState.update(timestamp)
+    }
+  }
+  
+}

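In short, the event-time variant groups rows per timestamp in MapState, registers an event-time timer per timestamp, and uses lastTriggeringTsState to drop late rows. A hypothetical trace for a single key:

// Hypothetical trace (timestamps in ms):
// processElement(rowtime = 2000) -> dataState(2000) += row; timer @ 2000
// processElement(rowtime = 1500) -> dataState(1500) += row; timer @ 1500
// watermark(2000) arrives        -> onTimer(1500), onTimer(2000) emit their
//                                   rows (sorted on secondary fields if any);
//                                   lastTriggeringTsState = 2000
// processElement(rowtime = 1800) -> 1800 <= 2000: late row, dropped
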
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
new file mode 100644
index 0000000..5f83f1d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rel.RelFieldCollation.Direction
+
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeinfo.AtomicType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.table.plan.schema.RowSchema
+
+import java.util.Comparator
+
+import scala.collection.JavaConverters._
+
+/**
+ * Collection of helper methods to build the sort logic, including the
+ * comparators used for ordering.
+ */
+object SortUtil {
+
+  /**
+   * Creates a ProcessFunction to sort rows based on event time and possibly other secondary fields.
+   *
+   * @param collationSort The list of sort collations.
+   * @param inputType The row type of the input.
+   * @param inputTypeInfo The TypeInformation of the input rows.
+   * @param execCfg Execution configuration to configure comparators.
+   * @return A function to sort stream values based on event-time and secondary sort fields.
+   */
+  private[flink] def createRowTimeSortFunction(
+    collationSort: RelCollation,
+    inputType: RelDataType,
+    inputTypeInfo: TypeInformation[Row],
+    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+
+    val collectionRowComparator = if (collationSort.getFieldCollations.size() > 1) {
+
+      val rowComp = createRowComparator(
+        inputType,
+        collationSort.getFieldCollations.asScala.tail, // strip off time collation
+        execCfg)
+
+      Some(new CollectionRowComparator(rowComp))
+    } else {
+      None
+    }
+
+    val inputCRowType = CRowTypeInfo(inputTypeInfo)
+ 
+    new RowTimeSortProcessFunction(
+      inputCRowType,
+      collectionRowComparator)
+
+  }
+  
+  /**
+   * Creates a ProcessFunction to sort rows based on processing time and additional fields.
+   *
+   * @param collationSort The list of sort collations.
+   * @param inputType The row type of the input.
+   * @param inputTypeInfo The TypeInformation of the input rows.
+   * @param execCfg Execution configuration to configure comparators.
+   * @return A function to sort stream values based on proctime and other secondary sort fields.
+   */
+  private[flink] def createProcTimeSortFunction(
+    collationSort: RelCollation,
+    inputType: RelDataType,
+    inputTypeInfo: TypeInformation[Row],
+    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+
+    val rowComp = createRowComparator(
+      inputType,
+      collationSort.getFieldCollations.asScala.tail, // strip off time collation
+      execCfg)
+
+    val collectionRowComparator = new CollectionRowComparator(rowComp)
+    
+    val inputCRowType = CRowTypeInfo(inputTypeInfo)
+    
+    new ProcTimeSortProcessFunction(
+      inputCRowType,
+      collectionRowComparator)
+
+  }
+  
+  /**
+   * Creates a RowComparator for the provided field collations and input type.
+   *
+   * @param inputType The row type of the input.
+   * @param fieldCollations The field collations.
+   * @param execConfig The execution configuration.
+   *
+   * @return A RowComparator for the provided sort collations and input type.
+   */
+  private def createRowComparator(
+      inputType: RelDataType,
+      fieldCollations: Seq[RelFieldCollation],
+      execConfig: ExecutionConfig): RowComparator = {
+
+    val sortFields = fieldCollations.map(_.getFieldIndex)
+    val sortDirections = fieldCollations.map(_.direction).map {
+      case Direction.ASCENDING => true
+      case Direction.DESCENDING => false
+      case _ => throw new TableException("Unsupported sort direction.")
+    }
+
+    val fieldComps = for ((k, o) <- sortFields.zip(sortDirections)) yield {
+      FlinkTypeFactory.toTypeInfo(inputType.getFieldList.get(k).getType) match {
+        case a: AtomicType[AnyRef] => a.createComparator(o, execConfig)
+        case x: TypeInformation[_] =>  
+          throw new TableException(s"Unsupported field type $x to sort on.")
+      }
+    }
+
+    new RowComparator(
+      new RowSchema(inputType).physicalArity,
+      sortFields.toArray,
+      fieldComps.toArray,
+      new Array[TypeSerializer[AnyRef]](0), // not required because we only compare objects.
+      sortDirections.toArray)
+    
+  }
+ 
+  /**
+   * Returns the direction of the first sort field.
+   *
+   * @param collationSort The list of sort collations.
+   * @return The direction of the first sort field.
+   */
+  def getFirstSortDirection(collationSort: RelCollation): Direction = {
+    collationSort.getFieldCollations.get(0).direction
+  }
+  
+  /**
+   * Returns the first sort field.
+   *
+   * @param collationSort The list of sort collations.
+   * @param rowType The row type of the input.
+   * @return The first sort field.
+   */
+  def getFirstSortField(collationSort: RelCollation, rowType: RelDataType): RelDataTypeField = {
+    val idx = collationSort.getFieldCollations.get(0).getFieldIndex
+    rowType.getFieldList.get(idx)
+  }
+  
+}
+
+/**
+ * Wraps a Flink TypeComparator[Row] as a java.util.Comparator[Row].
+ */
+class CollectionRowComparator(
+    private val rowComp: TypeComparator[Row]) extends Comparator[Row] with Serializable {
+  
+  override def compare(arg0: Row, arg1: Row): Int = {
+    rowComp.compare(arg0, arg1)
+  }
+}
+
+
+/**
+ * Identity map function that forwards rows unchanged, i.e. in arrival order.
+ */
+private[flink] class IdentityCRowMap extends MapFunction[CRow, CRow] {
+  override def map(value: CRow): CRow = value
+}

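As a usage note, CollectionRowComparator merely adapts Flink's TypeComparator[Row] to java.util.Comparator[Row] so buffered rows can be passed to Collections.sort. A hedged sketch mirroring the construction in TimeSortProcessFunctionTest below (a single Long field, sorted ascending):

import java.util.{ArrayList => JArrayList, Collections}

import org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO
import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
import org.apache.flink.api.java.typeutils.runtime.RowComparator
import org.apache.flink.types.Row

// Sort single-field rows ascending on field 0 (a Long).
val fieldComp = LONG_TYPE_INFO.createComparator(true, null)
  .asInstanceOf[TypeComparator[AnyRef]]
val rowComp = new RowComparator(
  1,                                    // arity of the rows
  Array(0),                             // key field positions
  Array(fieldComp),                     // comparators per key field
  new Array[TypeSerializer[AnyRef]](0), // unused for object comparison
  Array(true))                          // ascending

val rows = new JArrayList[Row]()
rows.add(Row.of(2L: java.lang.Long))
rows.add(Row.of(1L: java.lang.Long))
Collections.sort(rows, new CollectionRowComparator(rowComp))
// rows are now ordered [1, 2]
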
http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
index 397e72c..253d54a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.api.scala.stream.sql
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
+import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
@@ -862,19 +862,3 @@ class OverWindowITCase extends StreamingWithStateTestBase {
   }
 
 }
-
-object OverWindowITCase {
-
-  class EventTimeSourceFunction[T](
-      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
-    override def run(ctx: SourceContext[T]): Unit = {
-      dataWithTimestampList.foreach {
-        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
-        case Right(w) => ctx.emitWatermark(new Watermark(w))
-      }
-    }
-
-    override def cancel(): Unit = ???
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
new file mode 100644
index 0000000..2340591
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink
+import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class SortITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testEventTimeOrderBy(): Unit = {
+    val data = Seq(
+      Left((1500L, (1L, 15, "Hello"))),
+      Left((1600L, (1L, 16, "Hello"))),
+      Left((1000L, (1L, 1, "Hello"))),
+      Left((2000L, (2L, 2, "Hello"))),
+      Right(1000L),
+      Left((2000L, (2L, 2, "Hello"))),
+      Left((2000L, (2L, 3, "Hello"))),
+      Left((3000L, (3L, 3, "Hello"))),
+      Left((2000L, (3L, 1, "Hello"))),
+      Right(2000L),
+      Left((4000L, (4L, 4, "Hello"))),
+      Right(3000L),
+      Left((5000L, (5L, 5, "Hello"))),
+      Right(5000L),
+      Left((6000L, (6L, 65, "Hello"))),
+      Left((6000L, (6L, 6, "Hello"))),
+      Left((6000L, (6L, 67, "Hello"))),
+      Left((6000L, (6L, -1, "Hello"))),
+      Left((6000L, (6L, 6, "Hello"))),
+      Right(7000L),
+      Left((9000L, (6L, 9, "Hello"))),
+      Left((8500L, (6L, 18, "Hello"))),
+      Left((9000L, (6L, 7, "Hello"))),
+      Right(10000L),
+      Left((10000L, (7L, 7, "Hello World"))),
+      Left((11000L, (7L, 77, "Hello World"))),
+      Left((11000L, (7L, 17, "Hello World"))),
+      Right(12000L),
+      Left((14000L, (7L, 18, "Hello World"))),
+      Right(14000L),
+      Left((15000L, (8L, 8, "Hello World"))),
+      Right(17000L),
+      Left((20000L, (20L, 20, "Hello World"))), 
+      Right(19000L))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+      
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC"
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
+    env.execute()
+    
+    val expected = mutable.MutableList(
+      "1", "15", "16",
+      "1", "2", "2", "3",
+      "3",
+      "4",
+      "5",
+      "-1", "6", "6", "65", "67",
+      "18", "7", "9",
+      "7", "17", "77", 
+      "18",
+      "8",
+      "20")
+    assertEquals(expected, SortITCase.testResults)
+  }
+}
+
+object SortITCase {
+
+  final class StringRowSelectorSink(private val field: Int) extends RichSinkFunction[Row]() {
+    def invoke(value: Row) {
+      testResults.synchronized {
+        testResults += value.getField(field).toString
+      }
+    }
+  }
+  
+  var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
new file mode 100644
index 0000000..1d50fc1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class SortTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c,
+      'proctime.proctime, 'rowtime.rowtime)
+  
+  @Test
+  def testSortProcessingTime() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode("DataStreamSort",
+          streamTableNode(0),
+          term("orderBy", "proctime ASC", "c ASC")),
+        term("select", "a", "TIME_MATERIALIZATION(proctime) AS proctime", "c"))
+
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testSortRowTime() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY rowtime, c"
+      
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode("DataStreamSort",
+          streamTableNode(0),
+          term("orderBy", "rowtime ASC, c ASC")),
+        term("select", "a", "TIME_MATERIALIZATION(rowtime) AS rowtime", "c"))
+       
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  // test should fail because time order is descending
+  @Test(expected = classOf[TableException])
+  def testSortProcessingTimeDesc() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime DESC, c"
+    streamUtil.verifySql(sqlQuery, "")
+  }
+  
+  
+  // test should fail because time is not the primary order field
+  @Test(expected = classOf[TableException])
+  def testSortProcessingTimeSecondaryField() = {
+
+    val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
+    streamUtil.verifySql(sqlQuery, "")
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
new file mode 100644
index 0000000..b883870
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.watermark.Watermark
+
+object TimeTestUtil {
+  
+  class EventTimeSourceFunction[T](
+      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+    override def run(ctx: SourceContext[T]): Unit = {
+      dataWithTimestampList.foreach {
+        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+        case Right(w) => ctx.emitWatermark(new Watermark(w))
+      }
+    }
+
+    override def cancel(): Unit = ???
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8c8f204/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
new file mode 100644
index 0000000..df4e1aa
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/TimeSortProcessFunctionTest.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.types.Row
+import org.junit.Test
+import org.apache.flink.table.runtime.aggregate.TimeSortProcessFunctionTest._
+import org.apache.flink.api.java.typeutils.runtime.RowComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.streaming.api.TimeCharacteristic
+
+class TimeSortProcessFunctionTest {
+
+  @Test
+  def testSortProcTimeHarnessPartitioned(): Unit = {
+    
+    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      LONG_TYPE_INFO),
+      Array("a", "b", "c", "d", "e"))
+    
+    val indexes = Array(1, 2)
+      
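+    // compare field 1 (b: Long) ascending and field 2 (c: Int) descending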
+    val fieldComps = Array[TypeComparator[AnyRef]](
+      LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
+      INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]])
+    val booleanOrders = Array(true, false)    
+    
+
+    val rowComp = new RowComparator(
+      rT.getTotalFields,
+      indexes,
+      fieldComps,
+      new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons
+      booleanOrders)
+    
+    val collectionRowComparator = new CollectionRowComparator(rowComp)
+    
+    val inputCRowType = CRowTypeInfo(rT)
+    
+    val processFunction = new KeyedProcessOperator[Integer, CRow, CRow](
+      new ProcTimeSortProcessFunction(
+        inputCRowType,
+        collectionRowComparator))
+  
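+    // key the input on field 0 (a) so that sorting happens per key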
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow](
+      processFunction,
+      new TupleRowSelector(0),
+      BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.setProcessingTime(3)
+
+    // timestamp is ignored in processing time
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2003))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2004))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
+
+    // advance processing time to fire the timer and emit the first sorted batch
+    testHarness.setProcessingTime(1005)
+    
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+    
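+    // advance processing time again to fire the timer for the second batch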
+    testHarness.setProcessingTime(1008)
+    
+    val result = testHarness.getOutput
+    
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+    
+    // all elements received before a timer fires are emitted at the timer's timestamp,
+    // sorted ascending on field 1 and descending on field 2:
+    // first batch:  (10,0) (11,1) (12,2) (12,1) (12,0)
+    // second batch: (1,0) (2,0) (3,0)
+    
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true), 4))
+
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true), 1006))
+
+    TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", expectedOutput, result)
+    
+    testHarness.close()
+  }
+  
+  @Test
+  def testSortRowTimeHarnessPartitioned(): Unit = {
+    
+    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      LONG_TYPE_INFO),
+      Array("a", "b", "c", "d", "e"))
+
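+    // same sort key as the processing-time test: field 1 ascending, field 2 descending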
+    val indexes = Array(1, 2)
+      
+    val fieldComps = Array[TypeComparator[AnyRef]](
+      LONG_TYPE_INFO.createComparator(true, null).asInstanceOf[TypeComparator[AnyRef]],
+      INT_TYPE_INFO.createComparator(false, null).asInstanceOf[TypeComparator[AnyRef]])
+    val booleanOrders = Array(true, false)    
+
+    val rowComp = new RowComparator(
+      rT.getTotalFields,
+      indexes,
+      fieldComps,
+      new Array[TypeSerializer[AnyRef]](0), //used only for serialized comparisons
+      booleanOrders)
+    
+    val collectionRowComparator = new CollectionRowComparator(rowComp)
+    
+    val inputCRowType = CRowTypeInfo(rT)
+    
+    val processFunction = new KeyedProcessOperator[Integer, CRow, CRow](
+      new RowTimeSortProcessFunction(
+        inputCRowType,
+        Some(collectionRowComparator)))
+  
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow](
+      processFunction,
+      new TupleRowSelector(0),
+      BasicTypeInfo.INT_TYPE_INFO)
+
+    testHarness.open()
+
+    testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime)
+    testHarness.processWatermark(3)
+
+    // elements are buffered until the watermark passes their event-time timestamps
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2002))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2002))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2004))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
+
+    // move watermark forward
+    testHarness.processWatermark(2007)
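+    // records behind the 2007 watermark (e.g., the element at 2002 below) are dropped as late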
+
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002)) // too late
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2019)) // beyond the final watermark (2012); never emitted in this test
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
+    testHarness.processElement(new StreamRecord(new CRow(
+      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
+
+    // move watermark forward
+    testHarness.processWatermark(2012)
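+    // flushes the buffered elements the watermark has passed (2008 and 2010), in timestamp order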
+
+    val result = testHarness.getOutput
+    
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+    
+    // elements are emitted in ascending order of their event-time timestamps;
+    // elements with the same timestamp are sorted ascending on field 1 and
+    // descending on field 2
+    expectedOutput.add(new Watermark(3))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2002))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2002))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2004))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
+    expectedOutput.add(new Watermark(2007))
+
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
+
+    expectedOutput.add(new Watermark(2012))
+
+    TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", expectedOutput, result)
+
+    testHarness.close()
+  }
+}
+
+object TimeSortProcessFunctionTest {
+
+  /**
+   * Simple KeySelector that extracts the specified field of a CRow as the key.
+   */
+  class TupleRowSelector(private val selectorField: Int) extends KeySelector[CRow, Integer] {
+
+    override def getKey(value: CRow): Integer = {
+      value.row.getField(selectorField).asInstanceOf[Integer]
+    }
+  }
+
+}