You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/04 13:29:19 UTC

[flink] branch release-1.7 updated: [FLINK-11227] [table] Fix bound checking errors in DescriptorProperties

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 8ee0465  [FLINK-11227] [table] Fix bound checking errors in DescriptorProperties
8ee0465 is described below

commit 8ee0465761d58eb7fd96846c09614410d644ed65
Author: Xingcan Cui <xc...@apache.org>
AuthorDate: Tue Dec 25 12:03:39 2018 +0800

    [FLINK-11227] [table] Fix bound checking errors in DescriptorProperties
    
    This closes #7373.
---
 .../table/descriptors/DescriptorProperties.java    |  4 +-
 .../descriptors/DescriptorPropertiesTest.scala     | 45 ++++++++++++++++++++--
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index c476981..7628912 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -938,7 +938,7 @@ public class DescriptorProperties {
 		}
 
 		// validate
-		for (int i = 0; i < maxIndex; i++) {
+		for (int i = 0; i <= maxIndex; i++) {
 			for (Map.Entry<String, Consumer<String>> subKey : subKeyValidation.entrySet()) {
 				final String fullKey = key + '.' + i + '.' + subKey.getKey();
 				if (properties.containsKey(fullKey)) {
@@ -1134,7 +1134,7 @@ public class DescriptorProperties {
 		}
 
 		// validate array elements
-		for (int i = 0; i < maxIndex; i++) {
+		for (int i = 0; i <= maxIndex; i++) {
 			final String fullKey = key + '.' + i;
 			if (properties.containsKey(fullKey)) {
 				// run validation logic
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
index b2a8ec9..fe7c75d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors
 
 import java.util
 import java.util.Collections
+import java.util.function.Consumer
 
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
@@ -32,6 +33,9 @@ import org.junit.Test
 class DescriptorPropertiesTest {
 
   private val ARRAY_KEY = "my-array"
+  private val FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property"
+  private val PROPERTY_1_KEY = "property-1"
+  private val PROPERTY_2_KEY = "property-2"
 
   @Test
   def testEquals(): Unit = {
@@ -97,8 +101,8 @@ class DescriptorPropertiesTest {
   def testArrayInvalidValues(): Unit = {
     val properties = new DescriptorProperties()
     properties.putString(s"$ARRAY_KEY.0", "12")
-    properties.putString(s"$ARRAY_KEY.1", "INVALID")
-    properties.putString(s"$ARRAY_KEY.2", "66")
+    properties.putString(s"$ARRAY_KEY.1", "66")
+    properties.putString(s"$ARRAY_KEY.2", "INVALID")
 
     testArrayValidation(properties, 1, Integer.MAX_VALUE)
   }
@@ -118,6 +122,19 @@ class DescriptorPropertiesTest {
     testArrayValidation(properties, 1, Integer.MAX_VALUE)
   }
 
+  @Test(expected = classOf[ValidationException])
+  def testInvalidFixedIndexedProperties(): Unit = {
+    val property = new DescriptorProperties()
+    val list = new util.ArrayList[util.List[String]]()
+    list.add(util.Arrays.asList("1", "string"))
+    list.add(util.Arrays.asList("INVALID", "string"))
+    property.putIndexedFixedProperties(
+      FIXED_INDEXED_PROPERTY_KEY,
+      util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY),
+      list)
+    testFixedIndexedPropertiesValidation(property)
+  }
+
   @Test
   def testRemoveKeys(): Unit = {
     val properties = new DescriptorProperties()
@@ -155,7 +172,7 @@ class DescriptorPropertiesTest {
       minLength: Int,
       maxLength: Int)
     : Unit = {
-    val validator: (String) => Unit = (key: String) => {
+    val validator: String => Unit = (key: String) => {
       properties.validateInt(key, false)
     }
 
@@ -165,4 +182,26 @@ class DescriptorPropertiesTest {
       minLength,
       maxLength)
   }
+
+  private def testFixedIndexedPropertiesValidation(properties: DescriptorProperties): Unit = {
+
+    val validatorMap = new util.HashMap[String, Consumer[String]]()
+
+    // PROPERTY_1 should be Int
+    val validator1: String => Unit = (key: String) => {
+      properties.validateInt(key, false)
+    }
+    validatorMap.put(PROPERTY_1_KEY, toJava(validator1))
+    // PROPERTY_2 should be String
+    val validator2: String => Unit = (key: String) => {
+      properties.validateString(key, false)
+    }
+    validatorMap.put(PROPERTY_2_KEY, toJava(validator2))
+
+    properties.validateFixedIndexedProperties(
+      FIXED_INDEXED_PROPERTY_KEY,
+      false,
+      validatorMap
+    )
+  }
 }