You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "harshit.varshney@iktara.ai" <Ha...@iktara.ai> on 2022/05/09 14:03:18 UTC

FW: Rabbitmq Connection error with Flink version(1.15.0)

 

 

From: harshit.varshney@iktara.ai [mailto:Harshit.Varshney@iktara.ai] 
Sent: Monday, May 9, 2022 7:33 PM
To: 'user@flink.apache.org'
Cc: 'harshit.varshney@iktara.ai'
Subject: Rabbitmq Connection error with Flink version(1.15.0)

 

Dear Team,

 

I am new to pyflink and request for your support in issue I am facing with
Pyflink. I am using Pyflink version 1.15.0 & using reference code from
pyflink reference code.

 

I am getting following error . 

Exception in thread "Thread-4" java.lang.NoClassDefFoundError:
com/rabbitmq/client/ConnectionFactory

Caused by: java.lang.ClassNotFoundException:
com.rabbitmq.client.ConnectionFactory

ERROR:root:Exception while sending command.

Traceback (most recent call last):

  File
"C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\
java_gateway.py", line 1159, in send_command

    raise Py4JNetworkError("Answer from Java side is empty")

py4j.protocol.Py4JNetworkError: Answer from Java side is empty

py4j.protocol.Py4JError:
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Bu
ilder does not exist in the JVM

 

Below is my code for reference.

 

from pyflink.datastream.connectors import FlinkKafkaProducer,
FlinkKafkaConsumer, RMQConnectionConfig, RMQSource

import logging

import os

import sys

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.common import SimpleStringSchema

def main():

    env = StreamExecutionEnvironment.get_execution_environment()

    # checkpointing is required for exactly-once or at-least-once guarantees

 

    env.enable_checkpointing(100)

    rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),

                                'flink-connector-rabbitmq-1.15.0.jar')

    env.add_jars("file:///{} <file:///\\%7b%7d> ".format(rabbitmq_jar))

 

    connection_config = RMQConnectionConfig.Builder() \

        .set_host("localhost") \

        .set_port(5672) \

        .build()

 

 

    stream = env \

        .add_source(RMQSource(

        connection_config,

        'hello',

        True,

        SimpleStringSchema(),

    )) \

        .set_parallelism(1)

 

    stream.print()

    env.execute('main')

 

 

if __name__ == '__main__':

    main()

 

 

Thanks,

Harshit

 

 


Re: FW: Rabbitmq Connection error with Flink version(1.15.0)

Posted by Dian Fu <di...@gmail.com>.
Hi Harshit,

You should use
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/1.15.0/flink-sql-connector-rabbitmq-1.15.0.jar
which is a fat jar containing all the dependencies.

Regards,
Dian

On Mon, May 9, 2022 at 10:05 PM harshit.varshney@iktara.ai <
Harshit.Varshney@iktara.ai> wrote:

>
>
>
>
> *From:* harshit.varshney@iktara.ai [mailto:Harshit.Varshney@iktara.ai]
> *Sent:* Monday, May 9, 2022 7:33 PM
> *To:* 'user@flink.apache.org'
> *Cc:* 'harshit.varshney@iktara.ai'
> *Subject:* Rabbitmq Connection error with Flink version(1.15.0)
>
>
>
> Dear Team,
>
>
>
> I am new to pyflink and request for your support in issue I am facing with
> Pyflink. I am using Pyflink version 1.15.0 & using reference code from
> pyflink reference code.
>
>
>
> I am getting following error .
>
> Exception in thread "Thread-4" java.lang.NoClassDefFoundError:
> com/rabbitmq/client/ConnectionFactory
>
> Caused by: java.lang.ClassNotFoundException:
> com.rabbitmq.client.ConnectionFactory
>
> ERROR:root:Exception while sending command.
>
> Traceback (most recent call last):
>
>   File
> "C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\java_gateway.py",
> line 1159, in send_command
>
>     raise Py4JNetworkError("Answer from Java side is empty")
>
> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
>
> py4j.protocol.Py4JError:
> org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder
> does not exist in the JVM
>
>
>
> Below is my code for reference.
>
>
>
> from pyflink.datastream.connectors import FlinkKafkaProducer,
> FlinkKafkaConsumer, RMQConnectionConfig, RMQSource
>
> import logging
>
> import os
>
> import sys
>
> from pyflink.datastream import StreamExecutionEnvironment
>
> from pyflink.common import SimpleStringSchema
>
> def main():
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     # checkpointing is required for exactly-once or at-least-once
> guarantees
>
>
>
>     env.enable_checkpointing(100)
>
>     rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>
>                                 'flink-connector-rabbitmq-1.15.0.jar')
>
>     env.add_jars("file:///{}".format(rabbitmq_jar))
>
>
>
>     connection_config = RMQConnectionConfig.Builder() \
>
>         .set_host("localhost") \
>
>         .set_port(5672) \
>
>         .build()
>
>
>
>
>
>     stream = env \
>
>         .add_source(RMQSource(
>
>         connection_config,
>
>         'hello',
>
>         True,
>
>         SimpleStringSchema(),
>
>     )) \
>
>         .set_parallelism(1)
>
>
>
>     stream.print()
>
>     env.execute(‘main’)
>
>
>
>
>
> if __name__ == '__main__':
>
>     main()
>
>
>
>
>
> Thanks,
>
> Harshit
>
>
>
>
>