You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/10 22:18:48 UTC
flink git commit: [FLINK-5874] Restrict key types in the DataStream
API.
Repository: flink
Updated Branches:
refs/heads/master 70e78a620 -> f15a7d2d9
[FLINK-5874] Restrict key types in the DataStream API.
Reject a type from being a key in keyBy() if it is:
1. it is a POJO type but does not override the hashCode() and
relies on the Object.hashCode() implementation.
2. it is an array of any type.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f15a7d2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f15a7d2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f15a7d2d
Branch: refs/heads/master
Commit: f15a7d2d9c9aae72bb3ac3eb2478b3ec4759401b
Parents: 70e78a6
Author: kl0u <kk...@gmail.com>
Authored: Wed Mar 8 12:11:07 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Mar 10 17:58:00 2017 +0100
----------------------------------------------------------------------
docs/dev/datastream_api.md | 9 +
.../streaming/api/datastream/DataStream.java | 4 +-
.../streaming/api/datastream/KeyedStream.java | 77 +++++-
.../api/graph/StreamGraphGenerator.java | 6 +-
.../flink/streaming/api/DataStreamTest.java | 238 +++++++++++++++++++
5 files changed, 327 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index df13295..728c945 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -216,6 +216,15 @@ dataStream.filter(new FilterFunction<Integer>() {
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
{% endhighlight %}
+ <p>
+ <span class="label label-danger">Attention</span>
+ A type <strong>cannot be a key</strong> if:
+ <ol>
+ <li> it is a POJO type but does not override the <em>hashCode()</em> method and
+ relies on the <em>Object.hashCode()</em> implementation.</li>
+ <li> it is an array of any type.</li>
+ </ol>
+ </p>
</td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8fcaf6b..71ef048 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -282,9 +282,9 @@ public class DataStream<T> {
}
/**
- * Partitions the operator state of a {@link DataStream}using field expressions.
+ * Partitions the operator state of a {@link DataStream} using field expressions.
* A field expression is either the name of a public field or a getter method with parentheses
- * of the {@link DataStream}S underlying type. A dot can be used to drill
+ * of the {@link DataStream}'s underlying type. A dot can be used to drill
* down into objects, as in {@code "field1.getInnerField2()" }.
*
* @param fields
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 7c9f5bc..860aac6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,18 +17,25 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -61,6 +68,9 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
import java.util.UUID;
/**
@@ -114,9 +124,72 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
- this.keyType = keyType;
+ this.keyType = validateKeyType(keyType);
}
-
+
+ /**
+ * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+ * used as a key in the {@code DataStream.keyBy()} operation. This is done by searching depth-first the
+ * key type and checking if each of the composite types satisfies the required conditions
+ * (see {@link #validateKeyTypeIsHashable(TypeInformation)}).
+ *
+ * @param keyType The {@link TypeInformation} of the key.
+ */
+ private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
+ Stack<TypeInformation<?>> stack = new Stack<>();
+ stack.push(keyType);
+
+ List<TypeInformation<?>> unsupportedTypes = new ArrayList<>();
+
+ while (!stack.isEmpty()) {
+ TypeInformation<?> typeInfo = stack.pop();
+
+ if (!validateKeyTypeIsHashable(typeInfo)) {
+ unsupportedTypes.add(typeInfo);
+ }
+
+ if (typeInfo instanceof TupleTypeInfoBase) {
+ for (int i = 0; i < typeInfo.getArity(); i++) {
+ stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i));
+ }
+ }
+ }
+
+ if (!unsupportedTypes.isEmpty()) {
+ throw new InvalidProgramException("Type " + keyType + " cannot be used as key. Contained " +
+ "UNSUPPORTED key types: " + StringUtils.join(unsupportedTypes, ", ") + ". Look " +
+ "at the keyBy() documentation for the conditions a type has to satisfy in order to be " +
+ "eligible for a key.");
+ }
+
+ return keyType;
+ }
+
+ /**
+ * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
+ * used as a key in the {@code DataStream.keyBy()} operation.
+ *
+ * @param type The {@link TypeInformation} of the type to check.
+ * @return {@code false} if:
+ * <ol>
+ * <li>it is a POJO type but does not override the {@link #hashCode()} method and relies on
+ * the {@link Object#hashCode()} implementation.</li>
+ * <li>it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
+ * {@link ObjectArrayTypeInfo}).</li>
+ * </ol>,
+ * {@code true} otherwise.
+ */
+ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
+ try {
+ return (type instanceof PojoTypeInfo)
+ ? !type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class)
+ : !(type instanceof PrimitiveArrayTypeInfo || type instanceof BasicArrayTypeInfo || type instanceof ObjectArrayTypeInfo);
+ } catch (NoSuchMethodException ignored) {
+ // this should never happen as we are just searching for the hashCode() method.
+ }
+ return false;
+ }
+
// ------------------------------------------------------------------------
// properties
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index bd018c3..de87a66 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -163,7 +163,7 @@ public class StreamGraphGenerator {
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
- transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
+ transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
@@ -496,10 +496,10 @@ public class StreamGraphGenerator {
* Transforms a {@code OneInputTransformation}.
*
* <p>
- * This recusively transforms the inputs, creates a new {@code StreamNode} in the graph and
+ * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
* wired the inputs to this new node.
*/
- private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
+ private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
Collection<Integer> inputIds = transform(transform.getInput());
http://git-wip-us.apache.org/repos/asf/flink/blob/f15a7d2d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index a619338..b4d2421 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -19,14 +19,23 @@ package org.apache.flink.streaming.api;
import java.util.List;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
@@ -63,7 +72,11 @@ import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
+import org.hamcrest.core.StringStartsWith;
+import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import static org.junit.Assert.*;
@@ -906,6 +919,231 @@ public class DataStreamTest {
}
/////////////////////////////////////////////////////////////
+ // KeyBy testing
+ /////////////////////////////////////////////////////////////
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testPrimitiveArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>, int[]>() {
+
+ @Override
+ public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
+ int[] ks = new int[value.f0.length];
+ for (int i = 0; i < ks.length; i++) {
+ ks[i] = value.f0[i];
+ }
+ return ks;
+ }
+ };
+
+ testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testBasicArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
+
+ @Override
+ public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
+ return value.f0;
+ }
+ };
+
+ testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
+ }
+
+ @Test
+ public void testObjectArrayKeyRejection() {
+
+ KeySelector<Tuple2<Integer[], String>, Object[]> keySelector =
+ new KeySelector<Tuple2<Integer[], String>, Object[]>() {
+
+ @Override
+ public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
+ Object[] ks = new Object[value.f0.length];
+ for (int i = 0; i < ks.length; i++) {
+ ks[i] = new Object();
+ }
+ return ks;
+ }
+ };
+
+ ObjectArrayTypeInfo<Object[], Object> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(
+ Object[].class, new GenericTypeInfo<>(Object.class));
+
+ testKeyRejection(keySelector, keyTypeInfo);
+ }
+
+ private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<Integer[], String>> input = env.fromElements(
+ new Tuple2<>(new Integer[] {1, 2}, "barfoo")
+ );
+
+ Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+ expectedException.expectMessage(new StringStartsWith("Type " + expectedKeyType + " cannot be used as key."));
+
+ input.keyBy(keySelector);
+ }
+
+ //////////////// Composite Key Tests : POJOs ////////////////
+
+ @Test
+ public void testPOJOWithNestedArrayNoHashCodeKeyRejection() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<POJOWithHashCode> input = env.fromElements(
+ new POJOWithHashCode(new int[] {1, 2}));
+
+ TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple1<int[]>>(
+ PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+ expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key."));
+
+ input.keyBy("id");
+ }
+
+ @Test
+ public void testPOJOWithNestedArrayAndHashCodeWorkAround() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<POJOWithHashCode> input = env.fromElements(
+ new POJOWithHashCode(new int[] {1, 2}));
+
+ input.keyBy(new KeySelector<POJOWithHashCode, POJOWithHashCode>() {
+ @Override
+ public POJOWithHashCode getKey(POJOWithHashCode value) throws Exception {
+ return value;
+ }
+ }).addSink(new SinkFunction<POJOWithHashCode>() {
+ @Override
+ public void invoke(POJOWithHashCode value) throws Exception {
+ Assert.assertEquals(value.getId(), new int[]{1, 2});
+ }
+ });
+ }
+
+ @Test
+ public void testPOJOnoHashCodeKeyRejection() {
+
+ KeySelector<POJOWithoutHashCode, POJOWithoutHashCode> keySelector =
+ new KeySelector<POJOWithoutHashCode, POJOWithoutHashCode>() {
+ @Override
+ public POJOWithoutHashCode getKey(POJOWithoutHashCode value) throws Exception {
+ return value;
+ }
+ };
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<POJOWithoutHashCode> input = env.fromElements(
+ new POJOWithoutHashCode(new int[] {1, 2}));
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+
+ input.keyBy(keySelector);
+ }
+
+ //////////////// Composite Key Tests : Tuples ////////////////
+
+ @Test
+ public void testTupleNestedArrayKeyRejection() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<Integer[], String>> input = env.fromElements(
+ new Tuple2<>(new Integer[] {1, 2}, "test-test"));
+
+ TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple2<Integer[], String>>(
+ BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ // adjust the rule
+ expectedException.expect(InvalidProgramException.class);
+ expectedException.expectMessage(new StringStartsWith("Type " + expectedTypeInfo + " cannot be used as key."));
+
+ input.keyBy(new KeySelector<Tuple2<Integer[],String>, Tuple2<Integer[],String>>() {
+ @Override
+ public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception {
+ return value;
+ }
+ });
+ }
+
+ @Test
+ public void testPrimitiveKeyAcceptance() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setMaxParallelism(1);
+
+ DataStream<Integer> input = env.fromElements(new Integer(10000));
+
+ KeyedStream<Integer, Object> keyedStream = input.keyBy(new KeySelector<Integer, Object>() {
+ @Override
+ public Object getKey(Integer value) throws Exception {
+ return value;
+ }
+ });
+
+ keyedStream.addSink(new SinkFunction<Integer>() {
+ @Override
+ public void invoke(Integer value) throws Exception {
+ Assert.assertEquals(10000L, (long) value);
+ }
+ });
+ }
+
+ public static class POJOWithoutHashCode {
+
+ private int[] id;
+
+ public POJOWithoutHashCode() {}
+
+ public POJOWithoutHashCode(int[] id) {
+ this.id = id;
+ }
+
+ public int[] getId() {
+ return id;
+ }
+
+ public void setId(int[] id) {
+ this.id = id;
+ }
+ }
+
+ public static class POJOWithHashCode extends POJOWithoutHashCode {
+
+ public POJOWithHashCode() {
+ }
+
+ public POJOWithHashCode(int[] id) {
+ super(id);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 31;
+ for (int i : getId()) {
+ hash = 37 * hash + i;
+ }
+ return hash;
+ }
+ }
+
+ /////////////////////////////////////////////////////////////
// Utilities
/////////////////////////////////////////////////////////////