You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 23:37:32 UTC
[4/5] flink git commit: [FLINK-2556] [core] Refactor/fix pre-flight
key validation
[FLINK-2556] [core] Refactor/fix pre-flight key validation
This closes #1044
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e38d6fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e38d6fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e38d6fa
Branch: refs/heads/master
Commit: 1e38d6fad410c0a0ef82fa73617f6feda86cd0e0
Parents: bc63ef2
Author: zentol <s....@web.de>
Authored: Sun Aug 23 15:57:34 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 20:04:57 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/operators/DistinctOperator.java | 7 -------
.../org/apache/flink/api/java/operators/Keys.java | 15 +++++++++++++++
.../flink/api/java/sca/UdfAnalyzerExamplesTest.java | 4 ++--
3 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index a6eb43e..ad2335b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.java.operators;
-import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
@@ -26,9 +25,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
@@ -54,10 +51,6 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
this.distinctLocationName = distinctLocationName;
- if (!(input.getType() instanceof CompositeType) &&
- !(input.getType() instanceof AtomicType && input.getType().isKeyType())){
- throw new InvalidProgramException("Distinct only possible on composite or atomic key types.");
- }
// if keys is null distinction is done on all fields
if (keys == null) {
keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 09874e5..47c66f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -209,6 +209,9 @@ public abstract class Keys<T> {
throw new InvalidProgramException("Specifying keys via field positions is only valid " +
"for tuple data types. Type: " + type);
}
+ if (type.getArity() == 0) {
+ throw new InvalidProgramException("Tuple size must be greater than 0. Size: " + type.getArity());
+ }
if (!allowEmpty && (groupingFields == null || groupingFields.length == 0)) {
throw new IllegalArgumentException("The grouping fields must not be empty.");
@@ -240,6 +243,9 @@ public abstract class Keys<T> {
}
else {
// arrived at key position
+ if (!fieldType.isKeyType()) {
+ throw new InvalidProgramException("This type (" + fieldType + ") cannot be used as key.");
+ }
if(fieldType instanceof CompositeType) {
// add all nested fields of composite type
((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
@@ -296,6 +302,15 @@ public abstract class Keys<T> {
keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
for (int i = 0; i < expressions.length; i++) {
List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
+ for (FlatFieldDescriptor key : keys) {
+ TypeInformation<?> keyType = key.getType();
+ if (!keyType.isKeyType()) {
+ throw new InvalidProgramException("This type (" + key.getType() + ") cannot be used as key.");
+ }
+ if (!(keyType instanceof AtomicType || keyType instanceof CompositeType)) {
+ throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + keyType);
+ }
+ }
if(keys.size() == 0) {
throw new InvalidProgramException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e38d6fa/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index a1d2b97..5254b68 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -529,8 +529,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testLogisticRegressionExamplesSumGradient() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
- "Tuple1<double[]>",
- "Tuple1<double[]>",
+ "Tuple1<double>",
+ "Tuple1<double>",
new String[] { "0" });
}