Posted to user@flink.apache.org by Robert Cullen <ci...@gmail.com> on 2021/03/12 16:56:17 UTC

Python StreamExecutionEnvironment from_collection Kafka example

I’ve scoured the web looking for an example of using a Kafka source for a
DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Dian Fu <di...@gmail.com>.
Great to hear!

Regards,
Dian

On Tue, Mar 23, 2021 at 12:46 AM Robert Cullen <ci...@gmail.com>
wrote:

> Dian,
>
> I discovered the issue.  When I redeploy the kubernetes cluster, completed
> jobs still remain in the queue.  The Flink REST service throws an error
> looking for the now-missing jobs, and thus subsequent submissions hang.
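>
> For anyone else hitting this: the leftover jobs can be listed (and any
> running ones cancelled) from the CLI before redeploying, e.g. (cluster-id
> and namespace as in my submission command below; <jobId> is a placeholder):
>
> ./bin/flink list --target kubernetes-session \
>     -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa
> ./bin/flink cancel --target kubernetes-session \
>     -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa <jobId>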
>
> On Mon, Mar 22, 2021 at 3:46 AM Dian Fu <di...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> Usually we should submit the job to a cluster in detached mode.
>> Otherwise, it will wait until the job finishes or fails.
>>
>> Could you add the `--detached` flag during submission and try again? Or
>> is there any specific reason to run it in attached mode?
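>>
>> With the submission command from your earlier mail (quoted below), that
>> would be:
>>
>> ./bin/flink run --detached \
>> --target kubernetes-session \
>> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
>> --pyModule anomaly_detection \
>> --pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py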
>>
>> Regards,
>> Dian
>>
>> On Fri, Mar 19, 2021 at 10:10 PM Robert Cullen <ci...@gmail.com>
>> wrote:
>>
>>> Dian,
>>> The job runs in attached mode. See the rest below.
>>>
>>> Can you supply some examples of tumbling time windows and the correct
>>> JSON formatting for writing to a kafka topic? This snippet does not write
>>> to my topic:
>>>
>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>         result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
>>>         yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
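>>>
>>> I suspect the format call itself is the immediate problem: the outer
>>> braces of the template are not escaped, `ctx.get_current_key` is never
>>> called, and the misplaced parentheses hand `format` a single argument, so
>>> the line raises before anything reaches the sink. A minimal corrected
>>> sketch (emitting a JSON string, since the producer uses
>>> SimpleStringSchema):
>>>
>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>         # build a dict and serialize it; the sink expects a plain string
>>>         result = {'msgId': ctx.get_current_key(), 'count': value[2],
>>>                   'timestamp': ctx.timestamp()}
>>>         yield json.dumps(result)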
>>>
>>> Here is the job submission command:
>>>
>>> ./bin/flink run \
>>> --target kubernetes-session \
>>> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
>>> --pyModule anomaly_detection \
>>> --pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py
>>>
>>> Here is the code I’m running:
>>>
>>> from typing import Any
>>>
>>> from pyflink.common import Duration
>>> from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
>>> from pyflink.common.typeinfo import Types
>>> from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
>>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>>> from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
>>> from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction
>>>
>>> import logging
>>> import json
>>> import sys
>>> import tad
>>>
>>> def json_to_tuple(js, fields):
>>>     return tuple([str(js.get(f, '')) for f in fields])
>>>
>>> def anomaly_detection():
>>>
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.set_parallelism(1)
>>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>
>>>     # define the schema of the message from kafka, here the data is in json format.
>>>     #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
>>>     #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
>>>     type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
>>>     json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>
>>>     logging.info("Row info: %s", json_row_schema)
>>>
>>>     # define the kafka connection properties.
>>>     kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}
>>>
>>>     # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
>>>     kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
>>>     kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)
>>>
>>>     # set the kafka source to consume data from earliest offset.
>>>     kafka_consumer.set_start_from_earliest()
>>>
>>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()
>>>
>>>     # create a DataStream from kafka consumer source
>>>     ds = env.add_source(kafka_consumer)
>>>     ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
>>>         .add_sink(kafka_producer)
>>>     #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
>>>     #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
>>>     #    .add_sink(kafka_producer)
>>>     env.execute("twitter_anomaly_detection")
>>>
>>> class MyProcessFunction(KeyedProcessFunction):
>>>
>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>         result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
>>>         yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
>>>         current_watermark = ctx.timer_service().current_watermark()
>>>         ctx.timer_service().register_event_time_timer(current_watermark)
>>>         anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)
>>>
>>>     #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>>     #    yield "On timer timestamp: " + str(timestamp)
>>>
>>> class KafkaRowTimestampAssigner(TimestampAssigner):
>>>
>>>     def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
>>>         return int(value[0])
>>>
>>> class MyMapFunction(MapFunction):
>>>
>>>     def map(self, value):
>>>         return value[1]
>>>
>>> class MyKeySelector(KeySelector):
>>>
>>>     def get_key(self, value):
>>>         return value[0]
>>>
>>> if __name__ == '__main__':
>>>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>>>
>>>     anomaly_detection()
>>>
>>>
>>> On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <di...@gmail.com> wrote:
>>>
>>>> Does the job run in detached mode or attached mode? Could you share
>>>> some code snippets and the job submission command if possible?
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> On Mar 18, 2021, at 8:17 PM, Robert Cullen <ci...@gmail.com> wrote:
>>>>
>>>> Dian,
>>>>
>>>> Thanks for your reply.  Yes, I submit the same job in kubernetes
>>>> session mode.  Sometimes the job would succeed, but successive tries would
>>>> fail: no stack trace, and the job would never return a job id.
>>>>
>>>> In this case I redeployed the cluster and the job completed ... and
>>>> multiple tries were successful.
>>>>
>>>>
>>>> On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <di...@gmail.com> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> 1) Do you mean that when you submit the same job multiple times, it
>>>>> succeeds sometimes and hangs sometimes? Or does it only hang for some
>>>>> specific job?
>>>>> 2) Which deployment mode do you use?
>>>>> 3) Is it possible to dump the stack trace? It would help us
>>>>> understand what’s happening.
>>>>>
>>>>> Thanks,
>>>>> Dian
>>>>>
>>>>> On Mar 16, 2021, at 11:51 PM, Robert Cullen <ci...@gmail.com> wrote:
>>>>>
>>>>> Thanks All,
>>>>>
>>>>> I've added python and pyflink to the TM image, which fixed the
>>>>> problem.  Now, however, submitting a python script to the cluster
>>>>> succeeds only sporadically; sometimes it completes, but most of the
>>>>> time it just hangs.  Not sure what is causing this.
>>>>>
>>>>> On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hx...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> From the error message, I think the problem is that there is no python
>>>>>> interpreter on your TaskManager machine. You need to install a python
>>>>>> 3.5+ interpreter on the TM machine, and that python environment needs to
>>>>>> have pyflink installed (pip install apache-flink). For details, you can
>>>>>> refer to the document[1].
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
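>>>>>>
>>>>>> A minimal sketch of such a TM image (assuming the official flink:1.12.0
>>>>>> base image; package and version names are illustrative):
>>>>>>
>>>>>> FROM flink:1.12.0
>>>>>> # install a python3 interpreter and expose it as `python`, the
>>>>>> # command the TaskManager tried and failed to run
>>>>>> RUN apt-get update -y && \
>>>>>>     apt-get install -y python3 python3-pip && \
>>>>>>     ln -s /usr/bin/python3 /usr/bin/python
>>>>>> # install pyflink into that interpreter
>>>>>> RUN pip3 install apache-flink==1.12.0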
>>>>>>
>>>>>> Best,
>>>>>> Xingbo
>>>>>>
>>>>>> On Tue, Mar 16, 2021 at 2:58 AM, Robert Cullen <ci...@gmail.com> wrote:
>>>>>>
>>>>>>> Okay, I added the jars and fixed that exception. However, I have a
>>>>>>> new exception that is harder to decipher:
>>>>>>>
>>>>>>> 2021-03-15 14:46:20
>>>>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>>>>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>>>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>>>>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>>>>>>>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>>>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>>>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>>>>>>>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>>>>>>>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>>>>>>>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>>>>>>>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>>>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>>>>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>>>>>>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.io.IOException: error=2, No such file or directory
>>>>>>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>>>>>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>>>>>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>>>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>>>>>>     ... 20 more
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>> are you sure the class is in the lib/ folder of all machines /
>>>>>>>> instances, and that you've restarted Flink after adding the files to lib/?
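>>>>>>>>
>>>>>>>> In a containerized setup that usually means baking the jars into the
>>>>>>>> image and restarting, along these lines (jar names are illustrative,
>>>>>>>> assuming Flink 1.12.0 / Scala 2.12; the connector also needs the kafka
>>>>>>>> client jar on the classpath):
>>>>>>>>
>>>>>>>> COPY flink-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
>>>>>>>> COPY kafka-clients-2.4.1.jar /opt/flink/lib/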
>>>>>>>>
>>>>>>>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <
>>>>>>>> cinquaterra@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Shuiqiang,
>>>>>>>>>
>>>>>>>>> I added the flink-connector-kafka_2.12 jar to the /opt/flink/lib
>>>>>>>>> directory.
>>>>>>>>>
>>>>>>>>> When submitting this job to my flink cluster I’m getting this
>>>>>>>>> stack trace at runtime:
>>>>>>>>>
>>>>>>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>>>>>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>>>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>>>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>>>> Class not resolvable through given classloader.
>>>>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>>>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>>>>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>>>>>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>>>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>>>>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>>>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>>>>>>>>     ... 9 more
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <
>>>>>>>>> acqua.csq@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Robert,
>>>>>>>>>>
>>>>>>>>>> You can refer to
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>>>>>>>> for the whole example.
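>>>>>>>>>>
>>>>>>>>>> The imports used for a job like the snippet below are roughly
>>>>>>>>>> (module paths as of release-1.12):
>>>>>>>>>>
>>>>>>>>>> from pyflink.common.serialization import JsonRowDeserializationSchema, SimpleStringSchema
>>>>>>>>>> from pyflink.common.typeinfo import Types
>>>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>>>>>>>>>> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer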
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Shuiqiang
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 13, 2021 at 4:01 AM, Robert Cullen <ci...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Shuiqiang, can you include the import statements?  Thanks.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <
>>>>>>>>>>> acqua.csq@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Robert,
>>>>>>>>>>>>
>>>>>>>>>>>> The Kafka connector has been provided in the Python DataStream
>>>>>>>>>>>> API since release-1.12.0. The documentation for it is still
>>>>>>>>>>>> lacking; we will add it soon.
>>>>>>>>>>>>
>>>>>>>>>>>> The following code shows how to use the KafkaConsumer and the
>>>>>>>>>>>> KafkaProducer:
>>>>>>>>>>>> ```
>>>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>>> env.set_parallelism(1)
>>>>>>>>>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>>>>>>>>
>>>>>>>>>>>> # define the schema of the message from kafka; here the data is
>>>>>>>>>>>> # in json format.
>>>>>>>>>>>> type_info = Types.ROW_NAMED(
>>>>>>>>>>>>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>>>>>>>>>>>>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
>>>>>>>>>>>> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>>>>>>>>>
>>>>>>>>>>>> # define the kafka connection properties.
>>>>>>>>>>>> kafka_props = {'bootstrap.servers': 'localhost:9092',
>>>>>>>>>>>>                'group.id': 'pyflink-e2e-source'}
>>>>>>>>>>>>
>>>>>>>>>>>> # create the KafkaConsumer and KafkaProducer with the specified topic
>>>>>>>>>>>> # name, serialization/deserialization schema and properties.
>>>>>>>>>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
>>>>>>>>>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
>>>>>>>>>>>>
>>>>>>>>>>>> # set the kafka source to consume data from earliest offset.
>>>>>>>>>>>> kafka_consumer.set_start_from_earliest()
>>>>>>>>>>>>
>>>>>>>>>>>> # create a DataStream from kafka consumer source
>>>>>>>>>>>> ds = env.add_source(kafka_consumer)
>>>>>>>>>>>>
>>>>>>>>>>>> result_stream = ...
>>>>>>>>>>>>
>>>>>>>>>>>> # write the result into kafka with a kafka producer sink.
>>>>>>>>>>>> result_stream.add_sink(kafka_producer)
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Shuiqiang
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 13, 2021 at 12:56 AM, Robert Cullen <ci...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I’ve scoured the web looking for an example of using a Kafka
>>>>>>>>>>>>> source for a DataStream in python. Can someone finish this example?
>>>>>>>>>>>>>
>>>>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>>>> env.set_parallelism(1)
>>>>>>>>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>>>>>>>>> ...
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Robert Cullen
>>>>>>>>>>>>> 240-475-4490
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Robert Cullen
>>>>>>>>>>> 240-475-4490
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Robert Cullen
>>>>>>>>> 240-475-4490
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Robert Cullen
>>>>>>> 240-475-4490
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Robert Cullen
>>>>> 240-475-4490
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>>
>>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Cullen <ci...@gmail.com>.
Dian,

I discovered the issue.  When I redeploy the kubernetes cluster completed
jobs still remain in the queue.  The Flink REST service will throw an error
looking for the missing jobs and thus subsequent submission hangs.

On Mon, Mar 22, 2021 at 3:46 AM Dian Fu <di...@gmail.com> wrote:

> Hi Robert,
>
> Usually we should submit the job to a cluster in detached mode. Otherwise,
> it will wait until the job finishes or fails.
>
> Could you add the `--detached` flag during submission and try again? Or is
> there any specific reason to run it in attached mode?
>
> Regards,
> Dian
>
> On Fri, Mar 19, 2021 at 10:10 PM Robert Cullen <ci...@gmail.com>
> wrote:
>
>> Dian,
>> The job runs in attached mode. See the rest below.
>>
>> Can you supply some examples of Tumbling Time Windows and the correct
>> json formatting for writing to a kafka topic. This snippet does not write
>> to my topic:
>>
>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>         result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
>>         yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
>>
>> Here is the job submission command:
>>
>> ./bin/flink run \
>> --target kubernetes-session \
>> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
>> --pyModule anomaly_detection \
>> --pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py
>>
>> Here is the code I’m running:
>>
>> from typing import Any
>>
>> from pyflink.common import Duration
>> from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
>> from pyflink.common.typeinfo import Types
>> from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
>> from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction
>>
>> import logging
>> import json
>> import sys
>> import tad
>>
>> def json_to_tuple(js, fields):
>>     return tuple([str(js.get(f, '')) for f in fields])
>>
>> def anomaly_detection():
>>
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>
>>     # define the schema of the message from kafka, here the data is in json format.
>>     #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
>>     #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
>>     type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
>>     json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>
>>     logging.info("Row info: %s", json_row_schema)
>>
>>     # define the kafka connection properties.
>>     kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}
>>
>>     # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
>>     kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
>>     kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)
>>
>>     # set the kafka source to consume data from earliest offset.
>>     kafka_consumer.set_start_from_earliest()
>>
>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()
>>
>>     # create a DataStream from kafka consumer source
>>     ds = env.add_source(kafka_consumer)
>>     ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
>>         .add_sink(kafka_producer)
>>     #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
>>     #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
>>     #    .add_sink(kafka_producer)
>>     env.execute("twitter_anomaly_detection")
>>
>> class MyProcessFunction(KeyedProcessFunction):
>>
>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>         result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
>>         yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
>>         current_watermark = ctx.timer_service().current_watermark()
>>         ctx.timer_service().register_event_time_timer(current_watermark)
>>         anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)
>>
>>     #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>     #    yield "On timer timestamp: " + str(timestamp)
>>
>> class KafkaRowTimestampAssigner(TimestampAssigner):
>>
>>     def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
>>         return int(value[0])
>>
>> class MyMapFunction(MapFunction):
>>
>>     def map(self, value):
>>         return value[1]
>>
>> class MyKeySelector(KeySelector):
>>
>>     def get_key(self, value):
>>         return value[0]
>>
>> if __name__ == '__main__':
>>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>>
>>     anomaly_detection()
>>
>>
>> On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <di...@gmail.com> wrote:
>>
>>> Does the job runs in detached mode or attached mode? Could you share
>>> some code snippets and the job submission command if possible?
>>>
>>> Regards,
>>> Dian
>>>
>>> 2021年3月18日 下午8:17,Robert Cullen <ci...@gmail.com> 写道:
>>>
>>> Dian,
>>>
>>> Thanks for your reply.  Yes, I would submit the same job in kubernetes
>>> session mode.  Sometimes the job would succeed but successive tries would
>>> fail. No stack trace, the job would never return a job id:
>>>
>>> In this case I redeployed the cluster and the job completed ... and
>>> multiple tries were successful.
>>>
>>>
>>> On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <di...@gmail.com> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> 1) Do you mean that when submitting the same job multiple times and it
>>>> succeed sometimes and hangs sometimes or it only hangs for some specific
>>>> job?
>>>> 2) Which deployment mode do you use?
>>>> 3) Is it possible to dump the stack trace? It would help us
>>>> understanding what’s happening.
>>>>
>>>> Thanks,
>>>> Dian
>>>>
>>>> 2021年3月16日 下午11:51,Robert Cullen <ci...@gmail.com> 写道:
>>>>
>>>> Thanks All,
>>>>
>>>> I've added python and pyflink to the TM image which fixed the problem.
>>>> Now however submitting a python script to the cluster successfully is
>>>> sporadic; sometimes it completes but most of the time it just hangs.  Not
>>>> sure what is causing this.
>>>>
>>>> On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hx...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> From the error message, I think the problem is no python interpreter
>>>>> on your TaskManager machine. You need to install a python 3.5+ interpreter
>>>>> on the TM machine, and this python environment needs to install pyflink
>>>>> (pip install apache-flink). For details, you can refer to the document[1].
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
>>>>>
>>>>> Best,
>>>>> Xingbo
>>>>>
>>>>> Robert Cullen <ci...@gmail.com> 于2021年3月16日周二 上午2:58写道:
>>>>>
>>>>>> Okay, I added the jars and fixed that exception. However I have a new
>>>>>> exception that is harder to decipher:
>>>>>>
>>>>>> 2021-03-15 14:46:20
>>>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>>>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>>>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>>>>>>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>>>>>>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>>>>>>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>>>>>>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>>>>>>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>>>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>>>>>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.io.IOException: error=2, No such file or directory
>>>>>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>>>>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>>>>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>>>>>     ... 20 more
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>> are you sure the class is in the lib/ folder of all machines /
>>>>>>> instances, and you've restarted Flink after adding the files to lib/ ?
>>>>>>>
>>>>>>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Shuiqiang,
>>>>>>>>
>>>>>>>> I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib
>>>>>>>> directory
>>>>>>>>
>>>>>>>> When submitting this job to my flink cluster I’m getting this stack
>>>>>>>> trace at runtime:
>>>>>>>>
>>>>>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>>>>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>>> Class not resolvable through given classloader.
>>>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>>>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>>>>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>>>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>>>>>>>     ... 9 more
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <
>>>>>>>> acqua.csq@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Robert,
>>>>>>>>>
>>>>>>>>> You can refer to
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>>>>>>> for the whole example.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Shuiqiang
>>>>>>>>>
>>>>>>>>> Robert Cullen <ci...@gmail.com> 于2021年3月13日周六 上午4:01写道:
>>>>>>>>>
>>>>>>>>>> Shuiqiang, Can you include the import statements?  thanks.
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <
>>>>>>>>>> acqua.csq@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Robert,
>>>>>>>>>>>
>>>>>>>>>>> Kafka Connector is provided in Python DataStream API since
>>>>>>>>>>> release-1.12.0. And the documentation for it is lacking, we will make it up
>>>>>>>>>>> soon.
>>>>>>>>>>>
>>>>>>>>>>> The following code shows how to apply KafkaConsumers and
>>>>>>>>>>> KafkaProducer:
>>>>>>>>>>> ```
>>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>> env.set_parallelism(1)
>>>>>>>>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>>>>>>>
>>>>>>>>>>> # define the schema of the message from kafka, here the data is
>>>>>>>>>>> in json format.
>>>>>>>>>>> type_info = Types.ROW_NAMED(['createTime', 'orderId',
>>>>>>>>>>> 'payAmount', 'payPlatform', 'provinceId'],
>>>>>>>>>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>>>>>>>>>> Types.INT()])
>>>>>>>>>>> json_row_schema =
>>>>>>>>>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>>>>>>>>
>>>>>>>>>>> # define the kafka connection properties.
>>>>>>>>>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>>>>>>>>>> 'pyflink-e2e-source'}
>>>>>>>>>>>
>>>>>>>>>>> # create the KafkaConsumer and KafkaProducer with the specified
>>>>>>>>>>> topic name, serialization/deserialization schema and properties.
>>>>>>>>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>>>>>>>>>> json_row_schema, kafka_props)
>>>>>>>>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>>>>>>>>>> SimpleStringSchema(), kafka_props)
>>>>>>>>>>>
>>>>>>>>>>> # set the kafka source to consume data from earliest offset.
>>>>>>>>>>> kafka_consumer.set_start_from_earliest()
>>>>>>>>>>>
>>>>>>>>>>> # create a DataStream from kafka consumer source
>>>>>>>>>>> ds = env.add_source(kafka_consumer)
>>>>>>>>>>>
>>>>>>>>>>> result_stream = ...
>>>>>>>>>>>
>>>>>>>>>>> # write the result into kafka by a kafka producer sink.
>>>>>>>>>>> result_stream.add_sink(kafka_producer)
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Shuiqiang
>>>>>>>>>>>
>>>>>>>>>>> Robert Cullen <ci...@gmail.com> 于2021年3月13日周六 上午12:56写道:
>>>>>>>>>>>
>>>>>>>>>>>> I’ve scoured the web looking for an example of using a Kafka
>>>>>>>>>>>> source for a DataStream in python. Can someone finish this example?
>>>>>>>>>>>>
>>>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>>> env.set_parallelism(1)
>>>>>>>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>>>>>>>> ...
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Robert Cullen
>>>>>>>>>>>> 240-475-4490
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Robert Cullen
>>>>>>>>>> 240-475-4490
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Robert Cullen
>>>>>>>> 240-475-4490
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Robert Cullen
>>>>>> 240-475-4490
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>>
>>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>>
>>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Dian Fu <di...@gmail.com>.
Hi Robert,

Usually we should submit the job to a cluster in detached mode. Otherwise,
it will wait until the job finishes or fails.

Could you add the `--detached` flag during submission and try again? Or is
there any specific reason to run it in attached mode?

Regards,
Dian

On Fri, Mar 19, 2021 at 10:10 PM Robert Cullen <ci...@gmail.com>
wrote:

> Dian,
> The job runs in attached mode. See the rest below.
>
> Can you supply some examples of Tumbling Time Windows and the correct json
> formatting for writing to a kafka topic. This snippet does not write to my
> topic:
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
>         yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
>
> Here is the job submission command:
>
> ./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
> --pyModule anomaly_detection \
> --pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py
>
> Here is the code I’m running:
>
> from typing import Any
>
> from pyflink.common import Duration
> from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
> from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction
>
> import logging
> import json
> import sys
> import tad
>
> def json_to_tuple(js, fields):
>     return tuple([str(js.get(f, '')) for f in fields])
>
> def anomaly_detection():
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>
>     # define the schema of the message from kafka, here the data is in json format.
>     #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
>     #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
>     type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
>     json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>
>     logging.info("Row info: %s", json_row_schema)
>
>     # define the kafka connection properties.
>     kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}
>
>     # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
>     kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
>     kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)
>
>     # set the kafka source to consume data from earliest offset.
>     kafka_consumer.set_start_from_earliest()
>
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()
>
>     # create a DataStream from kafka consumer source
>     ds = env.add_source(kafka_consumer)
>     ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
>         .add_sink(kafka_producer)
>     #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
>     #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
>     #    .add_sink(kafka_producer)
>     env.execute("twitter_anomaly_detection")
>
> class MyProcessFunction(KeyedProcessFunction):
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
>         yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
>         current_watermark = ctx.timer_service().current_watermark()
>         ctx.timer_service().register_event_time_timer(current_watermark)
>         anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)
>
>     #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
>     #    yield "On timer timestamp: " + str(timestamp)
>
> class KafkaRowTimestampAssigner(TimestampAssigner):
>
>     def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
>         return int(value[0])
>
> class MyMapFunction(MapFunction):
>
>     def map(self, value):
>         return value[1]
>
> class MyKeySelector(KeySelector):
>
>     def get_key(self, value):
>         return value[0]
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>
>     anomaly_detection()
>
>
> On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <di...@gmail.com> wrote:
>
>> Does the job run in detached or attached mode? Could you share some
>> code snippets and the job submission command if possible?
>>
>> Regards,
>> Dian
>>
>> On Mar 18, 2021, at 8:17 PM, Robert Cullen <ci...@gmail.com> wrote:
>>
>> Dian,
>>
>> Thanks for your reply.  Yes, I submit the same job in Kubernetes session
>> mode.  Sometimes the job succeeds, but successive tries fail. There is no
>> stack trace; the job never returns a job id:
>>
>> In this case I redeployed the cluster and the job completed ... and
>> multiple tries were successful.
>>
>>
>> On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <di...@gmail.com> wrote:
>>
>>> Hi Robert,
>>>
>>> 1) Do you mean that when you submit the same job multiple times it
>>> sometimes succeeds and sometimes hangs, or that it only hangs for some
>>> specific job?
>>> 2) Which deployment mode do you use?
>>> 3) Is it possible to dump the stack trace? It would help us understand
>>> what’s happening.
>>>
>>> Thanks,
>>> Dian
>>>
>>> On Mar 16, 2021, at 11:51 PM, Robert Cullen <ci...@gmail.com> wrote:
>>>
>>> Thanks All,
>>>
>>> I've added Python and PyFlink to the TM image, which fixed the problem.
>>> Now, however, successfully submitting a Python script to the cluster is
>>> sporadic; sometimes it completes, but most of the time it just hangs.  Not
>>> sure what is causing this.
>>>
>>> On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hx...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> From the error message, I think the problem is that there is no Python
>>>> interpreter on your TaskManager machine. You need to install a Python 3.5+
>>>> interpreter on the TM machine, and PyFlink needs to be installed into that
>>>> Python environment (pip install apache-flink). For details, you can refer
>>>> to the documentation[1].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Robert Cullen <ci...@gmail.com> wrote on Tue, Mar 16, 2021 at 2:58 AM:
>>>>
>>>>> Okay, I added the jars and fixed that exception. However, I have a new
>>>>> exception that is harder to decipher:
>>>>>
>>>>> 2021-03-15 14:46:20
>>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>>>>>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>>>>>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>>>>>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>>>>>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>>>>>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>>>>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.io.IOException: error=2, No such file or directory
>>>>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>>>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>>>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>>>>     ... 20 more
>>>>>
>>>>>
>>>>> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hey,
>>>>>> are you sure the class is in the lib/ folder of all machines /
>>>>>> instances, and you've restarted Flink after adding the files to lib/ ?
>>>>>>
>>>>>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Shuiqiang,
>>>>>>>
>>>>>>> I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib
>>>>>>> directory
>>>>>>>
>>>>>>> When submitting this job to my Flink cluster, I’m getting this stack
>>>>>>> trace at runtime:
>>>>>>>
>>>>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>>>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>> Class not resolvable through given classloader.
>>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>>>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>>>>>>     ... 9 more
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Robert,
>>>>>>>>
>>>>>>>> You can refer to
>>>>>>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>>>>>> for the whole example.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Shuiqiang
>>>>>>>>
>>>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>>>>>>>>
>>>>>>>>> Shuiqiang, can you include the import statements?  Thanks.
>>>>>>>>>
>>>>>>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <
>>>>>>>>> acqua.csq@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Robert,
>>>>>>>>>>
>>>>>>>>>> The Kafka connector has been provided in the Python DataStream API
>>>>>>>>>> since release 1.12.0. The documentation for it is still lacking; we
>>>>>>>>>> will add it soon.
>>>>>>>>>>
>>>>>>>>>> The following code shows how to apply KafkaConsumers and
>>>>>>>>>> KafkaProducer:
>>>>>>>>>> ```
>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>> env.set_parallelism(1)
>>>>>>>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>>>>>>
>>>>>>>>>> # define the schema of the message from kafka, here the data is
>>>>>>>>>> in json format.
>>>>>>>>>> type_info = Types.ROW_NAMED(['createTime', 'orderId',
>>>>>>>>>> 'payAmount', 'payPlatform', 'provinceId'],
>>>>>>>>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>>>>>>>>> Types.INT()])
>>>>>>>>>> json_row_schema =
>>>>>>>>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>>>>>>>
>>>>>>>>>> # define the kafka connection properties.
>>>>>>>>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>>>>>>>>> 'pyflink-e2e-source'}
>>>>>>>>>>
>>>>>>>>>> # create the KafkaConsumer and KafkaProducer with the specified
>>>>>>>>>> topic name, serialization/deserialization schema and properties.
>>>>>>>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>>>>>>>>> json_row_schema, kafka_props)
>>>>>>>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>>>>>>>>> SimpleStringSchema(), kafka_props)
>>>>>>>>>>
>>>>>>>>>> # set the kafka source to consume data from earliest offset.
>>>>>>>>>> kafka_consumer.set_start_from_earliest()
>>>>>>>>>>
>>>>>>>>>> # create a DataStream from kafka consumer source
>>>>>>>>>> ds = env.add_source(kafka_consumer)
>>>>>>>>>>
>>>>>>>>>> result_stream = ...
>>>>>>>>>>
>>>>>>>>>> # write the result into kafka by a kafka producer sink.
>>>>>>>>>> result_stream.add_sink(kafka_producer)
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Shuiqiang
>>>>>>>>>>
>>>>>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>>>>>>>>
>>>>>>>>>>> I’ve scoured the web looking for an example of using a Kafka
>>>>>>>>>>> source for a DataStream in python. Can someone finish this example?
>>>>>>>>>>>
>>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>> env.set_parallelism(1)
>>>>>>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>>>>>>> ...
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Robert Cullen
>>>>>>>>>>> 240-475-4490
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Robert Cullen
>>>>>>>>> 240-475-4490
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Robert Cullen
>>>>>>> 240-475-4490
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Robert Cullen
>>>>> 240-475-4490
>>>>>
>>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>>
>>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Cullen <ci...@gmail.com>.
Dian,
The job runs in attached mode. See the rest below.

Can you supply some examples of tumbling time windows and the correct JSON
formatting for writing to a Kafka topic? This snippet does not write to my
topic:

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
        yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
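
A minimal corrected sketch of the snippet above (the fix is an assumption
about the intent rather than a confirmed recipe; msgId, count,
json_to_tuple and the json import all come from the full script below):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # get_current_key is a method and must be called; the original line
        # also had its closing parentheses misplaced inside str(...).
        record = {'msgId': ctx.get_current_key(),
                  'count': value[2],
                  'timestamp': ctx.timestamp()}
        # json.dumps produces valid double-quoted JSON; the hand-built
        # single-quoted string is not valid JSON, and json_to_tuple expects
        # a dict rather than a string, so it cannot parse the result.
        yield json.dumps(record)

Since the sink uses SimpleStringSchema, the output_type of the process
operator would then be Types.STRING() rather than a tuple type.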

Here is the job submission command:

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule anomaly_detection \
--pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py
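
If attached mode turns out to be the cause of the hang discussed below, the
same submission can presumably be made non-blocking by adding the detached
flag, e.g.:

./bin/flink run --detached \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule anomaly_detection \
--pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py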

Here is the code I’m running:

from typing import Any

from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction

import logging
import json
import sys
import tad

def json_to_tuple(js, fields):
    return tuple([str(js.get(f, '')) for f in fields])

def anomaly_detection():

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # define the schema of the message from kafka, here the data is in json format.
    #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
    #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
    type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
    json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

    logging.info("Row info: %s", json_row_schema)

    # define the kafka connection properties.
    kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}

    # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
    kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
    kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)

    # set the kafka source to consume data from earliest offset.
    kafka_consumer.set_start_from_earliest()

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    # create a DataStream from kafka consumer source
    ds = env.add_source(kafka_consumer)
    ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
        .add_sink(kafka_producer)
    #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
    #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
    #    .add_sink(kafka_producer)
    env.execute("twitter_anomaly_detection")

class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
        yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark)
        anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)

    #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
    #    yield "On timer timestamp: " + str(timestamp)

class KafkaRowTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        return int(value[0])

class MyMapFunction(MapFunction):

    def map(self, value):
        return value[1]

class MyKeySelector(KeySelector):

    def get_key(self, value):
        return value[0]

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    anomaly_detection()
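
On the tumbling-window question: the PyFlink 1.12 DataStream API does not
appear to expose a window operator yet, so one option is the Table API. A
hedged sketch (the table name "events" and its event-time attribute "rowtime"
are assumptions; the table would first have to be registered, e.g. via DDL
over the Kafka topic):

    from pyflink.table import StreamTableEnvironment
    from pyflink.table.window import Tumble

    t_env = StreamTableEnvironment.create(env)
    events = t_env.from_path("events")  # hypothetical table over the Kafka topic

    # one-minute tumbling event-time windows, counting rows per msg_id
    counts = events.window(Tumble.over("1.minutes").on("rowtime").alias("w")) \
        .group_by("w, msg_id") \
        .select("msg_id, hostname.count as cnt, w.end as window_end")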


On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <di...@gmail.com> wrote:

> Does the job run in detached or attached mode? Could you share some
> code snippets and the job submission command if possible?
>
> Regards,
> Dian
>
> On Mar 18, 2021, at 8:17 PM, Robert Cullen <ci...@gmail.com> wrote:
>
> Dian,
>
> Thanks for your reply.  Yes, I submit the same job in Kubernetes session
> mode.  Sometimes the job succeeds, but successive tries fail. There is no
> stack trace; the job never returns a job id:
>
> In this case I redeployed the cluster and the job completed ... and
> multiple tries were successful.
>
>
> On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <di...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> 1) Do you mean that when you submit the same job multiple times it
>> sometimes succeeds and sometimes hangs, or that it only hangs for some
>> specific job?
>> 2) Which deployment mode do you use?
>> 3) Is it possible to dump the stack trace? It would help us understand
>> what’s happening.
>>
>> Thanks,
>> Dian
>>
>> On Mar 16, 2021, at 11:51 PM, Robert Cullen <ci...@gmail.com> wrote:
>>
>> Thanks All,
>>
>> I've added Python and PyFlink to the TM image, which fixed the problem.
>> Now, however, successfully submitting a Python script to the cluster is
>> sporadic; sometimes it completes, but most of the time it just hangs.  Not
>> sure what is causing this.
>>
>> On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hx...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> From the error message, I think the problem is that there is no Python
>>> interpreter on your TaskManager machine. You need to install a Python 3.5+
>>> interpreter on the TM machine, and PyFlink needs to be installed into that
>>> Python environment (pip install apache-flink). For details, you can refer
>>> to the documentation[1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
>>>
>>> Best,
>>> Xingbo
>>>
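
A sketch of how that installation might be baked into a custom TaskManager
image (the base image tag and PyFlink version are assumptions; the symlink
matters because the stack trace above shows Flink invoking the plain "python"
command):

    FROM flink:1.12.0-scala_2.12
    RUN apt-get update && \
        apt-get install -y python3 python3-pip && \
        ln -s /usr/bin/python3 /usr/bin/python && \
        pip3 install apache-flink==1.12.0
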
>>> Robert Cullen <ci...@gmail.com> wrote on Tue, Mar 16, 2021 at 2:58 AM:
>>>
>>>> Okay, I added the jars and fixed that exception. However, I have a new
>>>> exception that is harder to decipher:
>>>>
>>>> 2021-03-15 14:46:20
>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>>>>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>>>>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>>>>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>>>>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>>>>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>>>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.IOException: error=2, No such file or directory
>>>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>>>     ... 20 more
>>>>
>>>>
>>>> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey,
>>>>> are you sure the class is in the lib/ folder of all machines /
>>>>> instances, and you've restarted Flink after adding the files to lib/ ?
>>>>>
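
One way to guarantee that is to bake the jars into the image rather than copy
them into running pods; a sketch, with the image tag and jar versions as
assumptions (note the Kafka connector also needs kafka-clients on the
classpath, or alternatively the self-contained flink-sql-connector-kafka jar
bundles its dependencies):

    FROM flink:1.12.0-scala_2.12
    COPY flink-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
    COPY kafka-clients-2.4.1.jar /opt/flink/lib/
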
>>>>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Shuiqiang,
>>>>>>
>>>>>> I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib
>>>>>> directory
>>>>>>
>>>>>> When submitting this job to my Flink cluster, I’m getting this stack
>>>>>> trace at runtime:
>>>>>>
>>>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>> ClassLoader info: URL ClassLoader:
>>>>>> Class not resolvable through given classloader.
>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>>>>>     ... 9 more
>>>>>>
>>>>>>
>>>>>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Robert,
>>>>>>>
>>>>>>> You can refer to
>>>>>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>>>>> for the whole example.
>>>>>>>
>>>>>>> Best,
>>>>>>> Shuiqiang
>>>>>>>
>>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>>>>>>>
>>>>>>>> Shuiqiang, can you include the import statements?  Thanks.
>>>>>>>>
>>>>>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Robert,
>>>>>>>>>
>>>>>>>>> The Kafka connector has been provided in the Python DataStream API
>>>>>>>>> since release 1.12.0. The documentation for it is still lacking; we
>>>>>>>>> will add it soon.
>>>>>>>>>
>>>>>>>>> The following code shows how to apply KafkaConsumers and
>>>>>>>>> KafkaProducer:
>>>>>>>>> ```
>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>> env.set_parallelism(1)
>>>>>>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>>>>>
>>>>>>>>> # define the schema of the message from kafka, here the data is in
>>>>>>>>> json format.
>>>>>>>>> type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
>>>>>>>>> 'payPlatform', 'provinceId'],
>>>>>>>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>>>>>>>> Types.INT()])
>>>>>>>>> json_row_schema =
>>>>>>>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>>>>>>
>>>>>>>>> # define the kafka connection properties.
>>>>>>>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>>>>>>>> 'pyflink-e2e-source'}
>>>>>>>>>
>>>>>>>>> # create the KafkaConsumer and KafkaProducer with the specified
>>>>>>>>> topic name, serialization/deserialization schema and properties.
>>>>>>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>>>>>>>> json_row_schema, kafka_props)
>>>>>>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>>>>>>>> SimpleStringSchema(), kafka_props)
>>>>>>>>>
>>>>>>>>> # set the kafka source to consume data from earliest offset.
>>>>>>>>> kafka_consumer.set_start_from_earliest()
>>>>>>>>>
>>>>>>>>> # create a DataStream from kafka consumer source
>>>>>>>>> ds = env.add_source(kafka_consumer)
>>>>>>>>>
>>>>>>>>> result_stream = ...
>>>>>>>>>
>>>>>>>>> # write the result into kafka by a kafka producer sink.
>>>>>>>>> result_stream.add_sink(kafka_producer)
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Shuiqiang
>>>>>>>>>
>>>>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>>>>>>>
>>>>>>>>>> I’ve scoured the web looking for an example of using a Kafka
>>>>>>>>>> source for a DataStream in python. Can someone finish this example?
>>>>>>>>>>
>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>> env.set_parallelism(1)
>>>>>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>>>>>> ...
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Robert Cullen
>>>>>>>>>> 240-475-4490
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Robert Cullen
>>>>>>>> 240-475-4490
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Robert Cullen
>>>>>> 240-475-4490
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>
>
>

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Dian Fu <di...@gmail.com>.
Does the job run in detached or attached mode? Could you share some code snippets and the job submission command if possible?

Regards,
Dian

> On Mar 18, 2021, at 8:17 PM, Robert Cullen <ci...@gmail.com> wrote:
> 
> Dian,
> 
> Thanks for your reply.  Yes, I submit the same job in Kubernetes session mode.  Sometimes the job succeeds, but successive tries fail. There is no stack trace; the job never returns a job id:
> 
> In this case I redeployed the cluster and the job completed ... and multiple tries were successful.
> 
> 
> On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <dian0511.fu@gmail.com> wrote:
> Hi Robert,
> 
> 1) Do you mean that when you submit the same job multiple times it sometimes succeeds and sometimes hangs, or that it only hangs for some specific job?
> 2) Which deployment mode do you use? 
> 3) Is it possible to dump the stack trace? It would help us understand what’s happening.
> 
> Thanks,
> Dian
> 
>> On Mar 16, 2021, at 11:51 PM, Robert Cullen <cinquaterra@gmail.com> wrote:
>> 
>> Thanks All,
>> 
>> I've added Python and PyFlink to the TM image, which fixed the problem.  Now, however, successfully submitting a Python script to the cluster is sporadic; sometimes it completes, but most of the time it just hangs.  Not sure what is causing this.
>> 
>> On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hxbks2ks@gmail.com> wrote:
>> Hi,
>> 
>> From the error message, I think the problem is that there is no Python interpreter on your TaskManager machine. You need to install a Python 3.5+ interpreter on the TM machine, and PyFlink needs to be installed into that Python environment (pip install apache-flink). For details, you can refer to the documentation[1].
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
>> 
>> Best,
>> Xingbo
>> 
>> Robert Cullen <cinquaterra@gmail.com> wrote on Tue, Mar 16, 2021 at 2:58 AM:
>> Okay, I added the jars and fixed that exception. However, I have a new exception that is harder to decipher:
>> 
>> 2021-03-15 14:46:20
>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: error=2, No such file or directory
>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>     ... 20 more
>> 
>> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rmetzger@apache.org> wrote:
>> Hey,
>> are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?
>> 
>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <cinquaterra@gmail.com> wrote:
>> Shuiqiang,
>> 
>> I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory
>> 
>> When submitting this job to my Flink cluster, I’m getting this stack trace at runtime:
>> 
>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>> ClassLoader info: URL ClassLoader:
>> Class not resolvable through given classloader.
>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:348)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>     ... 9 more
>> 
>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <acqua.csq@gmail.com> wrote:
>> Hi Robert,
>> 
>> You can refer to https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py for the whole example.
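>> 
>> For reference, the imports that the snippet quoted below relies on are most likely these in PyFlink 1.12 (worth double-checking against your installed version):
>> 
>> ```
>> from pyflink.common.serialization import JsonRowDeserializationSchema, SimpleStringSchema
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
>> ```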
>> 
>> Best,
>> Shuiqiang
>> 
>> Robert Cullen <cinquaterra@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>> Shuiqiang, Can you include the import statements?  thanks.
>> 
>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <acqua.csq@gmail.com> wrote:
>> Hi Robert,
>> 
>> The Kafka connector has been provided in the Python DataStream API since release 1.12.0. The documentation for it is still lacking; we will add it soon.
>> 
>> The following code shows how to apply KafkaConsumers and KafkaProducer:
>> ```
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> 
>> # define the schema of the message from kafka, here the data is in json format.
>> type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>>                             [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
>> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>> 
>> # define the kafka connection properties.
>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}
>> 
>> # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
>> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
>> 
>> # set the kafka source to consume data from earliest offset.
>> kafka_consumer.set_start_from_earliest()
>> 
>> # create a DataStream from kafka consumer source
>> ds = env.add_source(kafka_consumer)  
>> 
>> result_stream = ...
>> 
>> # write the result into kafka by a kafka producer sink.
>> result_stream.add_sink(kafka_producer)
>> ```
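>> 
>> One note on the `result_stream = ...` placeholder above: the producer is constructed with SimpleStringSchema, so the stream handed to the sink has to carry strings. A minimal sketch (the lambda is only an illustrative stand-in for real processing logic):
>> 
>> ```
>> # convert each Row to a plain string so SimpleStringSchema can serialize it
>> result_stream = ds.map(lambda row: str(row), output_type=Types.STRING())
>> ```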
>> 
>> Best,
>> Shuiqiang
>> 
>> Robert Cullen <cinquaterra@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>> I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?
>> 
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> ds = env.from_collection( KAFKA_SOURCE )
>> ...
>> -- 
>> Robert Cullen
>> 240-475-4490
> 
> 
> 
> -- 
> Robert Cullen
> 240-475-4490


Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Cullen <ci...@gmail.com>.
Dian,

Thanks for your reply.  Yes, I would submit the same job in Kubernetes
session mode.  Sometimes the job would succeed, but successive tries would
fail: no stack trace, and the job would never return a job ID.

In this case I redeployed the cluster and the job completed ... and
multiple subsequent tries were successful.


On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <di...@gmail.com> wrote:

> Hi Robert,
>
> 1) Do you mean that when you submit the same job multiple times, it
> sometimes succeeds and sometimes hangs, or that it only hangs for some
> specific job?
> 2) Which deployment mode do you use?
> 3) Is it possible to dump the stack trace? It would help us understand
> what’s happening.
>
> Thanks,
> Dian
>
> On Mar 16, 2021, at 11:51 PM, Robert Cullen <ci...@gmail.com> wrote:
>
> Thanks All,
>
> I've added Python and PyFlink to the TM image, which fixed the problem.
> Now, however, successfully submitting a Python script to the cluster is
> sporadic: sometimes it completes, but most of the time it just hangs. Not
> sure what is causing this.
>
> On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hx...@gmail.com> wrote:
>
>> Hi,
>>
>> From the error message, I think the problem is that there is no Python
>> interpreter on your TaskManager machine. You need to install a Python 3.5+
>> interpreter on the TM machine, and PyFlink needs to be installed in that
>> Python environment (pip install apache-flink). For details, you can refer to the document[1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
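>> 
>> If the interpreter ends up at a non-standard location in the TM image, the job can also point at it explicitly. A sketch, assuming set_python_executable is available in your PyFlink release (it should be since 1.12); the path below is hypothetical:
>> 
>> ```
>> env = StreamExecutionEnvironment.get_execution_environment()
>> # hypothetical path: an interpreter on the TaskManager that has
>> # apache-flink installed
>> env.set_python_executable("/usr/bin/python3")
>> ```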
>>
>> Best,
>> Xingbo
>>
>> Robert Cullen <ci...@gmail.com> wrote on Tue, Mar 16, 2021 at 2:58 AM:
>>
>>> Okay, I added the jars and fixed that exception. However, I have a new
>>> exception that is harder to decipher:
>>>
>>> 2021-03-15 14:46:20
>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>>>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>>>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>>>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>>>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>>>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: error=2, No such file or directory
>>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>>     ... 20 more
>>>
>>>
>>> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org>
>>> wrote:
>>>
>>>> Hey,
>>>> are you sure the class is in the lib/ folder of all machines /
>>>> instances, and you've restarted Flink after adding the files to lib/ ?
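>>>>
>>>> If keeping lib/ in sync across all instances turns out to be the problem, an alternative is to attach the jar from the job itself. A sketch, assuming your PyFlink release provides StreamExecutionEnvironment.add_jars (otherwise the pipeline.jars configuration option serves the same purpose); the jar path is hypothetical:
>>>>
>>>> ```
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> # hypothetical path: the Kafka connector jar on the client machine;
>>>> # Flink ships it to the cluster together with the job
>>>> env.add_jars("file:///opt/flink/opt/flink-sql-connector-kafka_2.12-1.12.0.jar")
>>>> ```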
>>>>
>>>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com>
>>>> wrote:
>>>>
>>>>> Shuiqiang,
>>>>>
>>>>> I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib
>>>>> directory
>>>>>
>>>>> When submitting this job to my flink cluster I’m getting this stack
>>>>> trace at runtime:
>>>>>
>>>>> [...]
>>>>>
>>>>>
>>>>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Robert,
>>>>>>
>>>>>> You can refer to
>>>>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>>>> for the whole example.
>>>>>>
>>>>>> Best,
>>>>>> Shuiqiang
>>>>>>
>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>>>>>>
>>>>>>> Shuiqiang, Can you include the import statements?  thanks.
>>>>>>>
>>>>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Robert,
>>>>>>>>
>>>>>>>> [...]
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Shuiqiang
>>>>>>>>
>>>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>>>>>>
>>>>>>>>> I’ve scoured the web looking for an example of using a Kafka
>>>>>>>>> source for a DataStream in python. Can someone finish this example?
>>>>>>>>>
>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>> env.set_parallelism(1)
>>>>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>>>>> ...

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Dian Fu <di...@gmail.com>.
Hi Robert,

1) Do you mean that when you submit the same job multiple times, it sometimes succeeds and sometimes hangs, or that it only hangs for some specific job?
2) Which deployment mode do you use?
3) Is it possible to dump the stack trace (for example as sketched below)? It would help us understand what’s happening.
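
Regarding (3): if attaching jstack to the Java client process is awkward, the Python side of the submission can be made dumpable with the standard library's faulthandler. A minimal sketch, assuming the hang is observable from the Python driver; register this at the top of the script, then signal the stuck process:

```
import faulthandler
import signal

# After this call, `kill -USR1 <pid>` makes the process print the stack
# of every Python thread to stderr without terminating it.
faulthandler.register(signal.SIGUSR1, all_threads=True)
```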

Thanks,
Dian

> On Mar 16, 2021, at 11:51 PM, Robert Cullen <ci...@gmail.com> wrote:
> 
> Thanks All,
> 
> I've added Python and PyFlink to the TM image, which fixed the problem.  Now, however, successfully submitting a Python script to the cluster is sporadic: sometimes it completes, but most of the time it just hangs.  Not sure what is causing this.
> 
> [...]


Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Cullen <ci...@gmail.com>.
Thanks All,

I've added Python and PyFlink to the TM image, which fixed the problem.
Now, however, successfully submitting a Python script to the cluster is
sporadic: sometimes it completes, but most of the time it just hangs. Not
sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <hx...@gmail.com> wrote:

> Hi,
>
> From the error message, I think the problem is that there is no Python
> interpreter on your TaskManager machine. You need to install a Python 3.5+
> interpreter on the TM machine, and PyFlink needs to be installed in that
> Python environment (pip install apache-flink). For details, you can refer to the document[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
>
> Best,
> Xingbo
>
> Robert Cullen <ci...@gmail.com> 于2021年3月16日周二 上午2:58写道:
>
>> Okay, I added the jars and fixed that exception. However I have a new
>> exception that is harder to decipher:
>>
>> 2021-03-15 14:46:20
>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: error=2, No such file or directory
>>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>>     ... 20 more
>>
>>
>> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org>
>> wrote:
>>
>>> Hey,
>>> are you sure the class is in the lib/ folder of all machines /
>>> instances, and you've restarted Flink after adding the files to lib/ ?
>>>
>>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com>
>>> wrote:
>>>
>>>> Shuiqiang,
>>>>
>>>> I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib
>>>> directory
>>>>
>>>> When submitting this job to my flink cluster I’m getting this stack
>>>> trace at runtime:
>>>>
>>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>> ClassLoader info: URL ClassLoader:
>>>> Class not resolvable through given classloader.
>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>>>     at java.lang.Class.forName0(Native Method)
>>>>     at java.lang.Class.forName(Class.java:348)
>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>>>     ... 9 more
>>>>
>>>>
>>>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> You can refer to
>>>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>>> for the whole example.
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>>>>>
>>>>>> Shuiqiang, can you include the import statements? Thanks.
>>>>>>
>>>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Robert,
>>>>>>>
>>>>>>> The Kafka connector has been available in the Python DataStream API
>>>>>>> since release-1.12.0. The documentation for it is still lacking; we
>>>>>>> will add it soon.
>>>>>>>
>>>>>>> The following code shows how to use FlinkKafkaConsumer and
>>>>>>> FlinkKafkaProducer:
>>>>>>> ```
>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>> env.set_parallelism(1)
>>>>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>>>
>>>>>>> # define the schema of the message from kafka, here the data is in
>>>>>>> json format.
>>>>>>> type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
>>>>>>> 'payPlatform', 'provinceId'],
>>>>>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>>>>>> Types.INT()])
>>>>>>> json_row_schema =
>>>>>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>>>>
>>>>>>> # define the kafka connection properties.
>>>>>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>>>>>> 'pyflink-e2e-source'}
>>>>>>>
>>>>>>> # create the KafkaConsumer and KafkaProducer with the specified
>>>>>>> topic name, serialization/deserialization schema and properties.
>>>>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>>>>>> json_row_schema, kafka_props)
>>>>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>>>>>> SimpleStringSchema(), kafka_props)
>>>>>>>
>>>>>>> # set the kafka source to consume data from earliest offset.
>>>>>>> kafka_consumer.set_start_from_earliest()
>>>>>>>
>>>>>>> # create a DataStream from kafka consumer source
>>>>>>> ds = env.add_source(kafka_consumer)
>>>>>>>
>>>>>>> result_stream = ...
>>>>>>>
>>>>>>> # write the result into kafka by a kafka producer sink.
>>>>>>> result_stream.add_sink(kafka_producer)
>>>>>>> ```
>>>>>>>
>>>>>>> Best,
>>>>>>> Shuiqiang
>>>>>>>
>>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>>>>>
>>>>>>>> I’ve scoured the web looking for an example of using a Kafka source
>>>>>>>> for a DataStream in python. Can someone finish this example?
>>>>>>>>
>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>> env.set_parallelism(1)
>>>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>>>> ...
>>>>>>>>
>>>>>>>> --
>>>>>>>> Robert Cullen
>>>>>>>> 240-475-4490
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Robert Cullen
>>>>>> 240-475-4490
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

From the error message, I think the problem is that there is no Python
interpreter on your TaskManager machines. You need to install a Python 3.5+
interpreter on each TM machine, and that Python environment needs to have
PyFlink installed (pip install apache-flink). For details, you can refer to
the documentation[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/installation.html
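
If installing the interpreter under the plain name `python` is awkward in
your image, you can also tell PyFlink explicitly which interpreter the
TaskManagers should use. A minimal sketch, assuming the TM image has an
interpreter at /usr/local/bin/python3 (the path is an assumption; point it
at whatever is actually installed):

```
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# The Python workers are launched with this interpreter at runtime.
# It must exist on every TaskManager and have apache-flink pip-installed.
env.set_python_executable("/usr/local/bin/python3")
```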

Best,
Xingbo

Robert Cullen <ci...@gmail.com> wrote on Tue, Mar 16, 2021 at 2:58 AM:

> Okay, I added the jars and fixed that exception. However, I have a new
> exception that is harder to decipher:
>
> 2021-03-15 14:46:20
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
>     at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
>     at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
>     at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
>     at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>     at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>     at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
>     at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: error=2, No such file or directory
>     at java.lang.UNIXProcess.forkAndExec(Native Method)
>     at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>     at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>     at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>     ... 20 more
>
>
> On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org>
> wrote:
>
>> Hey,
>> are you sure the class is in the lib/ folder of all machines / instances,
>> and you've restarted Flink after adding the files to lib/?
>>
>> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com>
>> wrote:
>>
>>> Shuiqiang,
>>>
>>> I added the flink-connector-kafka_2.12 jar to the /opt/flink/lib
>>> directory.
>>>
>>> When submitting this job to my Flink cluster I’m getting this stack
>>> trace at runtime:
>>>
>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>> ClassLoader info: URL ClassLoader:
>>> Class not resolvable through given classloader.
>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>>     at java.lang.Class.forName0(Native Method)
>>>     at java.lang.Class.forName(Class.java:348)
>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>>     ... 9 more
>>>
>>>
>>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com>
>>> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> You can refer to
>>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>>> for the whole example.
>>>>
>>>> Best,
>>>> Shuiqiang
>>>>
>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>>>>
>>>>> Shuiqiang, can you include the import statements? Thanks.
>>>>>
>>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Robert,
>>>>>>
>>>>>> The Kafka connector has been available in the Python DataStream API
>>>>>> since release-1.12.0. The documentation for it is still lacking; we
>>>>>> will add it soon.
>>>>>>
>>>>>> The following code shows how to use FlinkKafkaConsumer and
>>>>>> FlinkKafkaProducer:
>>>>>> ```
>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> env.set_parallelism(1)
>>>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>>
>>>>>> # define the schema of the message from kafka, here the data is in
>>>>>> json format.
>>>>>> type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
>>>>>> 'payPlatform', 'provinceId'],
>>>>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>>>>> Types.INT()])
>>>>>> json_row_schema =
>>>>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>>>
>>>>>> # define the kafka connection properties.
>>>>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>>>>> 'pyflink-e2e-source'}
>>>>>>
>>>>>> # create the KafkaConsumer and KafkaProducer with the specified topic
>>>>>> name, serialization/deserialization schema and properties.
>>>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>>>>> json_row_schema, kafka_props)
>>>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>>>>> SimpleStringSchema(), kafka_props)
>>>>>>
>>>>>> # set the kafka source to consume data from earliest offset.
>>>>>> kafka_consumer.set_start_from_earliest()
>>>>>>
>>>>>> # create a DataStream from kafka consumer source
>>>>>> ds = env.add_source(kafka_consumer)
>>>>>>
>>>>>> result_stream = ...
>>>>>>
>>>>>> # write the result into kafka by a kafka producer sink.
>>>>>> result_stream.add_sink(kafka_producer)
>>>>>> ```
>>>>>>
>>>>>> Best,
>>>>>> Shuiqiang
>>>>>>
>>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>>>>
>>>>>>> I’ve scoured the web looking for an example of using a Kafka source
>>>>>>> for a DataStream in python. Can someone finish this example?
>>>>>>>
>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>> env.set_parallelism(1)
>>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>>> ...
>>>>>>>
>>>>>>> --
>>>>>>> Robert Cullen
>>>>>>> 240-475-4490
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Robert Cullen
>>>>> 240-475-4490
>>>>>
>>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Cullen <ci...@gmail.com>.
Okay, I added the jars and fixed that exception. However, I have a new
exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2,
No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more


On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <rm...@apache.org> wrote:

> Hey,
> are you sure the class is in the lib/ folder of all machines / instances,
> and you've restarted Flink after adding the files to lib/?
>
> On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com>
> wrote:
>
>> Shuiqiang,
>>
>> I added the flink-connector-kafka_2.12 jar to the /opt/flink/lib directory.
>>
>> When submitting this job to my Flink cluster I’m getting this stack trace
>> at runtime:
>>
>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>> ClassLoader info: URL ClassLoader:
>> Class not resolvable through given classloader.
>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:348)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>>     ... 9 more
>>
>>
>> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> You can refer to
>>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>>> for the whole example.
>>>
>>> Best,
>>> Shuiqiang
>>>
>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>>>
>>>> Shuiqiang, can you include the import statements? Thanks.
>>>>
>>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> The Kafka connector has been available in the Python DataStream API
>>>>> since release-1.12.0. The documentation for it is still lacking; we
>>>>> will add it soon.
>>>>>
>>>>> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
>>>>> ```
>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>> env.set_parallelism(1)
>>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>>
>>>>> # define the schema of the message from kafka, here the data is in
>>>>> json format.
>>>>> type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
>>>>> 'payPlatform', 'provinceId'],
>>>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>>>> Types.INT()])
>>>>> json_row_schema =
>>>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>>
>>>>> # define the kafka connection properties.
>>>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>>>> 'pyflink-e2e-source'}
>>>>>
>>>>> # create the KafkaConsumer and KafkaProducer with the specified topic
>>>>> name, serialization/deserialization schema and properties.
>>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>>>> json_row_schema, kafka_props)
>>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>>>> SimpleStringSchema(), kafka_props)
>>>>>
>>>>> # set the kafka source to consume data from earliest offset.
>>>>> kafka_consumer.set_start_from_earliest()
>>>>>
>>>>> # create a DataStream from kafka consumer source
>>>>> ds = env.add_source(kafka_consumer)
>>>>>
>>>>> result_stream = ...
>>>>>
>>>>> # write the result into kafka by a kafka producer sink.
>>>>> result_stream.add_sink(kafka_producer)
>>>>> ```
>>>>>
>>>>> Best,
>>>>> Shuiqiang
>>>>>
>>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>>>
>>>>>> I’ve scoured the web looking for an example of using a Kafka source
>>>>>> for a DataStream in python. Can someone finish this example?
>>>>>>
>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> env.set_parallelism(1)
>>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>>> ...
>>>>>>
>>>>>> --
>>>>>> Robert Cullen
>>>>>> 240-475-4490
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Metzger <rm...@apache.org>.
Hey,
are you sure the class is in the lib/ folder of all machines / instances,
and you've restarted Flink after adding the files to lib/?
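
If updating lib/ on every instance is inconvenient, it may also be possible
to ship the connector jar with the job itself: recent PyFlink versions
expose add_jars() on the execution environment (check that your 1.12 build
has it before relying on this). A sketch, where the jar path is an
assumption and should point at your actual connector jar:

```
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Attach the Kafka connector so TaskManagers can load FlinkKafkaProducer
# without it being installed in lib/. The flink-sql-connector-kafka fat
# jar also bundles the kafka-clients classes, which the slim
# flink-connector-kafka jar does not.
env.add_jars("file:///opt/flink/opt/flink-sql-connector-kafka_2.12-1.12.0.jar")
```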

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <ci...@gmail.com> wrote:

> Shuiqiang,
>
> I added the flink-connector-kafka_2.12 jar to the /opt/flink/lib directory.
>
> When submitting this job to my Flink cluster I’m getting this stack trace
> at runtime:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
>     at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
>     ... 9 more
>
>
> On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com>
> wrote:
>
>> Hi Robert,
>>
>> You can refer to
>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
>> for the whole example.
>>
>> Best,
>> Shuiqiang
>>
>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>>
>>> Shuiqiang, can you include the import statements? Thanks.
>>>
>>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
>>> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> The Kafka connector has been available in the Python DataStream API
>>>> since release-1.12.0. The documentation for it is still lacking; we
>>>> will add it soon.
>>>>
>>>> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
>>>> ```
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> env.set_parallelism(1)
>>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>
>>>> # define the schema of the message from kafka, here the data is in json
>>>> format.
>>>> type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
>>>> 'payPlatform', 'provinceId'],
>>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>>> Types.INT()])
>>>> json_row_schema =
>>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>>
>>>> # define the kafka connection properties.
>>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>>> 'pyflink-e2e-source'}
>>>>
>>>> # create the KafkaConsumer and KafkaProducer with the specified topic
>>>> name, serialization/deserialization schema and properties.
>>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>>> json_row_schema, kafka_props)
>>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>>> SimpleStringSchema(), kafka_props)
>>>>
>>>> # set the kafka source to consume data from earliest offset.
>>>> kafka_consumer.set_start_from_earliest()
>>>>
>>>> # create a DataStream from kafka consumer source
>>>> ds = env.add_source(kafka_consumer)
>>>>
>>>> result_stream = ...
>>>>
>>>> # write the result into kafka by a kafka producer sink.
>>>> result_stream.add_sink(kafka_producer)
>>>> ```
>>>>
>>>> Best,
>>>> Shuiqiang
>>>>
>>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>>
>>>>> I’ve scoured the web looking for an example of using a Kafka source
>>>>> for a DataStream in python. Can someone finish this example?
>>>>>
>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>> env.set_parallelism(1)
>>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>>> ...
>>>>>
>>>>> --
>>>>> Robert Cullen
>>>>> 240-475-4490
>>>>>
>>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Cullen <ci...@gmail.com>.
Shuiqiang,

I added the flink-connector-kafka_2.12 jar to the /opt/flink/lib directory.

When submitting this job to my Flink cluster I’m getting this stack trace
at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Cannot load user class:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more


On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <ac...@gmail.com> wrote:

> Hi Robert,
>
> You can refer to
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
> for the whole example.
>
> Best,
> Shuiqiang
>
> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:
>
>> Shuiqiang, can you include the import statements? Thanks.
>>
>> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
>> wrote:
>>
>>> Hi Robert,
>>>
>>> The Kafka connector has been available in the Python DataStream API
>>> since release-1.12.0. The documentation for it is still lacking; we
>>> will add it soon.
>>>
>>> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
>>> ```
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>
>>> # define the schema of the message from kafka, here the data is in json
>>> format.
>>> type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount',
>>> 'payPlatform', 'provinceId'],
>>> [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
>>> Types.INT()])
>>> json_row_schema =
>>> JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>>
>>> # define the kafka connection properties.
>>> kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id':
>>> 'pyflink-e2e-source'}
>>>
>>> # create the KafkaConsumer and KafkaProducer with the specified topic
>>> name, serialization/deserialization schema and properties.
>>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>> json_row_schema, kafka_props)
>>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>> SimpleStringSchema(), kafka_props)
>>>
>>> # set the kafka source to consume data from earliest offset.
>>> kafka_consumer.set_start_from_earliest()
>>>
>>> # create a DataStream from kafka consumer source
>>> ds = env.add_source(kafka_consumer)
>>>
>>> result_stream = ...
>>>
>>> # write the result into kafka by a kafka producer sink.
>>> result_stream.add_sink(kafka_producer)
>>> ```
>>>
>>> Best,
>>> Shuiqiang
>>>
>>> Robert Cullen <ci...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>>
>>>> I’ve scoured the web looking for an example of using a Kafka source for
>>>> a DataStream in python. Can someone finish this example?
>>>>
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> env.set_parallelism(1)
>>>> ds = env.from_collection( KAFKA_SOURCE )
>>>> ...
>>>>
>>>> --
>>>> Robert Cullen
>>>> 240-475-4490
>>>>
>>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Robert,

You can refer to
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
for the whole example.
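
In the meantime, the imports that the earlier snippet relies on look roughly
like this (a sketch based on the release-1.12 module layout; check the linked
example for the authoritative list):

```
from pyflink.common.serialization import (JsonRowDeserializationSchema,
                                          SimpleStringSchema)
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
```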

Best,
Shuiqiang

Robert Cullen <ci...@gmail.com> 于2021年3月13日周六 上午4:01写道:

> Shuiqiang, can you include the import statements? Thanks.
>
> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com>
> wrote:
>
>> Hi Robert,
>>
>> The Kafka connector has been available in the Python DataStream API since
>> release-1.12.0. The documentation for it is still lacking; we will fill it
>> in soon.
>>
>> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
>> ```
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>
>> # define the schema of the message from kafka; here the data is in json format.
>> type_info = Types.ROW_NAMED(
>>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
>> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>
>> # define the kafka connection properties.
>> kafka_props = {'bootstrap.servers': 'localhost:9092',
>>                'group.id': 'pyflink-e2e-source'}
>>
>> # create the KafkaConsumer and KafkaProducer with the specified topic name,
>> # serialization/deserialization schema and properties.
>> kafka_consumer = FlinkKafkaConsumer(
>>     "timer-stream-source", json_row_schema, kafka_props)
>> kafka_producer = FlinkKafkaProducer(
>>     "timer-stream-sink", SimpleStringSchema(), kafka_props)
>>
>> # set the kafka source to consume data from earliest offset.
>> kafka_consumer.set_start_from_earliest()
>>
>> # create a DataStream from kafka consumer source
>> ds = env.add_source(kafka_consumer)
>>
>> result_stream = ...
>>
>> # write the result into kafka by a kafka producer sink.
>> result_stream.add_sink(kafka_producer)
>> ```
>>
>> Best,
>> Shuiqiang
>>
>> Robert Cullen <ci...@gmail.com> 于2021年3月13日周六 上午12:56写道:
>>
>>> I’ve scoured the web looking for an example of using a Kafka source for
>>> a DataStream in python. Can someone finish this example?
>>>
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> ds = env.from_collection( KAFKA_SOURCE )
>>> ...
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Robert Cullen <ci...@gmail.com>.
Shuiqiang, can you include the import statements? Thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <ac...@gmail.com> wrote:

> Hi Robert,
>
> The Kafka connector has been available in the Python DataStream API since
> release-1.12.0. The documentation for it is still lacking; we will fill it
> in soon.
>
> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
> ```
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>
> # define the schema of the message from kafka; here the data is in json format.
> type_info = Types.ROW_NAMED(
>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>
> # define the kafka connection properties.
> kafka_props = {'bootstrap.servers': 'localhost:9092',
>                'group.id': 'pyflink-e2e-source'}
>
> # create the KafkaConsumer and KafkaProducer with the specified topic name,
> # serialization/deserialization schema and properties.
> kafka_consumer = FlinkKafkaConsumer(
>     "timer-stream-source", json_row_schema, kafka_props)
> kafka_producer = FlinkKafkaProducer(
>     "timer-stream-sink", SimpleStringSchema(), kafka_props)
>
> # set the kafka source to consume data from earliest offset.
> kafka_consumer.set_start_from_earliest()
>
> # create a DataStream from kafka consumer source
> ds = env.add_source(kafka_consumer)
>
> result_stream = ...
>
> # write the result into kafka by a kafka producer sink.
> result_stream.add_sink(kafka_producer)
> ```
>
> Best,
> Shuiqiang
>
> Robert Cullen <ci...@gmail.com> 于2021年3月13日周六 上午12:56写道:
>
>> I’ve scoured the web looking for an example of using a Kafka source for a
>> DataStream in python. Can someone finish this example?
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> ds = env.from_collection( KAFKA_SOURCE )
>> ...
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490

Re: Python StreamExecutionEnvironment from_collection Kafka example

Posted by Shuiqiang Chen <ac...@gmail.com>.
Hi Robert,

The Kafka connector has been available in the Python DataStream API since
release-1.12.0. The documentation for it is still lacking; we will fill it in
soon.

The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka; here the data is in json format.
type_info = Types.ROW_NAMED(
    ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
    [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092',
               'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name,
# serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer(
    "timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer(
    "timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer)

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```
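
Two caveats about the snippet: result_stream = ... is only a placeholder for
your own transformation, and the job is never actually submitted. A hedged
sketch of how those gaps might be filled (the map below is a hypothetical
stand-in; since the producer uses SimpleStringSchema, the sink expects plain
strings):

```
# Hypothetical stand-in for the elided transformation: serialize each row
# to a string so it matches the SimpleStringSchema used by the sink.
result_stream = ds.map(lambda row: str(row), output_type=Types.STRING())
result_stream.add_sink(kafka_producer)

# Nothing runs until the job graph is submitted.
env.execute("kafka-example")
```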

Best,
Shuiqiang

Robert Cullen <ci...@gmail.com> 于2021年3月13日周六 上午12:56写道:

> I’ve scoured the web looking for an example of using a Kafka source for a
> DataStream in python. Can someone finish this example?
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> ds = env.from_collection( KAFKA_SOURCE )
> ...
>
> --
> Robert Cullen
> 240-475-4490
>