You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/31 12:26:08 UTC

flink git commit: [hotfix] [table] Fix IndexOOBE in DataStreamSortRule.

Repository: flink
Updated Branches:
  refs/heads/master 2cb58960e -> 2b76ecab8


[hotfix] [table] Fix IndexOOBE in DataStreamSortRule.

This closes #5370.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b76ecab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b76ecab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b76ecab

Branch: refs/heads/master
Commit: 2b76ecab8cca8ec8de9b1187fcd656019d36f9f5
Parents: 2cb5896
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jan 26 10:19:17 2018 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 31 13:25:28 2018 +0100

----------------------------------------------------------------------
 .../plan/rules/datastream/DataStreamSortRule.scala      |  4 ++++
 .../apache/flink/table/runtime/aggregate/SortUtil.scala |  4 ++++
 .../apache/flink/table/api/stream/sql/SortTest.scala    |  4 ++--
 .../api/stream/sql/validation/SortValidationTest.scala  | 12 ++++++++++--
 4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
index 17a643a..0186347 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
@@ -71,6 +71,10 @@ class DataStreamSortRule
   def checkTimeOrder(sort: FlinkLogicalSort): Boolean = {
     
     val sortCollation = sort.collation
+    if (sortCollation.getFieldCollations.size() == 0) {
+      // no sort fields defined
+      return false
+    }
     // get type of first sort field
     val firstSortType = SortUtil.getFirstSortField(sortCollation, sort.getRowType).getType
     // get direction of first sort field

http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index 2bafed9..67bb603 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -35,6 +35,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Preconditions
 
 import java.util.Comparator
 
@@ -60,6 +61,7 @@ object SortUtil {
     inputTypeInfo: TypeInformation[Row],
     execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
 
+    Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
     val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex
 
     val collectionRowComparator = if (collationSort.getFieldCollations.size() > 1) {
@@ -158,6 +160,7 @@ object SortUtil {
    * @return The direction of the first sort field.
    */
   def getFirstSortDirection(collationSort: RelCollation): Direction = {
+    Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
     collationSort.getFieldCollations.get(0).direction
   }
   
@@ -169,6 +172,7 @@ object SortUtil {
    * @return The first sort field.
    */
   def getFirstSortField(collationSort: RelCollation, rowType: RelDataType): RelDataTypeField = {
+    Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
     val idx = collationSort.getFieldCollations.get(0).getFieldIndex
     rowType.getFieldList.get(idx)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
index d20002a..087ae7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
@@ -31,7 +31,7 @@ class SortTest extends TableTestBase {
       'proctime.proctime, 'rowtime.rowtime)
   
   @Test
-  def testSortProcessingTime() = {
+  def testSortProcessingTime(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"
 
@@ -47,7 +47,7 @@ class SortTest extends TableTestBase {
   }
 
   @Test
-  def testSortRowTime() = {
+  def testSortRowTime(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY rowtime, c"
       

http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index ae7486d..083ed94 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -32,7 +32,7 @@ class SortValidationTest extends TableTestBase {
 
   // test should fail because time order is descending
   @Test(expected = classOf[TableException])
-  def testSortProcessingTimeDesc() = {
+  def testSortProcessingTimeDesc(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime DESC, c"
     streamUtil.verifySql(sqlQuery, "")
@@ -41,9 +41,17 @@ class SortValidationTest extends TableTestBase {
 
   // test should fail because time is not the primary order field
   @Test(expected = classOf[TableException])
-  def testSortProcessingTimeSecondaryField() = {
+  def testSortProcessingTimeSecondaryField(): Unit = {
 
     val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
     streamUtil.verifySql(sqlQuery, "")
   }
+
+  // test should fail because LIMIT is not supported without sorting
+  @Test(expected = classOf[TableException])
+  def testLimitWithoutSorting(): Unit = {
+
+    val sqlQuery = "SELECT a FROM MyTable LIMIT 3"
+    streamUtil.verifySql(sqlQuery, "")
+  }
 }