You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text version.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 23:37:32 UTC

[4/5] flink git commit: [FLINK-2556] [core] Refactor/fix pre-flight key validation

[FLINK-2556] [core] Refactor/fix pre-flight key validation

This closes #1044


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e38d6fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e38d6fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e38d6fa

Branch: refs/heads/master
Commit: 1e38d6fad410c0a0ef82fa73617f6feda86cd0e0
Parents: bc63ef2
Author: zentol <s....@web.de>
Authored: Sun Aug 23 15:57:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 20:04:57 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/operators/DistinctOperator.java   |  7 -------
 .../org/apache/flink/api/java/operators/Keys.java    | 15 +++++++++++++++
 .../flink/api/java/sca/UdfAnalyzerExamplesTest.java  |  4 ++--
 3 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index a6eb43e..ad2335b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
@@ -26,9 +25,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
@@ -54,10 +51,6 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 		this.distinctLocationName = distinctLocationName;
 
-		if (!(input.getType() instanceof CompositeType) &&
-				!(input.getType() instanceof AtomicType && input.getType().isKeyType())){
-			throw new InvalidProgramException("Distinct only possible on composite or atomic key types.");
-		}
 		// if keys is null distinction is done on all fields
 		if (keys == null) {
 			keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());

http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 09874e5..47c66f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -209,6 +209,9 @@ public abstract class Keys<T> {
 				throw new InvalidProgramException("Specifying keys via field positions is only valid " +
 						"for tuple data types. Type: " + type);
 			}
+			if (type.getArity() == 0) {
+				throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
+			}
 
 			if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) {
 				throw new IllegalArgumentException("The grouping fields must not be empty.");
@@ -240,6 +243,9 @@ public abstract class Keys<T> {
 					}
 					else {
 						// arrived at key position
+						if (!fieldType.isKeyType()) {
+							throw new InvalidProgramException("This type (" + fieldType + ") cannot be used as key.");
+						}
 						if(fieldType instanceof CompositeType) {
 							// add all nested fields of composite type
 							((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
@@ -296,6 +302,15 @@ public abstract class Keys<T> {
 				keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
 				for (int i = 0; i < expressions.length; i++) {
 					List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
+					for (FlatFieldDescriptor key : keys) {
+						TypeInformation<?> keyType = key.getType();
+						if (!keyType.isKeyType()) {
+							throw new InvalidProgramException("This type (" + key.getType() + ") cannot be used as key.");
+						}
+						if (!(keyType instanceof AtomicType || keyType instanceof CompositeType)) {
+							throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + keyType);
+						}
+					}
 					if(keys.size() == 0) {
 						throw new InvalidProgramException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index a1d2b97..5254b68 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -529,8 +529,8 @@ public class UdfAnalyzerExamplesTest {
 	@Test
 	public void testLogisticRegressionExamplesSumGradient() {
 		compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
-				"Tuple1<double[]>",
-				"Tuple1<double[]>",
+				"Tuple1<double>",
+				"Tuple1<double>",
 				new String[] { "0" });
 	}