Posted to user@flink.apache.org by "harshit.varshney@iktara.ai" <Ha...@iktara.ai> on 2022/05/09 14:03:18 UTC
FW: Rabbitmq Connection error with Flink version(1.15.0)
Dear Team,
I am new to PyFlink and would appreciate your help with an issue I am facing.
I am using PyFlink version 1.15.0 with code adapted from the PyFlink
reference examples.
I am getting the following error:
Exception in thread "Thread-4" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
Below is my code for reference.
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer, RMQConnectionConfig, RMQSource
import logging
import os
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import SimpleStringSchema

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    # checkpointing is required for exactly-once or at-least-once guarantees
    env.enable_checkpointing(100)
    rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                'flink-connector-rabbitmq-1.15.0.jar')
    env.add_jars("file:///{}".format(rabbitmq_jar))
    connection_config = RMQConnectionConfig.Builder() \
        .set_host("localhost") \
        .set_port(5672) \
        .build()
    stream = env \
        .add_source(RMQSource(
            connection_config,
            'hello',
            True,
            SimpleStringSchema(),
        )) \
        .set_parallelism(1)
    stream.print()
    env.execute('main')

if __name__ == '__main__':
    main()
Thanks,
Harshit
Re: FW: Rabbitmq Connection error with Flink version(1.15.0)
Posted by Dian Fu <di...@gmail.com>.
Hi Harshit,
You should use
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/1.15.0/flink-sql-connector-rabbitmq-1.15.0.jar
which is a fat jar containing all the dependencies.
Regards,
Dian
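
For readers applying the suggestion above, here is a minimal sketch of one way to register the fat jar and to check that a jar really bundles the class named in the error. It assumes the jar from the link has been downloaded to a local path. The helper names (jar_uri, jar_contains_class, make_env) are hypothetical and not part of PyFlink. Whether the fat jar keeps the RabbitMQ client under its original package name has not been verified here, so treat the class check as a diagnostic aid only.

```python
import zipfile
from pathlib import Path


def jar_uri(jar_path):
    """Return a file:// URI for a local jar, in the form env.add_jars() expects."""
    # resolve() makes the path absolute; as_uri() also handles Windows drive letters.
    return Path(jar_path).resolve().as_uri()


def jar_contains_class(jar_path, class_name):
    """Check whether a jar (a zip archive) contains the given Java class."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()


def make_env(jar_path):
    """Create a StreamExecutionEnvironment with the connector jar registered."""
    # Imported lazily so the two helpers above work without PyFlink installed.
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(jar_uri(jar_path))
    return env
```

In the original script, this would amount to pointing rabbitmq_jar at flink-sql-connector-rabbitmq-1.15.0.jar and passing jar_uri(rabbitmq_jar) to env.add_jars(). Calling jar_contains_class(rabbitmq_jar, "com.rabbitmq.client.ConnectionFactory") on each jar is one way to see which of them carries the client classes.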