Posted to user-zh@flink.apache.org by Zhefu PENG <pe...@gmail.com> on 2020/04/02 10:33:30 UTC

[flink-1.10] Question about times() when using CEP, and about aggregation with the PyFlink Table API

Hi all,

I have recently been using and developing with Flink, and I have run into two problems:

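1. When using CEP, Pattern provides a times() method that sets how many times a pattern repeats.
When I pass a large number to times() (e.g. 100,000 or more), starting the job becomes extremely slow,
sometimes unacceptably so. For example, with times(100000) the startup took more than 15 minutes in total.

The job is started with: flink run thenameofjar.jar
Am I using the API incorrectly, or is something else going on?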
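One possible factor, stated as an assumption rather than a confirmed diagnosis: Flink CEP compiles a pattern into an NFA, and as far as I can tell the 1.10 NFACompiler unrolls times(n) into a chain of roughly n states instead of keeping a counter. If that is right, the compiled NFA (and the work to build and ship it) grows linearly with n. The toy model below is plain Python, not Flink code; the State class and unroll_times function are hypothetical names that only illustrate that linear growth.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(eq=False)
class State:
    """One NFA state in a toy model (hypothetical, not Flink's State class)."""
    name: str
    next_state: Optional["State"] = None


def unroll_times(pattern_name: str, n: int) -> List[State]:
    """Model a times(n) quantifier as a linear chain of n states.

    Assumes each repetition gets its own state; the take/ignore/proceed
    edges that real CEP states carry are omitted.
    """
    if n < 1:
        raise ValueError("times() expects a positive count")
    final = State(name="$final$")
    chain: List[State] = []
    nxt = final
    # Build from the end backwards so each new state points at its successor.
    for _ in range(n):
        nxt = State(name=pattern_name, next_state=nxt)
        chain.append(nxt)
    chain.reverse()  # chain[0] is now the start state
    return chain


if __name__ == "__main__":
    for n in (10, 1_000, 100_000):
        print(f"times({n}) -> {len(unroll_times('start', n))} states")
```

Under this assumption, the number of states (and any per-state setup cost) scales directly with the argument passed to times().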

2. When we use the PyFlink Table API, a windowed aggregation fails. The code and error log are as follows:

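# Imports (assumed; not shown in the original snippet)
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema
from pyflink.table.window import Tumble

# Environment setup
exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)

# Schema declaration (the `return` lives in a helper function; build_schema is a hypothetical name)
def build_schema():
    return Schema() \
        .field('source', DataTypes.STRING()) \
        .field('proctime', DataTypes.TIMESTAMP()).proctime()


# Windowed group-by computation (table1 is created elsewhere in the script, not shown)
table3 = table1.window(Tumble.over("1.minutes").on("proctime").alias('w')) \
    .group_by('w, source') \
    .select("source")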
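For reference, this is what the windowed query is intended to compute, sketched in plain Python without Flink: assign each record to a 1-minute tumbling window by its processing-time timestamp, then emit one row per (window, source). The record format (epoch milliseconds plus a source string) and the function name tumble_group_by are assumptions for illustration. As far as I know, Flink aligns tumbling windows to the epoch in the same way (start = ts - ts % size) for non-negative timestamps when no offset is set.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple

WINDOW_MS = 60_000  # "1.minutes" expressed in milliseconds


def tumble_group_by(
    records: Iterable[Tuple[int, str]]
) -> Dict[Tuple[int, int], List[str]]:
    """Group (timestamp_ms, source) records by 1-minute tumbling window and source.

    Returns {(window_start_ms, window_end_ms): sorted distinct sources}, i.e.
    one output row per (window, source) pair, like select("source") after
    group_by('w, source').
    """
    groups: Dict[Tuple[int, int], Set[str]] = defaultdict(set)
    for ts, source in records:
        start = ts - ts % WINDOW_MS  # epoch-aligned window start
        groups[(start, start + WINDOW_MS)].add(source)
    return {w: sorted(srcs) for w, srcs in sorted(groups.items())}


if __name__ == "__main__":
    rows = [(5_000, "a"), (30_000, "b"), (59_999, "a"), (60_000, "a")]
    print(tumble_group_by(rows))
    # {(0, 60000): ['a', 'b'], (60000, 120000): ['a']}
```

Note that the record at 60,000 ms opens a new window, because windows are half-open intervals [start, end).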

Error log:

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/python3/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/flink_test/flink_udf_kafka.py", line 65, in <module>
    .select("source")
  File "/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594pyflink.zip/pyflink/table/table.py", line 784, in select
  File "/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/tmp/pyflink/be9fe638-d5c2-4abc-86b7-274c08b4da3e/41da0979-bdf8-420b-9852-f7f1e14e8594py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o196.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
at org.apache.flink.table.operations.utils.factories.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:243)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:762)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:747)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)

Today I also saw someone ask a similar question, but I am not sure whether this error falls under the related issue:
https://issues.apache.org/jira/browse/FLINK-16160
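The exception is raised in AggregateOperationFactory.validateStreamTimeAttribute: in a streaming environment, the field passed to on() must be a time attribute (proctime or rowtime), not merely a TIMESTAMP column. If Schema.proctime() is not honoured on the connect() code path, which is what the FLINK-16160 title appears to describe, then proctime would reach the window as a plain TIMESTAMP and this check would fail. The plain-Python sketch below models only that rule; Field, its time_kind marker, ValidationError and validate_window_field are hypothetical names, not PyFlink API, and batch-mode rules are not modelled.

```python
from dataclasses import dataclass

TIME_ATTRIBUTE_KINDS = {"proctime", "rowtime"}


class ValidationError(Exception):
    """Stand-in for Flink's ValidationException in this toy model."""


@dataclass(frozen=True)
class Field:
    name: str
    data_type: str
    time_kind: str = "none"  # hypothetical marker: "proctime", "rowtime" or "none"


def validate_window_field(field: Field) -> Field:
    """Reject a streaming group window on a field that is not a time attribute."""
    if field.time_kind not in TIME_ATTRIBUTE_KINDS:
        raise ValidationError(
            "A group window expects a time attribute for grouping "
            "in a stream environment."
        )
    return field


if __name__ == "__main__":
    marked = Field("proctime", "TIMESTAMP", time_kind="proctime")
    print(validate_window_field(marked).name)  # accepted
    plain = Field("proctime", "TIMESTAMP")  # same type, but no time-attribute marker
    try:
        validate_window_field(plain)
    except ValidationError as err:
        print("rejected:", err)
```

In this model the column name and SQL type are identical in both cases; only the time-attribute marker decides whether the window is accepted.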

Any help would be appreciated. Thanks.
Best,
Zhefu