You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Kostas Evangelou <ev...@gmail.com> on 2018/10/11 10:18:20 UTC

Apache Flink: Kafka connector in Python streaming API, “Cannot load user class”

Hey all,

Thank you so much for your efforts. I've already posted this question on
stack overflow, but thought I should ask here as well.

I am trying out Flink's new Python streaming API and attempting to run my
script with ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py.
The python script is fairly straightforward, I am just trying to consume
from an existing topic and send everything to stdout (or the *.out file in
the log directory where the output method emits data by default).

import glob

import os

import sys

from java.util import Properties

from org.apache.flink.streaming.api.functions.source import SourceFunction

from org.apache.flink.streaming.api.collector.selector import OutputSelector

from org.apache.flink.api.common.serialization import SimpleStringSchema


directories=['/home/user/flink/flink-1.6.1/lib']

for directory in directories:

    for jar in glob.glob(os.path.join(directory,'*.jar')):

                sys.path.append(jar)


from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09


props = Properties()

config = {"bootstrap_servers": "localhost:9092",

          "group_id": "flink_test",

          "topics": ["TopicCategory-TopicName"]}

props.setProperty("bootstrap.servers", config['bootstrap_servers'])

props.setProperty("group_id", config['group_id'])

props.setProperty("zookeeper.connect", "localhost:2181")


def main(factory):

    consumer = FlinkKafkaConsumer09([config["topics"]],
SimpleStringSchema(), props)


    env = factory.get_execution_environment()

    env.add_java_source(consumer) \

        .output()

    env.execute()

I grabbed a handful of jar files from the maven repos, namely
flink-connector-kafka-0.9_2.11-1.6.1.jar,
flink-connector-kafka-base_2.11-1.6.1.jar and kafka-clients-0.9.0.1.jarand
copied them in Flink's lib directory. Unless I misunderstood the
documentation, this should suffice for Flink to load the kafka connector.
Indeed, if I remove any of these jars the import fails, but this doesn't
seem to be enough to actually invoke the plan. Adding a for loop to
dynamically add these to sys.path didn't work either. Here's what gets
printed in the console:

Starting execution of program

Failed to run plan: null

Traceback (most recent call last):

  File "<string>", line 1, in <module>

  File
"/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py",
line 32, in main

    at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)

    at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)

    at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)

    at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)

    at
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)


org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: bbcc0cb2c4fe6e3012d228b06b270eba)


The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.

This is what I see in the logs:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

ClassLoader info: URL ClassLoader:

    file:
'/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887'
(valid JAR)

Class not resolvable through given classloader.

    at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)

    at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)

    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)

    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

    at java.lang.Thread.run(Thread.java:748)

Is there a way to fix this and make the connector available to Python?

Many thanks,
Kostas

Re: Apache Flink: Kafka connector in Python streaming API, “Cannot load user class”

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Kostas,

As far as I know you cannot just use java classes from within python
API. I think Python API does not provide wrapper for kafka connector. I
am adding Chesnay to cc to correct me if I am wrong.

Best,

Dawid


On 11/10/18 12:18, Kostas Evangelou wrote:
> Hey all, 
>
> Thank you so much for your efforts. I've already posted this question
> on stack overflow, but thought I should ask here as well.
>
> I am trying out Flink's new Python streaming API and attempting to run
> my script with |./flink-1.6.1/bin/pyflink-stream.sh
> examples/read_from_kafka.py|. The python script is fairly
> straightforward, I am just trying to consume from an existing topic
> and send everything to stdout (or the *.out file in the log directory
> where the output method emits data by default).
>
> import glob
>
> import os
>
> import sys
>
> from java.util import Properties
>
> from org.apache.flink.streaming.api.functions.source import SourceFunction
>
> from org.apache.flink.streaming.api.collector.selector import
> OutputSelector
>
> from org.apache.flink.api.common.serialization import SimpleStringSchema
>
>
> directories=['/home/user/flink/flink-1.6.1/lib']
>
> for directory in directories:
>
>     for jar in glob.glob(os.path.join(directory,'*.jar')):
>
>                 sys.path.append(jar)
>
>
> from org.apache.flink.streaming.connectors.kafka import
> FlinkKafkaConsumer09
>
>
> props = Properties()
>
> config = {"bootstrap_servers": "localhost:9092",
>
>           "group_id": "flink_test",
>
>           "topics": ["TopicCategory-TopicName"]}
>
> props.setProperty("bootstrap.servers", config['bootstrap_servers'])
>
> props.setProperty("group_id", config['group_id'])
>
> props.setProperty("zookeeper.connect", "localhost:2181")
>
>
> def main(factory):
>
>     consumer = FlinkKafkaConsumer09([config["topics"]],
> SimpleStringSchema(), props)
>
>
>     env = factory.get_execution_environment()
>
>     env.add_java_source(consumer) \
>
>         .output()
>
>     env.execute()
>
>
> I grabbed a handful of jar files from the maven repos,
> namely |flink-connector-kafka-0.9_2.11-1.6.1.jar|, |flink-connector-kafka-base_2.11-1.6.1.jar| and |kafka-clients-0.9.0.1.jar|and
> copied them in Flink's |lib| directory. Unless I misunderstood the
> documentation, this should suffice for Flink to load the kafka
> connector. Indeed, if I remove any of these jars the import fails, but
> this doesn't seem to be enough to actually invoke the plan. Adding a
> for loop to dynamically add these to |sys.path| didn't work either.
> Here's what gets printed in the console:
>
> Starting execution of program
>
> Failed to run plan: null
>
> Traceback (most recent call last):
>
>   File "<string>", line 1, in <module>
>
>   File
> "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py",
> line 32, in main
>
>     at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
>
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>
>     at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>
>     at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>
>     at
> org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
>
> org.apache.flink.client.program.ProgramInvocationException:
> org.apache.flink.client.program.ProgramInvocationException: Job
> failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
>
>
> The program didn't contain a Flink job. Perhaps you forgot to call
> execute() on the execution environment.
>
>
> This is what I see in the logs:
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> load user class:   
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>
> ClassLoader info: URL ClassLoader:
>
>     file:
> '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887'
> (valid JAR)
>
> Class not resolvable through given classloader.
>
>     at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
>
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
>
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>     at java.lang.Thread.run(Thread.java:748)
>
>
> Is there a way to fix this and make the connector available to Python?
>
> Many thanks,
> Kostas
>