Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/06/05 08:46:08 UTC
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/6120
[FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction
## What is the purpose of the change
This PR merges the `flink-java8` module into `flink-core`/`flink-runtime`. It also fixes lambda extraction issues that occurred outside of `flink-java8`, which used a different compiler.
Lambdas are still difficult to use in Flink. I'm working on an experimental solution that I will publish soon.
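To make the lambda problem concrete, here is a minimal, JDK-only sketch (not Flink code; `LambdaErasureDemo` and its methods are hypothetical names). With javac, an anonymous class records its generic interface `Function<String, Integer>` in the class file, whereas the class generated at runtime for a lambda implements only the raw `Function` interface, so reflection-based type extraction has nothing to read.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

final class LambdaErasureDemo {

    // Anonymous class: the compiler records Function<String, Integer> in the
    // class file's generic signature, so reflection can recover both type arguments.
    static Function<String, Integer> anonymous() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Lambda: the runtime-generated class implements the raw Function interface,
    // so the type arguments are not available through reflection.
    static Function<String, Integer> lambda() {
        return s -> s.length();
    }

    // Returns the generic form of the first interface implemented by fn's class.
    static Type firstGenericInterface(Object fn) {
        return fn.getClass().getGenericInterfaces()[0];
    }

    public static void main(String[] args) {
        System.out.println(firstGenericInterface(anonymous()));
        System.out.println(firstGenericInterface(lambda()));
    }
}
```

On a standard JDK, the first line is expected to show a parameterized `Function` type and the second only the raw interface. This loss of type arguments is why extraction for lambdas can end in `MissingTypeInfo` unless a type hint is given.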
## Brief change log
- Integration of some lambda functions into existing examples/tests
- Removal of error-prone lambda input validation
- Removal of `flink-java8`
- Code cleanup (removal of warnings in several places)
## Verifying this change
This change is already covered by existing tests, such as `LambdaExtractionTest`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink FLINK-7251_1
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6120.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6120
----
commit 0710fd42c321964dbf523878f59f82624baee597
Author: Timo Walther <tw...@...>
Date: 2018-06-04T10:49:43Z
[FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction
----
---
[GitHub] flink issue #6120: [FLINK-7251] [types] Remove the flink-java8 module and im...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6120
@zentol are there other locations where I have to remove the `flink-java8` module?
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r201655466
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
final int baseParametersLen = sam.getParameterTypes().length;
- // executable references "this" implicitly
- if (paramLen <= 0) {
--- End diff --
Why did you remove the check of the input type information?
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203002928
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java ---
@@ -102,15 +104,62 @@ public void TestAnonymousInnerClassTrick4() throws Exception {
jfc.addClass(NestedAnonymousInnerClass.class)
.createJarFile();
- Set<String> ans = new HashSet<String>();
+ Set<String> ans = new HashSet<>();
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1$1.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$A.class");
Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
- out.delete();
+ Assert.assertTrue(out.delete());
+ }
+
+ @Ignore // this is currently not supported (see FLINK-9520)
+ @Test
+ public void testFilterWithMethodReference() throws Exception {
--- End diff --
Did these also not work when they were still in the java8 module?
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203003680
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -237,19 +248,81 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
}
/**
- * Gets the type of the key by which the stream is partitioned.
- * @return The type of the key by which the stream is partitioned.
+ * Tries to fill in the type information. Type information can be filled in
+ * later when the program uses a type hint. This method checks whether the
+ * type information has ever been accessed before and does not allow
+ * modifications if the type was accessed already. This ensures consistency
+ * by making sure different parts of the operation do not assume different
+ * type information.
+ *
+ * @param keyType The type information to fill in.
+ *
+ * @throws IllegalStateException Thrown, if the type information has been accessed before.
+ */
+ private void setKeyType(TypeInformation<KEY> keyType) {
+ if (typeUsed) {
+ throw new IllegalStateException(
+ "TypeInformation cannot be filled in for the type after it has been used. "
+ + "Please make sure that the type info hints are the first call after "
+ + "the keyBy() function before any other access.");
+ }
+ this.keyType = keyType;
+ }
+
+ /**
+ * Returns the key type of this {@code KeyedStream} as a {@link TypeInformation}. Once
+ * this method has been called, the key type can no longer be changed using {@link #returns(TypeInformation)}.
+ *
+ * @return The key type of this {@code KeyedStream}
*/
@Internal
public TypeInformation<KEY> getKeyType() {
- return keyType;
+ if (keyType instanceof MissingTypeInfo) {
+ MissingTypeInfo typeInfo = (MissingTypeInfo) this.keyType;
+ throw new InvalidTypesException(
+ "The key type of key selector '"
+ + typeInfo.getFunctionName()
+ + "' could not be determined automatically, due to type erasure. "
+ + "You can give type information hints by using the returns(...) "
+ + "method on the result of the transformation call, or by letting "
+ + "your selector implement the 'ResultTypeQueryable' "
+ + "interface.", typeInfo.getTypeException());
+ }
+
+ // perform the validation when the type is used for the first time
+ if (!typeUsed) {
+ typeUsed = true;
+ validateKeyType(keyType);
+ }
+
+ return this.keyType;
}
@Override
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
}
+ // ------------------------------------------------------------------------
+ // Type hinting
+ // ------------------------------------------------------------------------
+
+ /**
+ * Adds a type information hint about the key type of a key selector. This method
+ * can be used in cases where Flink cannot determine automatically what the produced
+ * type of a key selector is. That can be the case if the selector uses generic type variables
+ * in the return type that cannot be inferred from the input type.
+ *
+ * @param typeInfo type information as a key type hint
+ * @return This operator with a given key type hint.
+ */
+ public KeyedStream<T, KEY> returns(TypeInformation<KEY> typeInfo) {
--- End diff --
Should probably make this `@PublicEvolving` for now.
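The `setKeyType`/`getKeyType` pair in this diff enforces a simple rule: a type hint is accepted only until the key type is read for the first time, and validation runs once on that first read. A minimal, Flink-independent sketch of that rule follows; `HintableType`, `hint`, and `get` are hypothetical names, and `null` stands in for `MissingTypeInfo`.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical class illustrating the "hint until first use" rule from the diff.
// A null value plays the role of Flink's MissingTypeInfo placeholder.
final class HintableType<T> {

    private T value;
    private boolean used;
    private final Consumer<T> validator;

    HintableType(T initialOrNull, Consumer<T> validator) {
        this.value = initialOrNull;
        this.validator = Objects.requireNonNull(validator);
    }

    // Like setKeyType(...): a hint is only accepted before the first successful read.
    void hint(T newValue) {
        if (used) {
            throw new IllegalStateException(
                "Type information cannot be filled in after it has been used.");
        }
        this.value = Objects.requireNonNull(newValue);
    }

    // Like getKeyType(): fail while the type is still missing (without freezing it),
    // otherwise validate once on first access and freeze the value.
    T get() {
        if (value == null) {
            throw new IllegalStateException(
                "Type could not be determined automatically; provide a hint first.");
        }
        if (!used) {
            used = true;
            validator.accept(value);
        }
        return value;
    }
}
```

As in the diff, a failed read of a missing type does not freeze the value, so a later hint can still fill it in; after a successful read, further hints are rejected.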
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r201657797
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -695,29 +678,14 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
Preconditions.checkArgument(
lambdaOutputTypeArgumentIndices != null,
"Indices for output type arguments within lambda not provided");
- // check for lambda type erasure
- validateLambdaGenericParameters(exec);
final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);
final int baseParametersLen = sam.getParameterTypes().length;
// parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function
final int paramLen = exec.getParameterTypes().length;
- final Type input1 = TypeExtractionUtils.extractTypeFromLambda(
--- End diff --
Same comment as above. Why not check the input types? If they are not checked, the lambda input indices are not necessary.
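The comment in the diff says parameters must be accessed from behind because extra parameters are added when a lambda uses local variables. The JDK-only sketch below (hypothetical names; it relies on javac naming the synthetic body `lambda$makeLambda$<n>`, which is an implementation detail, not a language guarantee) shows the effect: the captured `offset` comes first, and only the trailing parameters line up with the single abstract method (SAM).

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.function.Function;

final class LambdaParamsFromBehind {

    // A capturing lambda: javac compiles the body to a private synthetic method whose
    // leading parameters are the captured values ("offset"), followed by the
    // parameters of the functional interface ("s").
    static Function<String, Integer> makeLambda(int offset) {
        return s -> s.length() + offset;
    }

    // Simplified SAM lookup: the single abstract method of a functional interface.
    // (Ignores abstract redeclarations of Object methods, e.g. Comparator.equals.)
    static Method singleAbstractMethod(Class<?> iface) {
        Method sam = null;
        for (Method m : iface.getMethods()) {
            if (Modifier.isAbstract(m.getModifiers())) {
                if (sam != null) {
                    throw new IllegalArgumentException("more than one abstract method in " + iface);
                }
                sam = m;
            }
        }
        if (sam == null) {
            throw new IllegalArgumentException("no abstract method in " + iface);
        }
        return sam;
    }

    // Assumption: javac names the synthetic body "lambda$makeLambda$<n>".
    static Method findLambdaBody() {
        for (Method m : LambdaParamsFromBehind.class.getDeclaredMethods()) {
            if (m.isSynthetic() && m.getName().startsWith("lambda$makeLambda$")) {
                return m;
            }
        }
        throw new IllegalStateException("synthetic lambda method not found");
    }

    // Keeps only the trailing parameters that correspond to the SAM's parameters.
    static Class<?>[] samParameterTypes(Method lambdaBody, Method sam) {
        Class<?>[] all = lambdaBody.getParameterTypes();
        int samLen = sam.getParameterCount();
        return Arrays.copyOfRange(all, all.length - samLen, all.length);
    }

    public static void main(String[] args) {
        Method body = findLambdaBody();
        Method sam = singleAbstractMethod(Function.class);
        System.out.println("lambda body parameters: " + Arrays.toString(body.getParameterTypes()));
        System.out.println("SAM parameters: " + Arrays.toString(samParameterTypes(body, sam)));
    }
}
```

Counting from the end works regardless of how many values the lambda captures, which is why the SAM parameter count (`baseParametersLen` in the diff) is the reference point.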
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r201655890
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
final int baseParametersLen = sam.getParameterTypes().length;
- // executable references "this" implicitly
- if (paramLen <= 0) {
- // executable declaring class can also be a super class of the input type
- // we only validate if the executable exists in input type
- validateInputContainsExecutable(exec, inType);
- }
- else {
- final Type input = TypeExtractionUtils.extractTypeFromLambda(
- exec,
- lambdaInputTypeArgumentIndices,
--- End diff --
If you remove the input type check, the parameter `lambdaInputTypeArgumentIndices` is no longer necessary.
---
[GitHub] flink issue #6120: [FLINK-7251] [types] Remove the flink-java8 module and im...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6120
Thank you @aljoscha. I will add a test for `flatMap` and for the bug that I just noticed, and then merge this.
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203083498
--- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java ---
@@ -271,19 +214,20 @@ public void testKeySelectorLambda() {
public void testLambdaTypeErasure() {
MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
- Assert.assertTrue(ti instanceof MissingTypeInfo);
+ assertTrue(ti instanceof MissingTypeInfo);
}
@Test
public void testPartitionerLambda() {
Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
- final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
-
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+ final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner, null, true);
+ if (!(ti instanceof MissingTypeInfo)) {
--- End diff --
If a compiler emits generic type information for lambdas, this branch is taken. Otherwise, testing for `MissingTypeInfo` is correct.
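The partitioner test therefore has to accept two outcomes depending on the compiler. A JDK-only sketch of such a compiler-tolerant check follows (hypothetical helper names; `java.util.function.BiFunction` stands in for Flink's `Partitioner`): exact types are asserted only when the compiler kept them, and missing type information is accepted as the fallback.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

final class CompilerTolerantCheck {

    // Type arguments of the first generic interface of fn's class, if the compiler
    // kept them; empty otherwise (the usual outcome for a javac-compiled lambda).
    static Optional<List<Type>> declaredTypeArguments(Object fn) {
        Type[] interfaces = fn.getClass().getGenericInterfaces();
        if (interfaces.length > 0 && interfaces[0] instanceof ParameterizedType) {
            Type[] typeArgs = ((ParameterizedType) interfaces[0]).getActualTypeArguments();
            return Optional.of(Arrays.asList(typeArgs));
        }
        return Optional.empty();
    }

    // Checks exact types only when they are available; missing type information
    // is accepted, like the MissingTypeInfo branch of the test.
    static boolean typesAreAcceptable(Object fn, Type... expected) {
        return declaredTypeArguments(fn)
                .map(actual -> actual.equals(Arrays.asList(expected)))
                .orElse(true);
    }

    public static void main(String[] args) {
        BiFunction<String, Integer, Integer> partitionLike =
                (key, numPartitions) -> key.length() % numPartitions;
        System.out.println("type arguments kept: "
                + declaredTypeArguments(partitionLike).isPresent());
        System.out.println("acceptable: "
                + typesAreAcceptable(partitionLike, String.class, Integer.class, Integer.class));
    }
}
```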
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203001023
--- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java ---
@@ -50,12 +41,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
/**
- * Tests the type extractor for lambda functions.
+ * Tests the type extractor for lambda functions. Many tests only work if the compiler supports
--- End diff --
Some tests, like `flatMapLambda()`, are not there anymore. I assume that is because we don't use Tycho anymore, but do these tests now reside somewhere else? If not, we should probably have a new module `flink-tycho-lambda-tests` where we test the things that only work with Tycho. Otherwise we lose that coverage.
---
[GitHub] flink issue #6120: [FLINK-7251] [types] Remove the flink-java8 module and im...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6120
@StephanEwen could you take a look?
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r202553410
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
final int baseParametersLen = sam.getParameterTypes().length;
- // executable references "this" implicitly
- if (paramLen <= 0) {
- // executable declaring class can also be a super class of the input type
- // we only validate if the executable exists in input type
- validateInputContainsExecutable(exec, inType);
- }
- else {
- final Type input = TypeExtractionUtils.extractTypeFromLambda(
- exec,
- lambdaInputTypeArgumentIndices,
--- End diff --
Good point. I will remove it.
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203021961
--- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java ---
@@ -50,12 +41,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
/**
- * Tests the type extractor for lambda functions.
+ * Tests the type extractor for lambda functions. Many tests only work if the compiler supports
--- End diff --
A flat map cannot work by definition, because its output type only appears as a generic type argument (of the `Collector`). I will add some tests for `returns(...)`.
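To illustrate the flat map case: for `FlatMapFunction<T, O>`, the output type `O` appears only as the type argument of the `Collector<O>` parameter, and javac does not record that type argument on the method it generates for a lambda. The JDK-only sketch below uses a `List<Integer>` as a stand-in collector (hypothetical names; it relies on javac's `lambda$<method>$<n>` naming, an implementation detail).

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.function.BiConsumer;

final class CollectorErasureDemo {

    // Stand-in for a flatMap-style function: the output type (Integer) only appears
    // as the type argument of the "collector" parameter.
    static BiConsumer<String, List<Integer>> splitLengths() {
        return (line, out) -> {
            for (String word : line.split(" ")) {
                out.add(word.length());
            }
        };
    }

    // Returns the generic type recorded for the lambda's last (collector) parameter.
    // Assumption: javac names the synthetic body "lambda$splitLengths$<n>".
    static Type collectorParameterType() {
        for (Method m : CollectorErasureDemo.class.getDeclaredMethods()) {
            if (m.isSynthetic() && m.getName().startsWith("lambda$splitLengths$")) {
                Type[] params = m.getGenericParameterTypes();
                return params[params.length - 1];
            }
        }
        throw new IllegalStateException("synthetic lambda method not found");
    }

    public static void main(String[] args) {
        Type t = collectorParameterType();
        System.out.println(t + ", parameterized: " + (t instanceof ParameterizedType));
    }
}
```

With javac, the reported type is expected to be the raw `java.util.List`, so the element type (the flat map's output) cannot be recovered; compilers that emit generic signatures for lambdas, such as the Eclipse JDT compiler mentioned in this thread, may behave differently.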
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203040441
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java ---
@@ -102,15 +104,62 @@ public void TestAnonymousInnerClassTrick4() throws Exception {
jfc.addClass(NestedAnonymousInnerClass.class)
.createJarFile();
- Set<String> ans = new HashSet<String>();
+ Set<String> ans = new HashSet<>();
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1$1.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$A.class");
Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
- out.delete();
+ Assert.assertTrue(out.delete());
+ }
+
+ @Ignore // this is currently not supported (see FLINK-9520)
+ @Test
+ public void testFilterWithMethodReference() throws Exception {
--- End diff --
No, this has never worked. I just noticed this bug when I looked at the test. What is this `JarFileCreator` actually used for?
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203060212
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java ---
@@ -102,15 +104,62 @@ public void TestAnonymousInnerClassTrick4() throws Exception {
jfc.addClass(NestedAnonymousInnerClass.class)
.createJarFile();
- Set<String> ans = new HashSet<String>();
+ Set<String> ans = new HashSet<>();
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1$1.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$1.class");
ans.add("org/apache/flink/runtime/util/jartestprogram/NestedAnonymousInnerClass$A.class");
Assert.assertTrue("Jar file for Anonymous Inner Class is not correct", validate(ans, out));
- out.delete();
+ Assert.assertTrue(out.delete());
+ }
+
+ @Ignore // this is currently not supported (see FLINK-9520)
+ @Test
+ public void testFilterWithMethodReference() throws Exception {
--- End diff --
It was a util for programmatically creating Jar files. I think it's only used in tests right now, for example `ClassLoaderUtilsTest`.
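The core of programmatically creating a JAR can be sketched with the JDK's `java.util.jar` API. This is a hypothetical `MiniJarCreator`, not Flink's `JarFileCreator`: unlike the tool exercised in the test above, which also picks up nested and anonymous classes such as `NestedAnonymousInnerClass$1$1`, this sketch copies only the classes it is given.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.Set;
import java.util.TreeSet;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

final class MiniJarCreator {

    // Copies the .class file of each given class into a new JAR at 'jar'.
    // Nested and anonymous classes are NOT discovered; pass them explicitly.
    static void writeJar(Path jar, Class<?>... classes) throws IOException {
        try (OutputStream fileOut = Files.newOutputStream(jar);
             JarOutputStream jarOut = new JarOutputStream(fileOut)) {
            byte[] buffer = new byte[8192];
            for (Class<?> c : classes) {
                // JAR entry names use '/' separators, e.g. "org/example/Foo$1.class".
                String entryName = c.getName().replace('.', '/') + ".class";
                try (InputStream in = c.getResourceAsStream("/" + entryName)) {
                    if (in == null) {
                        throw new IOException("class file not found: " + entryName);
                    }
                    jarOut.putNextEntry(new JarEntry(entryName));
                    int n;
                    while ((n = in.read(buffer)) != -1) {
                        jarOut.write(buffer, 0, n);
                    }
                    jarOut.closeEntry();
                }
            }
        }
    }

    // Lists the entry names of a JAR, sorted, for comparison with an expected set.
    static Set<String> entryNames(Path jar) throws IOException {
        Set<String> names = new TreeSet<>();
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                names.add(entries.nextElement().getName());
            }
        }
        return names;
    }
}
```

A test in the spirit of the one in the diff would write the JAR, compare `entryNames(...)` with the expected set of entries, and then assert that the temporary file was deleted.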
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203001399
--- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/LambdaExtractionTest.java ---
@@ -271,19 +214,20 @@ public void testKeySelectorLambda() {
public void testLambdaTypeErasure() {
MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
- Assert.assertTrue(ti instanceof MissingTypeInfo);
+ assertTrue(ti instanceof MissingTypeInfo);
}
@Test
public void testPartitionerLambda() {
Partitioner<Tuple2<Integer, String>> partitioner = (key, numPartitions) -> key.f1.length() % numPartitions;
- final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner);
-
- Assert.assertTrue(ti.isTupleType());
- Assert.assertEquals(2, ti.getArity());
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(0), BasicTypeInfo.INT_TYPE_INFO);
- Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
+ final TypeInformation<?> ti = TypeExtractor.getPartitionerTypes(partitioner, null, true);
+ if (!(ti instanceof MissingTypeInfo)) {
--- End diff --
Why did you need to add this?
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6120
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r202552984
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---
@@ -584,21 +581,6 @@ private static void registerFactory(Type t, Class<? extends TypeInfoFactory> fac
// number of parameters the SAM of implemented interface has; the parameter indexing applies to this range
final int baseParametersLen = sam.getParameterTypes().length;
- // executable references "this" implicitly
- if (paramLen <= 0) {
--- End diff --
The input validation caused more errors than it solved, especially with generic types. For lambdas compiled with the JDK compiler, this validation is limited anyway.
---
[GitHub] flink issue #6120: [FLINK-7251] [types] Remove the flink-java8 module and im...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/6120
I checked the docs; they are not affected by these changes. However, we should verify that the docs about the Eclipse JDT compiler are still correct, possibly even with an end-to-end test.
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203605524
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
@@ -545,15 +543,15 @@ public IntervalJoined(
TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType(
cleanedUdf,
- ProcessJoinFunction.class, // ProcessJoinFunction<IN1, IN2, OUT>
- 0, // 0 1 2
+ ProcessJoinFunction.class,
+ 0,
1,
2,
- TypeExtractor.NO_INDEX, // output arg indices
- left.getType(), // input 1 type information
- right.getType(), // input 2 type information
- INTERVAL_JOIN_FUNC_NAME ,
- false
+ TypeExtractor.NO_INDEX,
+ left.getType(),
+ right.getType(),
+ Utils.getCallLocationName(),
+ true
--- End diff --
Just saw that this should be false. Will correct that.
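The integers `0, 1, 2` passed to `getBinaryOperatorReturnType` are positions of the type parameters of `ProcessJoinFunction<IN1, IN2, OUT>` (the removed `// 0 1 2` comment lined them up). A simplified JDK-only sketch of index-based lookup, using `BiFunction<IN1, IN2, OUT>` and hypothetical names, shows what those indices select for an anonymous class, and that a lambda yields nothing.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.BiFunction;

final class TypeArgumentByIndex {

    // Returns the type argument at 'index' with which fn's class implements 'iface',
    // or null if the class does not declare type arguments for it (e.g. a lambda).
    // Simplified: looks only at directly implemented interfaces and does not
    // resolve type variables through superclasses.
    static Type typeArgumentAt(Object fn, Class<?> iface, int index) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType && ((ParameterizedType) t).getRawType() == iface) {
                return ((ParameterizedType) t).getActualTypeArguments()[index];
            }
        }
        return null;
    }

    // BiFunction<IN1, IN2, OUT>: IN1 is index 0, IN2 is index 1, OUT is index 2.
    static BiFunction<String, Integer, Long> anonymousJoin() {
        return new BiFunction<String, Integer, Long>() {
            @Override
            public Long apply(String left, Integer right) {
                return (long) left.length() + right;
            }
        };
    }

    public static void main(String[] args) {
        BiFunction<String, Integer, Long> f = anonymousJoin();
        for (int i = 0; i < 3; i++) {
            System.out.println(i + " -> " + typeArgumentAt(f, BiFunction.class, i));
        }
    }
}
```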
---
[GitHub] flink pull request #6120: [FLINK-7251] [types] Remove the flink-java8 module...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r201662934
--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -119,19 +120,15 @@ public boolean filter(Event value) throws Exception {
}
});
- DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
+ DataStream<String> result = CEP.pattern(input, pattern).flatSelect((p, o) -> {
--- End diff --
We should also have one test with `select` and lambda.
---
[GitHub] flink issue #6120: [FLINK-7251] [types] Remove the flink-java8 module and im...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/6120
@twalthr I think you only have to update the java8 docs: https://ci.apache.org/projects/flink/flink-docs-master/dev/java8.html
---