Posted to user@flink.apache.org by "joris.vanagtmaal" <jo...@wartsila.com> on 2021/02/08 14:46:15 UTC

Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

I'm trying to read data from my Event Hub in Azure, but I end up with the
Flink error message 'findAndCreateTableSource failed'.

I'm using Flink 1.13-SNAPSHOT.

source_ddl = f"""CREATE TABLE dms_source(
    x_value VARCHAR
) WITH (
    'connector.type' = 'Kafka',
    'connector.version' = 'universal',
    'connector.partition' = '0',
    'connector.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://**EVENT_HUB_NAME**.servicebus.windows.net/;SharedAccessKeyName=**KEY_NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
    'connector.sasl.mechanism' = 'PLAIN',
    'connector.security.protocol' = 'SASL_SSL',
    'connector.properties.bootstrap.servers' = '**EVENT_HUB_NAME**.servicebus.windows.net:9093',
    'connector.properties.group.id' = '$Default',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json')
"""

Any tips on how to debug this?
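Two things worth checking in the DDL itself: the legacy table factory is registered under the lowercase value 'kafka', so 'connector.type' = 'Kafka' alone can make the factory lookup fail with exactly this message, and no 'connector.topic' is set (with the Event Hubs Kafka endpoint, the event hub instance normally plays the role of the Kafka topic). For comparison, here is a minimal sketch of the same table in the newer option style available since Flink 1.11; the placeholder values are carried over from above, so treat it as a sketch rather than a verified configuration:

source_ddl = f"""CREATE TABLE dms_source(
    x_value VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = '**EVENT_HUB_INSTANCE_NAME**',
    'properties.bootstrap.servers' = '**EVENT_HUB_NAME**.servicebus.windows.net:9093',
    'properties.group.id' = '$Default',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="**CONNECTION_STRING**";',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json')
"""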




Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by "joris.vanagtmaal" <jo...@wartsila.com>.
Hi Arvid,

I'm currently running PyFlink locally in the JVM with a parallelism of 1,
and the same file works fine if I direct it at a Kafka cluster running in a
local Docker instance.

I assumed that the pipeline.jars definition in the Python file would make
sure the jars are made available on the cluster (or, in this case, the
local JVM), right?

But if I need to verify that the JVM has access to the JAR files, I need a
bit of guidance on how to do that.
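One quick way to check, from the same Python process, is to ask the JVM that PyFlink started for its classpath and to read back the pipeline.jars setting. This is a sketch that goes through PyFlink's internal py4j gateway, so treat it as a debugging aid rather than a supported API; t_env is assumed to be the table environment created earlier:

from pyflink.java_gateway import get_gateway

# The classpath of the JVM that PyFlink launched for this session.
gateway = get_gateway()
print(gateway.jvm.System.getProperty("java.class.path"))

# What pipeline.jars is actually set to at this point.
print(t_env.get_config().get_configuration().get_string("pipeline.jars", ""))

Note that jars listed in pipeline.jars are shipped at job submission rather than appearing on the startup classpath, so the second print is the more telling one here.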





Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by Arvid Heise <ar...@apache.org>.
Hi Joris,

Are you sure that all nodes have access to these jars? Usually, shared
resources reside on some kind of distributed file system or network
directory.

I'm pulling in Dian who can probably help better.

Best,

Arvid

On Tue, Feb 9, 2021 at 1:50 PM joris.vanagtmaal <
joris.vanagtmaal@wartsila.com> wrote:

> Hi Yun,
>
> Thanks for the help!
>
> If I direct the Kafka connector in the DDL to a local Kafka cluster, it
> works fine, so I assume access to the JAR files should not be the issue.
>
> This is how I referred to the JAR files from Python:
>
> t_env.get_config().get_configuration().set_string(
>     "pipeline.jars",
>     "file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
>     "file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
>     "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")
>
> All the best,
> Joris
>
>
>

Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by "joris.vanagtmaal" <jo...@wartsila.com>.
Hi Yun,

Thanks for the help!

If I direct the Kafka connector in the DDL to a local Kafka cluster, it
works fine, so I assume access to the JAR files should not be the issue.

This is how I referred to the JAR files from Python:

t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;"
    "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")

All the best, 
Joris
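A small sanity check that can go before the set_string call: set_string itself does not validate the paths, so a typo in a jar path only surfaces much later. A sketch, assuming the same paths as above:

import os

jar_dir = "/Users/jag002/flinkjars"
jars = [
    "flink-connector-kafka_2.11-1.13-SNAPSHOT.jar",
    "flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar",
    "kafka-clients-2.7.0.jar",
]
paths = [os.path.join(jar_dir, j) for j in jars]
missing = [p for p in paths if not os.path.isfile(p)]
if missing:
    raise FileNotFoundError("jar(s) not found: %s" % missing)

# Same effect as the set_string call above, built from the verified paths.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars", ";".join("file://" + p for p in paths))

As an aside, flink-sql-connector-kafka is an uber jar that already bundles the connector and the Kafka client classes, so shipping it together with the plain connector jar and kafka-clients can put conflicting copies of the same classes on the classpath; for Table API jobs, using only the sql-connector jar is usually the safer combination.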




Re: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by Yun Gao <yu...@aliyun.com>.
Hi,

Could you try adding the jar explicitly via the Python configuration? You might refer to [1].

Best,
Yun

[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/dependency_management.html#java-dependency-in-python-program 
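The approach in [1] boils down to setting pipeline.jars on the table environment's configuration before any DDL is executed, roughly like this (the path is a placeholder):

# Semicolon-separated list of file:// URLs pointing at local jars.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar")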
------------------------------------------------------------------
Sender: joris.vanagtmaal <jo...@wartsila.com>
Date: 2021/02/09 15:50:27
Recipient: <us...@flink.apache.org>
Subject: Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

My JAR files, included in the same folder where I run the Python code:

flink-connector-kafka_2.11-1.13-SNAPSHOT.jar
flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar
kafka-clients-2.7.0.jar




Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by "joris.vanagtmaal" <jo...@wartsila.com>.
My JAR files, included in the same folder where I run the Python code:

flink-connector-kafka_2.11-1.13-SNAPSHOT.jar
flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar
kafka-clients-2.7.0.jar




Re: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by Yun Gao <yu...@aliyun.com>.
Hi,

Have you also included the Kafka-connector-related jar in the classpath?

Best,
Yun
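If the jar should end up on the user classpath rather than being shipped as part of the job, the pipeline.classpaths option takes the same semicolon-separated URL format as pipeline.jars; a sketch along the same lines as the snippets elsewhere in this thread:

# URLs listed here are added to the user classpath on submission;
# they must be reachable from every node that runs the job.
t_env.get_config().get_configuration().set_string(
    "pipeline.classpaths",
    "file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")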
------------------ Original Mail ------------------
Sender: joris.vanagtmaal <jo...@wartsila.com>
Send Date: Tue Feb 9 03:16:52 2021
Recipients: User-Flink <us...@flink.apache.org>
Subject: Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector
Traceback (most recent call last):
  File "streaming-dms.py", line 309, in <module>
    anomalies()
  File "streaming-dms.py", line 142, in anomalies
    t_env.sql_query(query).insert_into("ark_sink")
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 748, in sql_query
    j_table = self._j_tenv.sqlQuery(query)
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)





Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by "joris.vanagtmaal" <jo...@wartsila.com>.
Traceback (most recent call last):
  File "streaming-dms.py", line 309, in <module>
    anomalies()
  File "streaming-dms.py", line 142, in anomalies
    t_env.sql_query(query).insert_into("ark_sink")
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 748, in sql_query
    j_table = self._j_tenv.sqlQuery(query)
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/jag002/anaconda3/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
    at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)





Re: Connecting Pyflink DDL to Azure Eventhubs using Kafka connector

Posted by Khachatryan Roman <kh...@gmail.com>.
Hi,

Could you provide the exception stack trace?

Regards,
Roman


On Mon, Feb 8, 2021 at 3:46 PM joris.vanagtmaal <
joris.vanagtmaal@wartsila.com> wrote:

> I'm trying to read data from my Event Hub in Azure, but I end up with the
> Flink error message 'findAndCreateTableSource failed'.
>
> I'm using Flink 1.13-SNAPSHOT.
>
> source_ddl = f"""CREATE TABLE dms_source(
>     x_value VARCHAR
> ) WITH (
>     'connector.type' = 'Kafka',
>     'connector.version' = 'universal',
>     'connector.partition' = '0',
>     'connector.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://**EVENT_HUB_NAME**.servicebus.windows.net/;SharedAccessKeyName=**KEY_NAME**;SharedAccessKey=***PRIMARY_KEY***;EntityPath=**EVENT_HUB_INSTANCE_NAME**";',
>     'connector.sasl.mechanism' = 'PLAIN',
>     'connector.security.protocol' = 'SASL_SSL',
>     'connector.properties.bootstrap.servers' = '**EVENT_HUB_NAME**.servicebus.windows.net:9093',
>     'connector.properties.group.id' = '$Default',
>     'connector.startup-mode' = 'latest-offset',
>     'format.type' = 'json')
> """
>
> Any tips on how to debug this?
>
>