You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/31 12:26:08 UTC
flink git commit: [hotfix] [table] Fix IndexOOBE in DataStreamSortRule.
Repository: flink
Updated Branches:
refs/heads/master 2cb58960e -> 2b76ecab8
[hotfix] [table] Fix IndexOOBE in DataStreamSortRule.
This closes #5370.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b76ecab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b76ecab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b76ecab
Branch: refs/heads/master
Commit: 2b76ecab8cca8ec8de9b1187fcd656019d36f9f5
Parents: 2cb5896
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jan 26 10:19:17 2018 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Jan 31 13:25:28 2018 +0100
----------------------------------------------------------------------
.../plan/rules/datastream/DataStreamSortRule.scala | 4 ++++
.../apache/flink/table/runtime/aggregate/SortUtil.scala | 4 ++++
.../apache/flink/table/api/stream/sql/SortTest.scala | 4 ++--
.../api/stream/sql/validation/SortValidationTest.scala | 12 ++++++++++--
4 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
index 17a643a..0186347 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamSortRule.scala
@@ -71,6 +71,10 @@ class DataStreamSortRule
def checkTimeOrder(sort: FlinkLogicalSort): Boolean = {
val sortCollation = sort.collation
+ if (sortCollation.getFieldCollations.size() == 0) {
+ // no sort fields defined
+ return false
+ }
// get type of first sort field
val firstSortType = SortUtil.getFirstSortField(sortCollation, sort.getRowType).getType
// get direction of first sort field
http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index 2bafed9..67bb603 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -35,6 +35,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Preconditions
import java.util.Comparator
@@ -60,6 +61,7 @@ object SortUtil {
inputTypeInfo: TypeInformation[Row],
execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+ Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex
val collectionRowComparator = if (collationSort.getFieldCollations.size() > 1) {
@@ -158,6 +160,7 @@ object SortUtil {
* @return The direction of the first sort field.
*/
def getFirstSortDirection(collationSort: RelCollation): Direction = {
+ Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
collationSort.getFieldCollations.get(0).direction
}
@@ -169,6 +172,7 @@ object SortUtil {
* @return The first sort field.
*/
def getFirstSortField(collationSort: RelCollation, rowType: RelDataType): RelDataTypeField = {
+ Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
val idx = collationSort.getFieldCollations.get(0).getFieldIndex
rowType.getFieldList.get(idx)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
index d20002a..087ae7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala
@@ -31,7 +31,7 @@ class SortTest extends TableTestBase {
'proctime.proctime, 'rowtime.rowtime)
@Test
- def testSortProcessingTime() = {
+ def testSortProcessingTime(): Unit = {
val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"
@@ -47,7 +47,7 @@ class SortTest extends TableTestBase {
}
@Test
- def testSortRowTime() = {
+ def testSortRowTime(): Unit = {
val sqlQuery = "SELECT a FROM MyTable ORDER BY rowtime, c"
http://git-wip-us.apache.org/repos/asf/flink/blob/2b76ecab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index ae7486d..083ed94 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -32,7 +32,7 @@ class SortValidationTest extends TableTestBase {
// test should fail because time order is descending
@Test(expected = classOf[TableException])
- def testSortProcessingTimeDesc() = {
+ def testSortProcessingTimeDesc(): Unit = {
val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime DESC, c"
streamUtil.verifySql(sqlQuery, "")
@@ -41,9 +41,17 @@ class SortValidationTest extends TableTestBase {
// test should fail because time is not the primary order field
@Test(expected = classOf[TableException])
- def testSortProcessingTimeSecondaryField() = {
+ def testSortProcessingTimeSecondaryField(): Unit = {
val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
streamUtil.verifySql(sqlQuery, "")
}
+
+ // test should fail because LIMIT is not supported without sorting
+ @Test(expected = classOf[TableException])
+ def testLimitWithoutSorting(): Unit = {
+
+ val sqlQuery = "SELECT a FROM MyTable LIMIT 3"
+ streamUtil.verifySql(sqlQuery, "")
+ }
}