You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Hongbo (Jira)" <ji...@apache.org> on 2021/12/01 17:31:00 UTC

[jira] [Created] (FLINK-25137) Cannot do window aggregation after a regular join: Window can only be defined on a time attribute column

Hongbo created FLINK-25137:
------------------------------

             Summary: Cannot do window aggregation after a regular join: Window can only be defined on a time attribute column
                 Key: FLINK-25137
                 URL: https://issues.apache.org/jira/browse/FLINK-25137
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.14.0
            Reporter: Hongbo


I have a stream joining with a bounded lookup table. Then I want to do a window aggregation on this stream.

The code looks like this:

 
{code:java}
var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" +
" join lookup on test.v = lookup.a");
tableEnv.createTemporaryView("joined_table", joined);

var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" +
" FROM TABLE(\n" +
" HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' minutes))\n" +
" GROUP BY window_start, window_end, c");
{code}
 

However, it failed with an exception:
{code:java}
Window can only be defined on a time attribute column, but is type of TIMESTAMP(3)
org.apache.flink.table.api.ValidationException: Window can only be defined on a time attribute column, but is type of TIMESTAMP(3)
    at org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193)
    at GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown Source)
    at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown Source)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75)
    at GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown Source)
    at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown Source)
    at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
    at org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433)
    at org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416)
    at org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402)
    at org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167)
    at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217)
    at org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:81)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:300)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:705)
    at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:606)
    at com.microstrategy.realtime.FlinkTest.testAggregationAfterJoin(FlinkTest.java:67)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:99)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:79)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:75)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:133)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
    at java.base/java.lang.Thread.run(Thread.java:829 {code}
Is there any limitation to do this?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)