You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by chenxuying <cx...@163.com> on 2020/07/21 03:35:36 UTC
官方pyflink 例子的执行问题
官方例子:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
按照例子写了程序,也安装了pyflink
|
python -m pip install apache-flink
|
代码:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)
t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt')) \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')
t_env.execute("tutorial_job")
|
执行:
|
python test_pyflink.py
|
报错:
|
Traceback (most recent call last):
File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
return f(*a, **kw)
File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'.
at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:745)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "test_pyflink.py", line 34, in <module>
t_env.execute("tutorial_job")
File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'."
|
里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 ,
我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml
增加了taskmanager.memory.task.off-heap.size: 100m
但是还是报一样的错误
请问用python安装的flink去哪里配置属性
Re:Re: 官方pyflink 例子的执行问题
Posted by chenxuying <cx...@163.com>.
你好
明白了,感谢 , 我文档没看清楚哈
在 2020-07-21 11:44:23,"Xingbo Huang" <hx...@gmail.com> 写道:
>Hi,
>你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>True)就行,如果你用了的话,就需要配置off-heap
>memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>'80m')。你可以参考文档上的例子,以及对应的note说明[1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions
>
>Best,
>Xingbo
>
>
>chenxuying <cx...@163.com> 于2020年7月21日周二 上午11:36写道:
>
>> 官方例子:
>> https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
>> 按照例子写了程序,也安装了pyflink
>> |
>> python -m pip install apache-flink
>> |
>> 代码:
>> |
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, DataTypes
>> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
>> from pyflink.table.udf import udf
>>
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> t_env = StreamTableEnvironment.create(env)
>>
>>
>> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
>> DataTypes.BIGINT())
>>
>>
>> t_env.register_function("add", add)
>>
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
>> \
>> .with_format(OldCsv()
>> .field('a', DataTypes.BIGINT())
>> .field('b', DataTypes.BIGINT())) \
>> .with_schema(Schema()
>> .field('a', DataTypes.BIGINT())
>> .field('b', DataTypes.BIGINT())) \
>> .create_temporary_table('mySource')
>>
>>
>> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
>> \
>> .with_format(OldCsv()
>> .field('sum', DataTypes.BIGINT())) \
>> .with_schema(Schema()
>> .field('sum', DataTypes.BIGINT())) \
>> .create_temporary_table('mySink')
>>
>>
>> t_env.from_path('mySource')\
>> .select("add(a, b)") \
>> .insert_into('mySink')
>>
>>
>> t_env.execute("tutorial_job")
>> |
>>
>> 执行:
>>
>> |
>> python test_pyflink.py
>> |
>>
>> 报错:
>>
>>
>> |
>> Traceback (most recent call last):
>> File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>> line 147, in deco
>> return f(*a, **kw)
>> File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>> line 328, in get_return_value
>> format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
>> : org.apache.flink.table.api.TableException: The configured Task Off-Heap
>> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
>> The Task Off-Heap Memory can be configured using the configuration key
>> 'taskmanager.memory.task.off-heap.size'.
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
>> at
>> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at
>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
>> at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>> During handling of the above exception, another exception occurred:
>>
>>
>> Traceback (most recent call last):
>> File "test_pyflink.py", line 34, in <module>
>> t_env.execute("tutorial_job")
>> File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
>> line 1057, in execute
>> return JobExecutionResult(self._j_tenv.execute(job_name))
>> File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>> line 1286, in __call__
>> answer, self.gateway_client, self.target_id, self.name)
>> File
>> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>> line 154, in deco
>> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
>> pyflink.util.exceptions.TableException: "The configured Task Off-Heap
>> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
>> The Task Off-Heap Memory can be configured using the configuration key
>> 'taskmanager.memory.task.off-heap.size'."
>> |
>>
>>
>>
>>
>> 里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 ,
>>
>> 我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml
>>
>> 增加了taskmanager.memory.task.off-heap.size: 100m
>>
>> 但是还是报一样的错误
>>
>> 请问用python安装的flink去哪里配置属性
>>
>>
>>
Re: 官方pyflink 例子的执行问题
Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
True)就行,如果你用了的话,就需要配置off-heap
memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')。你可以参考文档上的例子,以及对应的note说明[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions
Best,
Xingbo
chenxuying <cx...@163.com> 于2020年7月21日周二 上午11:36写道:
> 官方例子:
> https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
> 按照例子写了程序,也安装了pyflink
> |
> python -m pip install apache-flink
> |
> 代码:
> |
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.descriptors import Schema, OldCsv, FileSystem
> from pyflink.table.udf import udf
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
>
>
> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()],
> DataTypes.BIGINT())
>
>
> t_env.register_function("add", add)
>
>
> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
> \
> .with_format(OldCsv()
> .field('a', DataTypes.BIGINT())
> .field('b', DataTypes.BIGINT())) \
> .with_schema(Schema()
> .field('a', DataTypes.BIGINT())
> .field('b', DataTypes.BIGINT())) \
> .create_temporary_table('mySource')
>
>
> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
> \
> .with_format(OldCsv()
> .field('sum', DataTypes.BIGINT())) \
> .with_schema(Schema()
> .field('sum', DataTypes.BIGINT())) \
> .create_temporary_table('mySink')
>
>
> t_env.from_path('mySource')\
> .select("add(a, b)") \
> .insert_into('mySink')
>
>
> t_env.execute("tutorial_job")
> |
>
> 执行:
>
> |
> python test_pyflink.py
> |
>
> 报错:
>
>
> |
> Traceback (most recent call last):
> File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 147, in deco
> return f(*a, **kw)
> File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
> line 328, in get_return_value
> format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : org.apache.flink.table.api.TableException: The configured Task Off-Heap
> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
> The Task Off-Heap Memory can be configured using the configuration key
> 'taskmanager.memory.task.off-heap.size'.
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
> at
> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> During handling of the above exception, another exception occurred:
>
>
> Traceback (most recent call last):
> File "test_pyflink.py", line 34, in <module>
> t_env.execute("tutorial_job")
> File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
> line 1057, in execute
> return JobExecutionResult(self._j_tenv.execute(job_name))
> File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
> line 1286, in __call__
> answer, self.gateway_client, self.target_id, self.name)
> File
> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
> line 154, in deco
> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
> pyflink.util.exceptions.TableException: "The configured Task Off-Heap
> Memory 0 bytes is less than the least required Python worker Memory 79 mb.
> The Task Off-Heap Memory can be configured using the configuration key
> 'taskmanager.memory.task.off-heap.size'."
> |
>
>
>
>
> 里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 ,
>
> 我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml
>
> 增加了taskmanager.memory.task.off-heap.size: 100m
>
> 但是还是报一样的错误
>
> 请问用python安装的flink去哪里配置属性
>
>
>