You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/06/01 11:19:04 UTC

[jira] [Commented] (FLINK-6783) Wrongly extracted TypeInformations for WindowedStream::aggregate

    [ https://issues.apache.org/jira/browse/FLINK-6783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16032806#comment-16032806 ] 

ASF GitHub Bot commented on FLINK-6783:
---------------------------------------

GitHub user dawidwys opened a pull request:

    https://github.com/apache/flink/pull/4039

    [FLINK-6783] Changed passing index of type argument while extracting …

    …return type.
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink flink-6783

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4039.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4039
    
----
commit cb20120ccbaac1ebf3c2f8aafd4d3e82e5fe9fc1
Author: Dawid Wysakowicz <da...@getindata.com>
Date:   2017-06-01T11:17:25Z

    [FLINK-6783] Changed passing index of type argument while extracting return type.

----


> Wrongly extracted TypeInformations for WindowedStream::aggregate
> ----------------------------------------------------------------
>
>                 Key: FLINK-6783
>                 URL: https://issues.apache.org/jira/browse/FLINK-6783
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataStream API
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Blocker
>
> The following test fails because of wrongly acquired output type for {{AggregateFunction}}:
> {code}
> @Test
> public void testAggregateWithWindowFunctionDifferentResultTypes() throws Exception {
> 	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 	DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
> 	DataStream<Tuple3<String, String, Integer>> window = source
> 		.keyBy(new TupleKeySelector())
> 		.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
> 		.aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
> 			@Override
> 			public Tuple2<String, Integer> createAccumulator() {
> 				return Tuple2.of("", 0);
> 			}
> 			@Override
> 			public void add(
> 				Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
> 			}
> 			@Override
> 			public String getResult(Tuple2<String, Integer> accumulator) {
> 				return accumulator.f0;
> 			}
> 			@Override
> 			public Tuple2<String, Integer> merge(
> 				Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
> 				return Tuple2.of("", 0);
> 			}
> 		}, new WindowFunction<String, Tuple3<String, String, Integer>, String, TimeWindow>() {
> 			@Override
> 			public void apply(
> 				String s,
> 				TimeWindow window,
> 				Iterable<String> input,
> 				Collector<Tuple3<String, String, Integer>> out) throws Exception {
> 				out.collect(Tuple3.of("", "", 0));
> 			}
> 		});
> 	OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
> 		(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
> 	OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
> 	Assert.assertTrue(operator instanceof WindowOperator);
> 	WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
> 		(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
> 	Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
> 	Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
> 	Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
> 	processElementAndEnsureOutput(
> 		operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
> }
> {code}
> The test results in 
> {code}
> org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple type expected.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1157)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:451)
> 	at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:855)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowTranslationTest.testAggregateWithWindowFunctionDifferentResultTypes(WindowTranslationTest.java:702)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> 	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> 	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple type expected.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1204)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1154)
> 	... 25 more
> {code}
> I tracked down the issue and the reason is wrongly handled {{outputTypeArgumentIndex}} in {{TypeExtractor::getUnaryOperatorReturnType}}.
> My proposition is to remove/deprecate version of {{TypeExtractor::getUnaryOperatorReturnType}} that accepts {{hasIterable}} and {{hasCollector}} as parameters and move all invocations to explicitly passing index of output type (after fixing {{outputTypeArgumentIndex}} handling in line {{TypeExtractor:455}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)