Posted to dev@flink.apache.org by Shameet Doshi <sh...@firstperformance.com> on 2022/04/04 17:41:38 UTC

Pyflink + TABLE API + snowflake

Hello,

I need to talk to Snowflake from within PyFlink, preferably using the Table API.
We are running PyFlink inside an AWS Kinesis Data Analytics application.
Installing any other Python modules

I was able to connect to a MySQL database but not to a Snowflake database:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    env_settings = (
        EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    )
    t_env = TableEnvironment.create(environment_settings=env_settings)

    # Make the JDBC connector and the driver JARs available to the job
    t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")

    # Define a table backed by the Snowflake "users" table
    t_env.execute_sql("""
    CREATE TABLE MyUserTable (
      id BIGINT,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:snowflake://myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE',
      'table-name' = 'users',
      'username' = 'devops',
      'password' = '50v*hrlM#SdK0euvnWR3',
      'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
    )
    """)

    usersdata = t_env.execute_sql("SELECT name FROM MyUserTable WHERE name = 'amy'")
    usersdata.print()



Error on execution:

    Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url: jdbc:snowflake://myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE

A similar test works when I connect to MySQL:

    t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")

    t_env.execute_sql("""
    CREATE TABLE MyUserTable (
      id BIGINT,
      name STRING,
      age INT,
      status BOOLEAN,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://localhost:3306/mydatabase',
      'table-name' = 'users',
      'username' = 'root',
      'password' = 'rootroot'
    )
    """)

    usersdata = t_env.execute_sql("SELECT name FROM MyUserTable WHERE name = 'amy'")
    usersdata.print()
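
Both snippets pass pipeline.jars a single string of file:// URLs separated by semicolons. As a minimal sketch, that string can be built from absolute local paths instead of being typed by hand. The helper name pipeline_jars is hypothetical (it is not part of PyFlink), and the sketch assumes POSIX-style paths like the ones above.

```python
from pathlib import PurePosixPath
from typing import Iterable
from urllib.parse import quote


def pipeline_jars(paths: Iterable[str]) -> str:
    """Join absolute local JAR paths into the ';'-separated list of
    file:// URLs used as the value of the pipeline.jars option."""
    uris = []
    for p in paths:
        if not PurePosixPath(p).is_absolute():
            # A file:// URL needs an absolute path; fail early with a clear message.
            raise ValueError(f"pipeline.jars needs absolute paths, got {p!r}")
        # quote() keeps '/', '.', '-', '_' as-is and percent-encodes anything unusual.
        uris.append("file://" + quote(p))
    return ";".join(uris)


if __name__ == "__main__":
    jars = pipeline_jars([
        "/Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar",
        "/Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar",
    ])
    print(jars)
    # With PyFlink installed, the result would be applied as in the snippets above:
    # t_env.get_config().get_configuration().set_string("pipeline.jars", jars)
```

For these two paths the helper produces exactly the hand-written value used in the MySQL snippet.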


-- 
NOTICE: This communication may contain information which is confidential to 
First Performance Corporation (FPC). If you are not the intended recipient 
of this communication, please delete this email, destroy all copies, and 
alert the sender. If you are the intended recipient of this communication, 
you should not copy, disclose or distribute this communication without the 
authority of FPC. Any views expressed in this communication are those of 
the individual sender, except where the sender specifically states them to 
be the views of FPC. Except as required by law, FPC does not represent, 
warrant or guarantee that the integrity of this communication has been 
maintained nor that the communication is free of errors, harmful code, 
interception or interference.

Re: Pyflink + TABLE API + snowflake

Posted by Shameet Doshi <sh...@firstperformance.com>.
I see.
Would that mean contributing to the Flink connector source code, or can I
create it as part of a separate, dependent JAR?


On Tue, Apr 5, 2022 at 5:36 AM Martijn Visser <ma...@apache.org>
wrote:

> Hi Shameet,
>
> No, you'll need to add support for the Snowflake dialect in Flink. You can
> find more information here at the JdbcDialect documentation [1]
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.html
>
> On Tue, 5 Apr 2022 at 11:15, Shameet Doshi <
> shameet.doshi@firstperformance.com> wrote:
>
> > thanks Martijn
> >
> > So supplying the snowflake-jdbc jar as a dependency jar as i have done
> and
> > mentioning the driver property doesn't help ?
> >
> > t_env.get_config().get_configuration().set_string("pipeline.jars",
> >
> >
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.14.4.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.6.27.jar")
> >
> >
> > 'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
> >
> >
> >
> > On Tue, Apr 5, 2022 at 3:21 AM Martijn Visser <ma...@apache.org>
> > wrote:
> >
> > > Hi Shameet,
> > >
> > > There's currently no open source Flink Snowflake connector/sink
> > available.
> > > As mentioned in the documentation [1] this requires the implementation
> > of a
> > > dialect.
> > >
> > > Best regards
> > >
> > > Martijn Visser
> > > https://twitter.com/MartijnVisser82
> > > https://github.com/MartijnVisser
> > >
> > > [1]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/
> > >
> > >
> > > On Mon, 4 Apr 2022 at 19:41, Shameet Doshi <
> > > shameet.doshi@firstperformance.com> wrote:
> > >
> > > > Hello,
> > > >
> > > > I have need to talk to snowflake from within pyflink preferably using
> > the
> > > > TABLE API
> > > > We are running pyflink inside AWS Kinesis Data Analysis application.
> > > > Installing any other python modules
> > > >
> > > > I was able to connect a mysql db but not able to connect to snowflake
> > db
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >     import argparse
> > > >     import logging
> > > >     import sys
> > > >
> > > >     from pyflink.common import Row
> > > >     from pyflink.table import (EnvironmentSettings, TableEnvironment,
> > > >                            DataTypes)
> > > >     from pyflink.table.expressions import lit, col
> > > >     from pyflink.table.udf import udtf
> > > >
> > > >     env_settings = (
> > > >
> > > >
> > >
> >
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > > >     )
> > > >     t_env  =
> TableEnvironment.create(environment_settings=env_settings)
> > > >
> > > >     # write all the data to one file
> > > >     #t_env.get_config().set("parallelism.default", "1")
> > > >
> > > >     # define the source
> > > >
> > > >     print("Executing word_count example with default input data
> set.")
> > > >     print("Use --input to specify file input.")
> > > >     tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
> > > >
>  DataTypes.ROW([DataTypes.FIELD('line',
> > > > DataTypes.STRING())]))
> > > >
> > > >     # define the sink
> > > >
> > > >
> > > >
> > > >     t_env.execute_sql("""
> > > >     CREATE TABLE MyUserTable (
> > > >   id BIGINT,
> > > >   name STRING,
> > > >   age INT,
> > > >   status BOOLEAN,
> > > >   PRIMARY KEY (id) NOT ENFORCED
> > > > ) WITH (
> > > >    'connector' = 'jdbc',
> > > >    'url' = 'jdbc:snowflake://
> > > >
> > >
> >
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> > > > ',
> > > >    'table-name' = 'users',
> > > >    'username' = 'devops',
> > > >    'password' = '50v*hrlM#SdK0euvnWR3',
> > > >    'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
> > > > )
> > > > """)
> > > >
> > > >
>  t_env.get_config().get_configuration().set_string("pipeline.jars",
> > > >
> > >
> >
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")
> > > >
> > > >     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable
> where
> > > > name='amy'")
> > > >
> > > >     usersdata.print()
> > > >
> > > >
> > > >
> > > > ERROR on execution : Caused by: java.lang.IllegalStateException:
> Cannot
> > > > handle such jdbc url:
> > > > jdbc:snowflake://
> > > >
> > >
> >
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> > > >
> > > >
> > > >
> > > > Similar test works if I try to connect to MySQL
> > > >
> > > > t_env.execute_sql("""
> > > >     CREATE TABLE MyUserTable (
> > > >   id BIGINT,
> > > >   name STRING,
> > > >   age INT,
> > > >   status BOOLEAN,
> > > >   PRIMARY KEY (id) NOT ENFORCED
> > > > ) WITH (
> > > >    'connector' = 'jdbc',
> > > >    'url' = 'jdbc:mysql://localhost:3306/mydatabase
> > > > <mysql://localhost:3306/mydatabase>',
> > > >    'table-name' = 'users',
> > > >    'username' = 'root',
> > > >    'password' = 'rootroot'
> > > > )
> > > > """)
> > > >
> > > >
>  t_env.get_config().get_configuration().set_string("pipeline.jars",
> > > >
> > >
> >
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")
> > > >
> > > >     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable
> where
> > > > name='amy'")
> > > >
> > > >
> > > > --
> > > > NOTICE: This communication may contain information which is
> > confidential
> > > > to
> > > > First Performance Corporation (FPC). If you are not the intended
> > > recipient
> > > > of this communication, please delete this email, destroy all copies,
> > and
> > > > alert the sender. If you are the intended recipient of this
> > > communication,
> > > > you should not copy, disclose or distribute this communication
> without
> > > the
> > > > authority of FPC. Any views expressed in this communication are those
> > of
> > > > the individual sender, except where the sender specifically states
> them
> > > to
> > > > be the views of FPC. Except as required by law, FPC does not
> represent,
> > > > warrant or guarantee that the integrity of this communication has
> been
> > > > maintained nor that the communication is free of errors, harmful
> code,
> > > > interception or interference.
> > > >
> > >
> >
> > --
> > NOTICE: This communication may contain information which is confidential
> > to
> > First Performance Corporation (FPC). If you are not the intended
> recipient
> > of this communication, please delete this email, destroy all copies, and
> > alert the sender. If you are the intended recipient of this
> communication,
> > you should not copy, disclose or distribute this communication without
> the
> > authority of FPC. Any views expressed in this communication are those of
> > the individual sender, except where the sender specifically states them
> to
> > be the views of FPC. Except as required by law, FPC does not represent,
> > warrant or guarantee that the integrity of this communication has been
> > maintained nor that the communication is free of errors, harmful code,
> > interception or interference.
> >
>

-- 
NOTICE: This communication may contain information which is confidential to 
First Performance Corporation (FPC). If you are not the intended recipient 
of this communication, please delete this email, destroy all copies, and 
alert the sender. If you are the intended recipient of this communication, 
you should not copy, disclose or distribute this communication without the 
authority of FPC. Any views expressed in this communication are those of 
the individual sender, except where the sender specifically states them to 
be the views of FPC. Except as required by law, FPC does not represent, 
warrant or guarantee that the integrity of this communication has been 
maintained nor that the communication is free of errors, harmful code, 
interception or interference.

Re: Pyflink + TABLE API + snowflake

Posted by Martijn Visser <ma...@apache.org>.
Hi Shameet,

No, you'll need to add support for the Snowflake dialect in Flink. You can
find more information in the JdbcDialect documentation [1].

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.html
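
The JdbcDialect interface linked above is Java, so a real Snowflake dialect would have to be a Java class implementing it. Purely to illustrate the kind of work a dialect does (recognising its URLs, as canHandle does; quoting identifiers, as quoteIdentifier does; and generating the SQL the connector runs), here is a hypothetical Python model. The class and method names are made up for this sketch, and double-quote identifier quoting is assumed from Snowflake's standard SQL quoting rules.

```python
from typing import Sequence


class SnowflakeDialectSketch:
    """Hypothetical model of a Snowflake dialect's duties; not Flink code."""

    def can_handle(self, url: str) -> bool:
        # Same idea as JdbcDialect#canHandle: recognise the URL by its prefix.
        return url.startswith("jdbc:snowflake:")

    def quote_identifier(self, identifier: str) -> str:
        # Assumed: standard SQL double quotes, with embedded quotes doubled.
        return '"' + identifier.replace('"', '""') + '"'

    def select_statement(
        self,
        table: str,
        fields: Sequence[str],
        condition_fields: Sequence[str] = (),
    ) -> str:
        # Build a parameterised SELECT; each '?' is bound later by the JDBC driver.
        columns = ", ".join(self.quote_identifier(field) for field in fields)
        sql = f"SELECT {columns} FROM {self.quote_identifier(table)}"
        if condition_fields:
            predicate = " AND ".join(
                f"{self.quote_identifier(field)} = ?" for field in condition_fields
            )
            sql += f" WHERE {predicate}"
        return sql


if __name__ == "__main__":
    dialect = SnowflakeDialectSketch()
    print(dialect.can_handle("jdbc:snowflake://myaccount.snowflakecomputing.com:443/"))
    print(dialect.select_statement("users", ["name"], ["name"]))
```

One design point a real dialect would have to settle: Snowflake stores unquoted identifiers in upper case, so always quoting, as this sketch does, means "users" only matches a table created with that exact lower-case quoted name.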

On Tue, 5 Apr 2022 at 11:15, Shameet Doshi <
shameet.doshi@firstperformance.com> wrote:

> thanks Martijn
>
> So supplying the snowflake-jdbc jar as a dependency jar as i have done and
> mentioning the driver property doesn't help ?
>
> t_env.get_config().get_configuration().set_string("pipeline.jars",
>
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.14.4.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.6.27.jar")
>
>
> 'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
>
>
>
> On Tue, Apr 5, 2022 at 3:21 AM Martijn Visser <ma...@apache.org>
> wrote:
>
> > Hi Shameet,
> >
> > There's currently no open source Flink Snowflake connector/sink
> available.
> > As mentioned in the documentation [1] this requires the implementation
> of a
> > dialect.
> >
> > Best regards
> >
> > Martijn Visser
> > https://twitter.com/MartijnVisser82
> > https://github.com/MartijnVisser
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/
> >
> >
> > On Mon, 4 Apr 2022 at 19:41, Shameet Doshi <
> > shameet.doshi@firstperformance.com> wrote:
> >
> > > Hello,
> > >
> > > I have need to talk to snowflake from within pyflink preferably using
> the
> > > TABLE API
> > > We are running pyflink inside AWS Kinesis Data Analysis application.
> > > Installing any other python modules
> > >
> > > I was able to connect a mysql db but not able to connect to snowflake
> db
> > >
> > >
> > >
> > >
> > >
> > >
> > >     import argparse
> > >     import logging
> > >     import sys
> > >
> > >     from pyflink.common import Row
> > >     from pyflink.table import (EnvironmentSettings, TableEnvironment,
> > >                            DataTypes)
> > >     from pyflink.table.expressions import lit, col
> > >     from pyflink.table.udf import udtf
> > >
> > >     env_settings = (
> > >
> > >
> >
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > >     )
> > >     t_env  = TableEnvironment.create(environment_settings=env_settings)
> > >
> > >     # write all the data to one file
> > >     #t_env.get_config().set("parallelism.default", "1")
> > >
> > >     # define the source
> > >
> > >     print("Executing word_count example with default input data set.")
> > >     print("Use --input to specify file input.")
> > >     tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
> > >                                 DataTypes.ROW([DataTypes.FIELD('line',
> > > DataTypes.STRING())]))
> > >
> > >     # define the sink
> > >
> > >
> > >
> > >     t_env.execute_sql("""
> > >     CREATE TABLE MyUserTable (
> > >   id BIGINT,
> > >   name STRING,
> > >   age INT,
> > >   status BOOLEAN,
> > >   PRIMARY KEY (id) NOT ENFORCED
> > > ) WITH (
> > >    'connector' = 'jdbc',
> > >    'url' = 'jdbc:snowflake://
> > >
> >
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> > > ',
> > >    'table-name' = 'users',
> > >    'username' = 'devops',
> > >    'password' = '50v*hrlM#SdK0euvnWR3',
> > >    'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
> > > )
> > > """)
> > >
> > >     t_env.get_config().get_configuration().set_string("pipeline.jars",
> > >
> >
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")
> > >
> > >     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> > > name='amy'")
> > >
> > >     usersdata.print()
> > >
> > >
> > >
> > > ERROR on execution : Caused by: java.lang.IllegalStateException: Cannot
> > > handle such jdbc url:
> > > jdbc:snowflake://
> > >
> >
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> > >
> > >
> > >
> > > Similar test works if I try to connect to MySQL
> > >
> > > t_env.execute_sql("""
> > >     CREATE TABLE MyUserTable (
> > >   id BIGINT,
> > >   name STRING,
> > >   age INT,
> > >   status BOOLEAN,
> > >   PRIMARY KEY (id) NOT ENFORCED
> > > ) WITH (
> > >    'connector' = 'jdbc',
> > >    'url' = 'jdbc:mysql://localhost:3306/mydatabase
> > > <mysql://localhost:3306/mydatabase>',
> > >    'table-name' = 'users',
> > >    'username' = 'root',
> > >    'password' = 'rootroot'
> > > )
> > > """)
> > >
> > >     t_env.get_config().get_configuration().set_string("pipeline.jars",
> > >
> >
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")
> > >
> > >     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> > > name='amy'")
> > >
> > >
> > > --
> > > NOTICE: This communication may contain information which is
> confidential
> > > to
> > > First Performance Corporation (FPC). If you are not the intended
> > recipient
> > > of this communication, please delete this email, destroy all copies,
> and
> > > alert the sender. If you are the intended recipient of this
> > communication,
> > > you should not copy, disclose or distribute this communication without
> > the
> > > authority of FPC. Any views expressed in this communication are those
> of
> > > the individual sender, except where the sender specifically states them
> > to
> > > be the views of FPC. Except as required by law, FPC does not represent,
> > > warrant or guarantee that the integrity of this communication has been
> > > maintained nor that the communication is free of errors, harmful code,
> > > interception or interference.
> > >
> >
>
> --
> NOTICE: This communication may contain information which is confidential
> to
> First Performance Corporation (FPC). If you are not the intended recipient
> of this communication, please delete this email, destroy all copies, and
> alert the sender. If you are the intended recipient of this communication,
> you should not copy, disclose or distribute this communication without the
> authority of FPC. Any views expressed in this communication are those of
> the individual sender, except where the sender specifically states them to
> be the views of FPC. Except as required by law, FPC does not represent,
> warrant or guarantee that the integrity of this communication has been
> maintained nor that the communication is free of errors, harmful code,
> interception or interference.
>

Re: Pyflink + TABLE API + snowflake

Posted by Shameet Doshi <sh...@firstperformance.com>.
Thanks Martijn.

So supplying the snowflake-jdbc JAR as a dependency, as I have done, and
setting the driver property doesn't help?

    t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.14.4.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.6.27.jar")

    'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
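
On the question of whether the driver option helps: as far as we can tell from the Flink 1.13/1.14 JDBC connector, the table factory chooses a dialect from the URL prefix alone and raises the "Cannot handle such jdbc url" error when none matches, while the 'driver' option only names the JDBC driver class used to open connections. The sketch below is a simplified Python model of that behaviour, not Flink's actual code; the function names are hypothetical and the list of built-in dialects (Derby, MySQL, PostgreSQL) is our understanding of those releases.

```python
from typing import Optional

# Simplified model of how the JDBC table connector picks a dialect.
# Assumption: Flink 1.13/1.14 ship Derby, MySQL and PostgreSQL dialects,
# each recognised by its URL prefix.
BUILT_IN_DIALECTS = {
    "Derby": "jdbc:derby:",
    "MySQL": "jdbc:mysql:",
    "PostgreSQL": "jdbc:postgresql:",
}


def find_dialect(url: str) -> Optional[str]:
    """Return the first dialect whose URL prefix matches, or None."""
    for name, prefix in BUILT_IN_DIALECTS.items():
        if url.startswith(prefix):
            return name
    return None


def validate_jdbc_options(url: str, driver: Optional[str] = None) -> str:
    """Resolve the dialect for a table definition.

    The driver argument is accepted but never consulted: in this model only
    the URL decides which dialect is used.
    """
    dialect = find_dialect(url)
    if dialect is None:
        # RuntimeError stands in for Java's IllegalStateException.
        raise RuntimeError(f"Cannot handle such jdbc url: {url}")
    return dialect


if __name__ == "__main__":
    print(validate_jdbc_options("jdbc:mysql://localhost:3306/mydatabase"))
    try:
        validate_jdbc_options(
            "jdbc:snowflake://myaccount.snowflakecomputing.com:443/?db=MYDB",
            driver="net.snowflake.client.jdbc.SnowflakeDriver",
        )
    except RuntimeError as err:
        print(err)
```

Run as a script, it prints MySQL for the MySQL URL and the "Cannot handle such jdbc url" message for the Snowflake URL, even though a driver class is supplied.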



On Tue, Apr 5, 2022 at 3:21 AM Martijn Visser <ma...@apache.org>
wrote:

> Hi Shameet,
>
> There's currently no open source Flink Snowflake connector/sink available.
> As mentioned in the documentation [1] this requires the implementation of a
> dialect.
>
> Best regards
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/
>
>
> On Mon, 4 Apr 2022 at 19:41, Shameet Doshi <
> shameet.doshi@firstperformance.com> wrote:
>
> > Hello,
> >
> > I have need to talk to snowflake from within pyflink preferably using the
> > TABLE API
> > We are running pyflink inside AWS Kinesis Data Analysis application.
> > Installing any other python modules
> >
> > I was able to connect a mysql db but not able to connect to snowflake db
> >
> >
> >
> >
> >
> >
> >     import argparse
> >     import logging
> >     import sys
> >
> >     from pyflink.common import Row
> >     from pyflink.table import (EnvironmentSettings, TableEnvironment,
> >                            DataTypes)
> >     from pyflink.table.expressions import lit, col
> >     from pyflink.table.udf import udtf
> >
> >     env_settings = (
> >
> >
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> >     )
> >     t_env  = TableEnvironment.create(environment_settings=env_settings)
> >
> >     # write all the data to one file
> >     #t_env.get_config().set("parallelism.default", "1")
> >
> >     # define the source
> >
> >     print("Executing word_count example with default input data set.")
> >     print("Use --input to specify file input.")
> >     tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
> >                                 DataTypes.ROW([DataTypes.FIELD('line',
> > DataTypes.STRING())]))
> >
> >     # define the sink
> >
> >
> >
> >     t_env.execute_sql("""
> >     CREATE TABLE MyUserTable (
> >   id BIGINT,
> >   name STRING,
> >   age INT,
> >   status BOOLEAN,
> >   PRIMARY KEY (id) NOT ENFORCED
> > ) WITH (
> >    'connector' = 'jdbc',
> >    'url' = 'jdbc:snowflake://
> >
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> > ',
> >    'table-name' = 'users',
> >    'username' = 'devops',
> >    'password' = '50v*hrlM#SdK0euvnWR3',
> >    'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
> > )
> > """)
> >
> >     t_env.get_config().get_configuration().set_string("pipeline.jars",
> >
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")
> >
> >     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> > name='amy'")
> >
> >     usersdata.print()
> >
> >
> >
> > ERROR on execution : Caused by: java.lang.IllegalStateException: Cannot
> > handle such jdbc url:
> > jdbc:snowflake://
> >
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> >
> >
> >
> > Similar test works if I try to connect to MySQL
> >
> > t_env.execute_sql("""
> >     CREATE TABLE MyUserTable (
> >   id BIGINT,
> >   name STRING,
> >   age INT,
> >   status BOOLEAN,
> >   PRIMARY KEY (id) NOT ENFORCED
> > ) WITH (
> >    'connector' = 'jdbc',
> >    'url' = 'jdbc:mysql://localhost:3306/mydatabase
> > <mysql://localhost:3306/mydatabase>',
> >    'table-name' = 'users',
> >    'username' = 'root',
> >    'password' = 'rootroot'
> > )
> > """)
> >
> >     t_env.get_config().get_configuration().set_string("pipeline.jars",
> >
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")
> >
> >     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> > name='amy'")
> >
> >
> > --
> > NOTICE: This communication may contain information which is confidential
> > to
> > First Performance Corporation (FPC). If you are not the intended
> recipient
> > of this communication, please delete this email, destroy all copies, and
> > alert the sender. If you are the intended recipient of this
> communication,
> > you should not copy, disclose or distribute this communication without
> the
> > authority of FPC. Any views expressed in this communication are those of
> > the individual sender, except where the sender specifically states them
> to
> > be the views of FPC. Except as required by law, FPC does not represent,
> > warrant or guarantee that the integrity of this communication has been
> > maintained nor that the communication is free of errors, harmful code,
> > interception or interference.
> >
>

-- 
NOTICE: This communication may contain information which is confidential to 
First Performance Corporation (FPC). If you are not the intended recipient 
of this communication, please delete this email, destroy all copies, and 
alert the sender. If you are the intended recipient of this communication, 
you should not copy, disclose or distribute this communication without the 
authority of FPC. Any views expressed in this communication are those of 
the individual sender, except where the sender specifically states them to 
be the views of FPC. Except as required by law, FPC does not represent, 
warrant or guarantee that the integrity of this communication has been 
maintained nor that the communication is free of errors, harmful code, 
interception or interference.

Re: Pyflink + TABLE API + snowflake

Posted by Martijn Visser <ma...@apache.org>.
Hi Shameet,

There's currently no open-source Flink Snowflake connector/sink available.
As mentioned in the documentation [1], this requires implementing a
dialect.

Best regards

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/


On Mon, 4 Apr 2022 at 19:41, Shameet Doshi <
shameet.doshi@firstperformance.com> wrote:

> Hello,
>
> I have need to talk to snowflake from within pyflink preferably using the
> TABLE API
> We are running pyflink inside AWS Kinesis Data Analysis application.
> Installing any other python modules
>
> I was able to connect a mysql db but not able to connect to snowflake db
>
>
>
>
>
>
>     import argparse
>     import logging
>     import sys
>
>     from pyflink.common import Row
>     from pyflink.table import (EnvironmentSettings, TableEnvironment,
>                            DataTypes)
>     from pyflink.table.expressions import lit, col
>     from pyflink.table.udf import udtf
>
>     env_settings = (
>
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>     )
>     t_env  = TableEnvironment.create(environment_settings=env_settings)
>
>     # write all the data to one file
>     #t_env.get_config().set("parallelism.default", "1")
>
>     # define the source
>
>     print("Executing word_count example with default input data set.")
>     print("Use --input to specify file input.")
>     tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
>                                 DataTypes.ROW([DataTypes.FIELD('line',
> DataTypes.STRING())]))
>
>     # define the sink
>
>
>
>     t_env.execute_sql("""
>     CREATE TABLE MyUserTable (
>   id BIGINT,
>   name STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:snowflake://
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
> ',
>    'table-name' = 'users',
>    'username' = 'devops',
>    'password' = '50v*hrlM#SdK0euvnWR3',
>    'driver' = 'net.snowflake.client.jdbc.SnowflakeDriver'
> )
> """)
>
>     t_env.get_config().get_configuration().set_string("pipeline.jars",
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar;file:///Users/shameetdoshi/Downloads/snowflake-jdbc-3.13.9.jar")
>
>     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> name='amy'")
>
>     usersdata.print()
>
>
>
> ERROR on execution : Caused by: java.lang.IllegalStateException: Cannot
> handle such jdbc url:
> jdbc:snowflake://
> myaccount.snowflakecomputing.com:443/?ssl=on&warehouse=MYWAREHOUSE&db=MYDB&schema=MYSCHEMA&user=someuser&role=MYROLE
>
>
>
> Similar test works if I try to connect to MySQL
>
> t_env.execute_sql("""
>     CREATE TABLE MyUserTable (
>   id BIGINT,
>   name STRING,
>   age INT,
>   status BOOLEAN,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://localhost:3306/mydatabase
> <mysql://localhost:3306/mydatabase>',
>    'table-name' = 'users',
>    'username' = 'root',
>    'password' = 'rootroot'
> )
> """)
>
>     t_env.get_config().get_configuration().set_string("pipeline.jars",
> "file:///Users/shameetdoshi/Downloads/flink-connector-jdbc_2.12-1.13.3.jar;file:///Users/shameetdoshi/Downloads/mysql-connector-java-8.0.28.jar")
>
>     usersdata = t_env.execute_sql("SELECT   name FROM MyUserTable where
> name='amy'")
>
>
> --
> NOTICE: This communication may contain information which is confidential
> to
> First Performance Corporation (FPC). If you are not the intended recipient
> of this communication, please delete this email, destroy all copies, and
> alert the sender. If you are the intended recipient of this communication,
> you should not copy, disclose or distribute this communication without the
> authority of FPC. Any views expressed in this communication are those of
> the individual sender, except where the sender specifically states them to
> be the views of FPC. Except as required by law, FPC does not represent,
> warrant or guarantee that the integrity of this communication has been
> maintained nor that the communication is free of errors, harmful code,
> interception or interference.
>