Posted to users@kafka.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2021/03/11 11:12:35 UTC

Error in Kafka property file contains no connector type

Hi,


Trying to stream from Kafka to Google BigQuery.


The connect-standalone.properties file is as follows:


key.converter=org.apache.kafka.connect.storage.StringConverter
##value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#
# Converter-specific settings can be passed in by prefixing the Converter's
# setting with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and
# must be specified, but most users will always want to use the built-in
# default. Offset and config data is never visible outside of Kafka Connect in
# this format.
##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect_bq.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class
# loading isolation for plugins (connectors, converters, transformations). The
# list should consist of top level directories that include any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of
# classes of plugins and their dependencies. Note: symlinks will be followed to
# discover dependencies or plugins.
# Examples:
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
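
A quick way to sanity-check what Connect will scan from that plugin path (this just lists the directory tree; the connector should appear as a top-level directory containing its jars):

ls -R /d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins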


And the bigquery-sink.properties file has this:


{
     "name": "bigquery-sink",
     "connector.type": "bigquery-connector",
     "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
     "defaultDataset": "test",
     "project": "xyz",
     "topics": "md",
     "autoCreateTables": "false",
     "gcsBucketName": "tmp_storage_bucket",
     "queueSize": "-1",
     "bigQueryRetry": "0",
     "bigQueryRetryWait": "1000",
     "bigQueryMessageTimePartitioning": "false",
     "bigQueryPartitionDecorator": "true",
     "timePartitioningType": "DAY",
     "keySource": "FILE",
     "keyfile": "/home/hduser/xyz.json",
     "sanitizeTopics": "false",
     "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
     "threadPoolSize": "10",
     "allBQFieldsNullable": "false",
     "avroDataCacheSize": "100",
     "batchLoadIntervalSec": "120",
     "convertDoubleSpecialValues": "false",
     "enableBatchLoad": "false",
     "upsertEnabled": "false",
     "deleteEnabled": "false",
     "mergeIntervalMs": "60_000L",
     "mergeRecordsThreshold": "-1",
     "autoCreateBucket": "true",
     "allowNewBigQueryFields": "false",
     "allowBigQueryRequiredFieldRelaxation": "false",
     "allowSchemaUnionization": "false",
     "kafkaDataFieldName": "null",
     "kafkaKeyFieldName": "null"
}

Run as below:

$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

I get this error:

[2021-03-11 11:07:58,826] ERROR Failed to create job for
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
(org.apache.kafka.connect.cli.ConnectStandalone:102)
[2021-03-11 11:07:58,826] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException:
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
config {"defaultDataset"="test",,
"schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
"project"="axial-glow-224522",, "autoCreateTables"="false",,
"deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
"bigQueryMessageTimePartitioning"="false",,
"connector.type"="bigquery-connector",,
"gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
"mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
"kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
"keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
"topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
"keySource"="FILE",, "allowNewBigQueryFields"="false",,
"bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
"threadPoolSize"="10",, "timePartitioningType"="DAY",,
"enableBatchLoad"="false",,
"connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
"mergeRecordsThreshold"="-1",, "queueSize"="-1",,
"batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
"avroDataCacheSize"="100",, "upsertEnabled"="false",,
"kafkaDataFieldName"="null",, }=,
"allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type

I think the problem is a wrong entry in the bigquery-sink.properties file
above, but I cannot see what it is.
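
(Looking at the dump, the stray "{=" and "}=" entries and the doubled commas
suggest the file is being parsed as a flat Java properties file, so the JSON
punctuation ends up inside the keys. For comparison, a minimal sketch of the
first few settings in properties form, using the same key names as above:

name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
topics=md
project=xyz
defaultDataset=test
keySource=FILE
keyfile=/home/hduser/xyz.json

with the remaining keys following the same key=value pattern.)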


Any ideas appreciated.


Thanks


Fwd: Error in Kafka property file contains no connector type

Posted by Mich Talebzadeh <mi...@gmail.com>.
@Aakarsh Gopi, check this email


--------- Forwarded message ---------
From: Mich Talebzadeh <mi...@gmail.com>
Date: Sun, 14 Mar 2021 at 19:44
Subject: Re: Error in Kafka property file contains no connector type
To: <us...@kafka.apache.org>


Thanks all.

I was using an older Kafka version.

I upgraded Kafka in the cluster from kafka_2.12-1.1.0 to the latest stable
version, kafka_2.12-2.7.0. I also upgraded ZooKeeper from zookeeper-3.4.6 to
apache-zookeeper-3.6.2-bin.

In addition, in the run file I added the following:


#!/bin/ksh
unset CLASSPATH
export CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins:$KAFKA_HOME/libs
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

where the first part of CLASSPATH comes from the plugin path:

plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins

This solved the issue.
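
(As a quick check that the upgraded client jars really contain the class that
was missing earlier, assuming the standard jar name shipped in
kafka_2.12-2.7.0:

cd $KAFKA_HOME/libs
unzip -l kafka-clients-2.7.0.jar | grep CaseInsensitiveValidString

which should now show
org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString.class.)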

Mich






On Fri, 12 Mar 2021 at 20:33, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Ok, this attempt to stream a Kafka topic to BigQuery has moved on, but I am
> now getting this error:
>
> [2021-03-12 20:17:41,870] ERROR Stopping due to error
> (org.apache.kafka.connect.cli.ConnectStandalone:122)
> java.lang.NoSuchMethodError:
> org.apache.kafka.common.acl.AclBindingFilter.<init>(Lorg/apache/kafka/common/resource/ResourcePatternFilter;Lorg/apache/kafka/common/acl/AccessControlEntryFilter;)V
>         at
> org.apache.kafka.connect.mirror.MirrorSourceConnector.<clinit>(MirrorSourceConnector.java:67)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at java.lang.Class.newInstance(Class.java:442)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:302)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:280)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:208)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:177)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:154)
>         at
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:56)
>         at
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
>
> I googled it but could not find much.
>
>
> Any ideas please.
>
>
> Thanks
>
> On Fri, 12 Mar 2021 at 14:44, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Thanks again Liam.
>>
>> This is the error
>>
>> [2021-03-12 14:17:54,670] ERROR Stopping after connector error
>> (org.apache.kafka.connect.cli.ConnectStandalone:113)
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>>         at
>> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>>        at
>> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>>         at
>> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>>         at
>> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>>         at
>> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>>         at
>> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>
>> It is indeed under $KAFKA_HOME/libs
>>
>>
>> cd $KAFKA_HOME/libs
>>
>>  grep -lRi org.apache.kafka.common.config.ConfigDef
>>
>> kafka-clients-1.1.0.jar
>>
>> So it is in the kafka-clients-1.1.0.jar file.
>>
>>
>> I added $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I copied
>> the jar file over to
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
>>
>>
>>
>> grep -lRi
>> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>
>>
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar
>>
>> Still cannot find it!
>>
>> In my plugin I have specified
>>
>> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>>
>>
>> So I am not sure where it is looking.
>>
>>
>>
>>
>> Regards,
>>
>>
>> Mich
>>
>> On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
>> liam.clarke@adscale.co.nz> wrote:
>>
>>> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
>>> should already have the common conf lib in there).
>>>
>>>
>>> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <
>>> mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>> > Thanks Liam for the suggestion.
>>> >
>>> > This is the redone sink file (plain text)
>>> >
>>> > name=bigquery-sink
>>> > connector.type=bigquery-connector
>>> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
>>> > defaultDataset=test
>>> > project=axial-glow-224522
>>> > topics=md
>>> > autoCreateTables=false
>>> > gcsBucketName=tmp_storage_bucket
>>> > queueSize=-1
>>> > bigQueryRetry=0
>>> > bigQueryRetryWait=1000
>>> > bigQueryMessageTimePartitioning=false
>>> > bigQueryPartitionDecorator=true
>>> > timePartitioningType=DAY
>>> > keySource=FILE
>>> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
>>> > sanitizeTopics=false
>>> > schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
>>> > threadPoolSize=10
>>> > allBQFieldsNullable=false
>>> > avroDataCacheSize=100
>>> > batchLoadIntervalSec=120
>>> > convertDoubleSpecialValues=false
>>> > enableBatchLoad=false
>>> > upsertEnabled=false
>>> > deleteEnabled=false
>>> > mergeIntervalMs=60_000L
>>> > mergeRecordsThreshold=-1
>>> > autoCreateBucket=true
>>> > allowNewBigQueryFields=false
>>> > allowBigQueryRequiredFieldRelaxation=false
>>> > allowSchemaUnionization=false
>>> > kafkaDataFieldName=null
>>> > kafkaKeyFieldName=null
>>> >
>>> > Now when I run the command
>>> >
>>> > $KAFKA_HOME/bin/connect-standalone.sh \
>>> > /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>>> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> >
>>> > It comes back with this error:
>>> >
>>> > [2021-03-12 09:23:54,523] INFO REST server listening at
>>> > http://50.140.197.220:8083/, advertising URL
>>> http://50.140.197.220:8083/
>>> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
>>> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
>>> > (org.apache.kafka.connect.runtime.Connect:55)
>>> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
>>> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>>> > java.lang.NoClassDefFoundError:
>>> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>>> >         at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>>> >         at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>>> >         at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>>> >         at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>>> >         at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>>> >         at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> >         at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
>>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> >
>>> > I downloaded common-config-6.1.0.jar and added it to the lib directory in
>>> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
>>> >
>>> > But little joy I am afraid.
>>> >
>>> > Cheers,
>>> >
>>> > Mich
>>> >
>>> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
>>> > liam.clarke@adscale.co.nz> wrote:
>>> >
>>> > > Hi Mich,
>>> > >
>>> > > Your bigquery-sink.properties file is in a JSON format - which won't
>>> > work.
>>> > > It needs to follow the usual format of a Java properties file.
>>> > >
>>> > > Kind regards,
>>> > >
>>> > > Liam Clarke-Hutchinson

Re: Error in Kafka property file contains no connector type

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks all.

I was using an older Kafka version.

I upgraded Kafka in the cluster from kafka_2.12-1.1.0 to the latest stable
version kafka_2.12-2.7.0. I also upgraded zookeeper from
zookeeper-3.4.6 to apache-zookeeper-3.6.2-bin
version.

In addition in the run file I added the following:


#!/bin/ksh
unset CLASSPATH
export
CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins:$KAFKA_HOME/libs
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

where the first part of CLASSPATH comes from plugins

plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins

This solved the issue.

Mich


LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 20:33, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Ok this try to stream kafka topic to BigQuery has moved on but I am now
> getting this error
>
> [2021-03-12 20:17:41,870] ERROR Stopping due to error
> (org.apache.kafka.connect.cli.ConnectStandalone:122)
> j*ava.lang.NoSuchMethodError:
> org.apache.kafka.common.acl.AclBindingFilter.<init>(Lorg/apache/kafka/common/resource/ResourcePatternFilter;Lorg/apache/kafka/common/acl/AccessControlEntryFilter;)V*
>         at
> org.apache.kafka.connect.mirror.MirrorSourceConnector.<clinit>(MirrorSourceConnector.java:67)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at java.lang.Class.newInstance(Class.java:442)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:302)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:280)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:208)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:177)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:154)
>         at
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:56)
>         at
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
>
> I googled it but could not get much.
>
>
> Any ideas please.
>
>
> Thanks
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 12 Mar 2021 at 14:44, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Thanks again Liam.
>>
>> This is the error
>>
>> [2021-03-12 14:17:54,670] ERROR Stopping after connector error
>> (org.apache.kafka.connect.cli.ConnectStandalone:113)
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>>         at
>> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>>        at
>> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>>         at
>> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>>         at
>> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>>         at
>> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>>         at
>> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>> Caused by: java.lang.ClassNotFoundException:
>> *org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*
>>
>> It is indeed under $KAFKA_HOME/libs
>>
>>
>> cd $KAFKA_HOME/libs
>>
>>  grep -lRi org.apache.kafka.common.config.ConfigDef
>>
>> kafka-clients-1.1.0.jar
>>
>> So it is in kafka-clients-1.1.0.jar file
>>
>>
>> I added the $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I
>> copied over the file to
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
>>
>>
>>
>> grep -lRi
>> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>
>>
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar
>>
>> Still cannot find it!
>>
>> In my plugin I have specified
>>
>> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>>
>>
>> So not sure where it is looking
>>
>>
>>
>>
>> Regards,
>>
>>
>> Mich
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
>> liam.clarke@adscale.co.nz> wrote:
>>
>>> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
>>> should already have the common conf lib in there).
>>>
>>>
>>> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <
>>> mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>> > Thanks Liam for the suggestion.
>>> >
>>> > This is the redone sink file (plain text)
>>> >
>>> > name=bigquery-sink
>>> > connector.type=bigquery-connector
>>> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
>>> > defaultDataset=test
>>> > project=axial-glow-224522
>>> > topics=md
>>> > autoCreateTables=false
>>> > gcsBucketName=tmp_storage_bucket
>>> > queueSize=-1
>>> > bigQueryRetry=0
>>> > bigQueryRetryWait=1000
>>> > bigQueryMessageTimePartitioning=false
>>> > bigQueryPartitionDecorator=true
>>> > timePartitioningType=DAY
>>> > keySource=FILE
>>> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
>>> > sanitizeTopics=false
>>> >
>>> >
>>> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
>>> > threadPoolSize=10
>>> > allBQFieldsNullable=false
>>> > avroDataCacheSize=100
>>> > batchLoadIntervalSec=120
>>> > convertDoubleSpecialValues=false
>>> > enableBatchLoad=false
>>> > upsertEnabled=false
>>> > deleteEnabled=false
>>> > mergeIntervalMs=60_000L
>>> > mergeRecordsThreshold=-1
>>> > autoCreateBucket=true
>>> > allowNewBigQueryFields=false
>>> > allowBigQueryRequiredFieldRelaxation=false
>>> > allowSchemaUnionization=false
>>> > kafkaDataFieldName=null
>>> > kafkaKeyFieldName=null
>>> >
>>> > Now when I run the command
>>> >
>>> > $KAFKA_HOME/bin/connect-standalone.sh \
>>> >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>>> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> >
>>> > It comes back with this error:
>>> >
>>> > [2021-03-12 09:23:54,523] INFO REST server listening at
>>> > http://50.140.197.220:8083/, advertising URL
>>> http://50.140.197.220:8083/
>>> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
>>> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
>>> > (org.apache.kafka.connect.runtime.Connect:55)
>>> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
>>> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>>> > java.lang.NoClassDefFoundError:
>>> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>>> >         at
>>> >
>>> >
>>> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>>> >         at
>>> >
>>> >
>>> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>>> >         at
>>> >
>>> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
>>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> >
>>> > I downloaded common-config-6.1.0.jar and added to lib directory in
>>> >
>>> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
>>> >
>>> > But little joy I am afraid.
>>> >
>>> > Cheers,
>>> >
>>> > Mich
>>> >
>>> >
>>> >
>>> > LinkedIn *
>>> >
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> > <
>>> >
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> > >*
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any
>>> > loss, damage or destruction of data or any other property which may
>>> arise
>>> > from relying on this email's technical content is explicitly
>>> disclaimed.
>>> > The author will in no case be liable for any monetary damages arising
>>> from
>>> > such loss, damage or destruction.
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
>>> > liam.clarke@adscale.co.nz> wrote:
>>> >
>>> > > Hi Mich,
>>> > >
>>> > > Your bigquery-sink.properties file is in a JSON format - which won't
>>> > work.
>>> > > It needs to follow the usual format of a Java properties file.
>>> > >
>>> > > Kind regards,
>>> > >
>>> > > Liam Clarke-Hutchinson
>>> > >
>>> > > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
>>> > > mich.talebzadeh@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi,
>>> > > >
>>> > > >
>>> > > > Trying to stream from Kafka to Google BigQuery.
>>> > > >
>>> > > >
>>> > > >  The connect-standalone.properties is as follows
>>> > > >
>>> > > >
>>> > > > key.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > value.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > > #
>>> > > >
>>> > > > # Converter-specific settings can be passed in by prefixing the
>>> > > Converter's
>>> > > >
>>> > > > # setting with the converter we want to apply it to
>>> > > >
>>> > > > key.converter.schemas.enable=true
>>> > > >
>>> > > > value.converter.schemas.enable=false
>>> > > >
>>> > > >
>>> > > > # The internal converter used for offsets and config data is
>>> > configurable
>>> > > > and
>>> > > >
>>> > > > # must be specified, but most users will always want to use the
>>> > built-in
>>> > > >
>>> > > > # default. Offset and config data is never visible outside of Kafka
>>> > > Connect
>>> > > > in
>>> > > >
>>> > > > # this format.
>>> > > >
>>> > > >
>>> ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > >
>>> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > >
>>> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > >
>>> > >
>>> >
>>> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > internal.key.converter.schemas.enable=false
>>> > > >
>>> > > > internal.value.converter.schemas.enable=false
>>> > > >
>>> > > >
>>> > > > offset.storage.file.filename=/tmp/connect_bq.offsets
>>> > > >
>>> > > > # Flush much faster than normal, which is useful for
>>> testing/debugging
>>> > > >
>>> > > > offset.flush.interval.ms=10000
>>> > > >
>>> > > >
>>> > > > # Set to a list of filesystem paths separated by commas (,) to
>>> enable
>>> > > class
>>> > > >
>>> > > > # loading isolation for plugins (connectors, converters,
>>> > > transformations).
>>> > > > The
>>> > > >
>>> > > > # list should consist of top level directories that include any
>>> > > combination
>>> > > > of:
>>> > > >
>>> > > > # a) directories immediately containing jars with plugins and their
>>> > > > dependencies
>>> > > >
>>> > > > # b) uber-jars with plugins and their dependencies
>>> > > >
>>> > > > # c) directories immediately containing the package directory
>>> structure
>>> > > of
>>> > > >
>>> > > > # classes of plugins and their dependencies Note: symlinks will be
>>> > > followed
>>> > > > to
>>> > > >
>>> > > > # discover dependencies or plugins.
>>> > > >
>>> > > > # Examples:
>>> > > >
>>> > > >
>>> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>>> > > >
>>> > > >
>>> > > > And bigquery-sink.properties file has this
>>> > > >
>>> > > >
>>> > > > {
>>> > > >
>>> > > >      "name": "bigquery-sink",
>>> > > >
>>> > > >      "connector.type": "bigquery-connector",
>>> > > >
>>> > > >      "connector.class":
>>> > > > "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
>>> > > >
>>> > > >      "defaultDataset": "test",
>>> > > >
>>> > > >      "project": "xyz",
>>> > > >
>>> > > >      "topics": "md",
>>> > > >
>>> > > >      "autoCreateTables": "false",
>>> > > >
>>> > > >      "gcsBucketName": "tmp_storage_bucket",
>>> > > >
>>> > > >      "queueSize": "-1",
>>> > > >
>>> > > >      "bigQueryRetry": "0",
>>> > > >
>>> > > >      "bigQueryRetryWait": "1000",
>>> > > >
>>> > > >      "bigQueryMessageTimePartitioning": "false",
>>> > > >
>>> > > >      "bigQueryPartitionDecorator": "true",
>>> > > >
>>> > > >      "timePartitioningType": "DAY",
>>> > > >
>>> > > >      "keySource": "FILE",
>>> > > >
>>> > > >      "keyfile": "/home/hduser/xyz.json",
>>> > > >
>>> > > >      "sanitizeTopics": "false",
>>> > > >
>>> > > >      "schemaRetriever":
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
>>> > > >
>>> > > >      "threadPoolSize": "10",
>>> > > >
>>> > > >      "allBQFieldsNullable": "false",
>>> > > >
>>> > > >      "avroDataCacheSize": "100",
>>> > > >
>>> > > >      "batchLoadIntervalSec": "120",
>>> > > >
>>> > > >      "convertDoubleSpecialValues": "false",
>>> > > >
>>> > > >      "enableBatchLoad": "false",
>>> > > >
>>> > > >      "upsertEnabled": "false",
>>> > > >
>>> > > >      "deleteEnabled": "false",
>>> > > >
>>> > > >      "mergeIntervalMs": "60_000L",
>>> > > >
>>> > > >      "mergeRecordsThreshold": "-1",
>>> > > >
>>> > > >      "autoCreateBucket": "true",
>>> > > >
>>> > > >      "allowNewBigQueryFields": "false",
>>> > > >
>>> > > >      "allowBigQueryRequiredFieldRelaxation": "false",
>>> > > >
>>> > > >      "allowSchemaUnionization": "false",
>>> > > >
>>> > > >      "kafkaDataFieldName": "null",
>>> > > >
>>> > > >      "kafkaKeyFieldName": "null"
>>> > > >
>>> > > > }
>>> > > >
>>> > > > Run as below
>>> > > >
>>> > > >
>>> > > > $KAFKA_HOME/bin/connect-standalone.sh \
>>> > > >
>>> > > >
>>> > >
>>> >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>>> > > >
>>> > > >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> > > >
>>> > > > I get this error
>>> > > >
>>> > > > [2021-03-11 11:07:58,826] ERROR Failed to create job for
>>> > > >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> > > > (org.apache.kafka.connect.cli.ConnectStandalone:102)
>>> > > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
>>> > > > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>>> > > > java.util.concurrent.ExecutionException:
>>> > > > org.apache.kafka.connect.runtime.rest.errors.BadRequestException:
>>> > > Connector
>>> > > > config {"defaultDataset"="test",,
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
>>> > > > "project"="axial-glow-224522",, "autoCreateTables"="false",,
>>> > > > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
>>> > > > "bigQueryMessageTimePartitioning"="false",,
>>> > > > "connector.type"="bigquery-connector",,
>>> > > > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
>>> > > > "mergeIntervalMs"="60_000L",,
>>> "convertDoubleSpecialValues"="false",,
>>> > > > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
>>> > > > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
>>> > > > "topics"="md",, "bigQueryRetry"="0",,
>>> "allBQFieldsNullable"="false",,
>>> > > > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
>>> > > > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
>>> > > > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
>>> > > > "enableBatchLoad"="false",,
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
>>> > > > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
>>> > > > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
>>> > > > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
>>> > > > "kafkaDataFieldName"="null",, }=,
>>> > > > "allowBigQueryRequiredFieldRelaxation"="false",} contains no
>>> connector
>>> > > type
>>> > > >
>>> > > > I think the problem is the wrong entry in the
>>> bigquery-sink.properties
>>> > > > file above.
>>> > > >
>>> > > > I cannot see what it is?
>>> > > >
>>> > > >
>>> > > > Any ideas appreciated.
>>> > > >
>>> > > >
>>> > > > Thanks
>>> > > >
>>> > > > *Disclaimer:* Use it at your own risk. Any and all responsibility
>>> for
>>> > any
>>> > > > loss, damage or destruction of data or any other property which may
>>> > arise
>>> > > > from relying on this email's technical content is explicitly
>>> > disclaimed.
>>> > > > The author will in no case be liable for any monetary damages
>>> arising
>>> > > from
>>> > > > such loss, damage or destruction.
>>> > > >
>>> > >
>>> >
>>>
>>

Re: Error in Kafka property file contains no connector type

Posted by Mich Talebzadeh <mi...@gmail.com>.
Ok this try to stream kafka topic to BigQuery has moved on but I am now
getting this error

[2021-03-12 20:17:41,870] ERROR Stopping due to error
(org.apache.kafka.connect.cli.ConnectStandalone:122)
j*ava.lang.NoSuchMethodError:
org.apache.kafka.common.acl.AclBindingFilter.<init>(Lorg/apache/kafka/common/resource/ResourcePatternFilter;Lorg/apache/kafka/common/acl/AccessControlEntryFilter;)V*
        at
org.apache.kafka.connect.mirror.MirrorSourceConnector.<clinit>(MirrorSourceConnector.java:67)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:302)
        at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:280)
        at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
        at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:208)
        at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:177)
        at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:154)
        at
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:56)
        at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)

I googled it but could not get much.


Any ideas please.


Thanks



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 14:44, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Thanks again Liam.
>
> This is the error
>
> [2021-03-12 14:17:54,670] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>         at
> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>        at
> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>         at
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>         at
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>         at
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> Caused by: java.lang.ClassNotFoundException:
> *org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*
>
> It is indeed under $KAFKA_HOME/libs
>
>
> cd $KAFKA_HOME/libs
>
>  grep -lRi org.apache.kafka.common.config.ConfigDef
>
> kafka-clients-1.1.0.jar
>
> So it is in kafka-clients-1.1.0.jar file
>
>
> I added the $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I
> copied over the file to
> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
>
>
>
> grep -lRi
> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>
>
> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar
>
> Still cannot find it!
>
> In my plugin I have specified
>
> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>
>
> So not sure where it is looking
>
>
>
>
> Regards,
>
>
> Mich
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
> liam.clarke@adscale.co.nz> wrote:
>
>> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
>> should already have the common conf lib in there).
>>
>>
>> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <
>> mich.talebzadeh@gmail.com>
>> wrote:
>>
>> > Thanks Liam for the suggestion.
>> >
>> > This is the redone sink file (plain text)
>> >
>> > name=bigquery-sink
>> > connector.type=bigquery-connector
>> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
>> > defaultDataset=test
>> > project=axial-glow-224522
>> > topics=md
>> > autoCreateTables=false
>> > gcsBucketName=tmp_storage_bucket
>> > queueSize=-1
>> > bigQueryRetry=0
>> > bigQueryRetryWait=1000
>> > bigQueryMessageTimePartitioning=false
>> > bigQueryPartitionDecorator=true
>> > timePartitioningType=DAY
>> > keySource=FILE
>> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
>> > sanitizeTopics=false
>> >
>> >
>> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
>> > threadPoolSize=10
>> > allBQFieldsNullable=false
>> > avroDataCacheSize=100
>> > batchLoadIntervalSec=120
>> > convertDoubleSpecialValues=false
>> > enableBatchLoad=false
>> > upsertEnabled=false
>> > deleteEnabled=false
>> > mergeIntervalMs=60_000L
>> > mergeRecordsThreshold=-1
>> > autoCreateBucket=true
>> > allowNewBigQueryFields=false
>> > allowBigQueryRequiredFieldRelaxation=false
>> > allowSchemaUnionization=false
>> > kafkaDataFieldName=null
>> > kafkaKeyFieldName=null
>> >
>> > Now when I run the command
>> >
>> > $KAFKA_HOME/bin/connect-standalone.sh \
>> >
>> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>> >
>> > It comes back with this error:
>> >
>> > [2021-03-12 09:23:54,523] INFO REST server listening at
>> > http://50.140.197.220:8083/, advertising URL
>> http://50.140.197.220:8083/
>> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
>> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
>> > (org.apache.kafka.connect.runtime.Connect:55)
>> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
>> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>> > java.lang.NoClassDefFoundError:
>> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>> >         at
>> >
>> >
>> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>> >         at
>> >
>> >
>> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>> >         at
>> >
>> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>> >         at
>> >
>> >
>> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>> >         at
>> >
>> >
>> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>> >         at
>> >
>> >
>> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>> > Caused by: java.lang.ClassNotFoundException:
>> > org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> >         at
>> >
>> >
>> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> >
>> > I downloaded common-config-6.1.0.jar and added to lib directory in
>> >
>> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
>> >
>> > But little joy I am afraid.
>> >
>> > Cheers,
>> >
>> > Mich
>> >
>> >
>> >
>> > LinkedIn *
>> >
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> > <
>> >
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> > >*
>> >
>> >
>> >
>> >
>> >
>> > *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any
>> > loss, damage or destruction of data or any other property which may
>> arise
>> > from relying on this email's technical content is explicitly disclaimed.
>> > The author will in no case be liable for any monetary damages arising
>> from
>> > such loss, damage or destruction.

Re: Error in Kafka property file contains no connector type

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks again Liam.

This is the error:

[2021-03-12 14:17:54,670] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError:
org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
        at
com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
        at
com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
        at
org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
        at
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
        at
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
        at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException:
*org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*

It is indeed under $KAFKA_HOME/libs:

cd $KAFKA_HOME/libs
grep -lRi org.apache.kafka.common.config.ConfigDef

kafka-clients-1.1.0.jar

So it is in the kafka-clients-1.1.0.jar file.


I added $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I copied
the jar over to share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib:

grep -lRi org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString

share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar

Still cannot find it!

In my worker properties I have specified

plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins

so I am not sure where it is looking.
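
A note on that last grep: with the $ unquoted, the shell expands
$CaseInsensitiveValidString as an (empty) variable, so the search was
effectively for the outer ConfigDef class again. A sketch of a more
reliable check, assuming the paths above (entries inside the jar use the
ConfigDef$CaseInsensitiveValidString spelling, so listing the archive is
the safest test):

grep -lRF 'org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString' $KAFKA_HOME/libs
# list the jar's entries and look for the inner class file itself
unzip -l $KAFKA_HOME/libs/kafka-clients-1.1.0.jar | grep 'ConfigDef\$CaseInsensitiveValidString'

An empty result from the second command would mean the inner class is
simply not present in kafka-clients-1.1.0.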




Regards,


Mich



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
liam.clarke@adscale.co.nz> wrote:

> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
> should already have the common conf lib in there).
>
>
> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <mich.talebzadeh@gmail.com
> >
> wrote:
>
> > Thanks Liam for the suggestion.
> >
> > This is the redone sink file (plain text)
> >
> > name=bigquery-sink
> > connector.type=bigquery-connector
> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> > defaultDataset=test
> > project=axial-glow-224522
> > topics=md
> > autoCreateTables=false
> > gcsBucketName=tmp_storage_bucket
> > queueSize=-1
> > bigQueryRetry=0
> > bigQueryRetryWait=1000
> > bigQueryMessageTimePartitioning=false
> > bigQueryPartitionDecorator=true
> > timePartitioningType=DAY
> > keySource=FILE
> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> > sanitizeTopics=false
> >
> >
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> > threadPoolSize=10
> > allBQFieldsNullable=false
> > avroDataCacheSize=100
> > batchLoadIntervalSec=120
> > convertDoubleSpecialValues=false
> > enableBatchLoad=false
> > upsertEnabled=false
> > deleteEnabled=false
> > mergeIntervalMs=60_000L
> > mergeRecordsThreshold=-1
> > autoCreateBucket=true
> > allowNewBigQueryFields=false
> > allowBigQueryRequiredFieldRelaxation=false
> > allowSchemaUnionization=false
> > kafkaDataFieldName=null
> > kafkaKeyFieldName=null
> >
> > Now when I run the command
> >
> > $KAFKA_HOME/bin/connect-standalone.sh \
> >
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> >
> > It comes back with this error:
> >
> > [2021-03-12 09:23:54,523] INFO REST server listening at
> > http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
> > (org.apache.kafka.connect.runtime.Connect:55)
> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > java.lang.NoClassDefFoundError:
> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> >         at
> >
> >
> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
> >         at
> >
> >
> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> >         at
> > org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> >         at
> >
> >
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> >         at
> >
> >
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> >         at
> >
> >
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> > Caused by: java.lang.ClassNotFoundException:
> > org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >         at
> >
> >
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >
> > I downloaded common-config-6.1.0.jar and added to lib directory in
> >
> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
> >
> > But little joy I am afraid.
> >
> > Cheers,
> >
> > Mich
> >
> >
> >
> > LinkedIn *
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > <
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > >*
> >
> >
> >
> >
> >
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> > loss, damage or destruction of data or any other property which may arise
> > from relying on this email's technical content is explicitly disclaimed.
> > The author will in no case be liable for any monetary damages arising
> from
> > such loss, damage or destruction.
> >
> >
> >
> >
> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
> > liam.clarke@adscale.co.nz> wrote:
> >
> > > Hi Mich,
> > >
> > > Your bigquery-sink.properties file is in a JSON format - which won't
> > work.
> > > It needs to follow the usual format of a Java properties file.
> > >
> > > Kind regards,
> > >
> > > Liam Clarke-Hutchinson
> > >
> > > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
> > > mich.talebzadeh@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > > Trying to stream from Kafka to Google BigQuery.
> > > >
> > > >
> > > >  The connect-standalone.properties is as follows
> > > >
> > > >
> > > > key.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > value.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > > #
> > > >
> > > > # Converter-specific settings can be passed in by prefixing the
> > > Converter's
> > > >
> > > > # setting with the converter we want to apply it to
> > > >
> > > > key.converter.schemas.enable=true
> > > >
> > > > value.converter.schemas.enable=false
> > > >
> > > >
> > > > # The internal converter used for offsets and config data is
> > configurable
> > > > and
> > > >
> > > > # must be specified, but most users will always want to use the
> > built-in
> > > >
> > > > # default. Offset and config data is never visible outside of Kafka
> > > Connect
> > > > in
> > > >
> > > > # this format.
> > > >
> > > > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > >
> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > >
> > >
> >
> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > internal.key.converter.schemas.enable=false
> > > >
> > > > internal.value.converter.schemas.enable=false
> > > >
> > > >
> > > > offset.storage.file.filename=/tmp/connect_bq.offsets
> > > >
> > > > # Flush much faster than normal, which is useful for
> testing/debugging
> > > >
> > > > offset.flush.interval.ms=10000
> > > >
> > > >
> > > > # Set to a list of filesystem paths separated by commas (,) to enable
> > > class
> > > >
> > > > # loading isolation for plugins (connectors, converters,
> > > transformations).
> > > > The
> > > >
> > > > # list should consist of top level directories that include any
> > > combination
> > > > of:
> > > >
> > > > # a) directories immediately containing jars with plugins and their
> > > > dependencies
> > > >
> > > > # b) uber-jars with plugins and their dependencies
> > > >
> > > > # c) directories immediately containing the package directory
> structure
> > > of
> > > >
> > > > # classes of plugins and their dependencies Note: symlinks will be
> > > followed
> > > > to
> > > >
> > > > # discover dependencies or plugins.
> > > >
> > > > # Examples:
> > > >
> > > >
> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
> > > >
> > > >
> > > > And bigquery-sink.properties file has this
> > > >
> > > >
> > > > {
> > > >
> > > >      "name": "bigquery-sink",
> > > >
> > > >      "connector.type": "bigquery-connector",
> > > >
> > > >      "connector.class":
> > > > "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
> > > >
> > > >      "defaultDataset": "test",
> > > >
> > > >      "project": "xyz",
> > > >
> > > >      "topics": "md",
> > > >
> > > >      "autoCreateTables": "false",
> > > >
> > > >      "gcsBucketName": "tmp_storage_bucket",
> > > >
> > > >      "queueSize": "-1",
> > > >
> > > >      "bigQueryRetry": "0",
> > > >
> > > >      "bigQueryRetryWait": "1000",
> > > >
> > > >      "bigQueryMessageTimePartitioning": "false",
> > > >
> > > >      "bigQueryPartitionDecorator": "true",
> > > >
> > > >      "timePartitioningType": "DAY",
> > > >
> > > >      "keySource": "FILE",
> > > >
> > > >      "keyfile": "/home/hduser/xyz.json",
> > > >
> > > >      "sanitizeTopics": "false",
> > > >
> > > >      "schemaRetriever":
> > > >
> > > >
> > >
> >
> "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
> > > >
> > > >      "threadPoolSize": "10",
> > > >
> > > >      "allBQFieldsNullable": "false",
> > > >
> > > >      "avroDataCacheSize": "100",
> > > >
> > > >      "batchLoadIntervalSec": "120",
> > > >
> > > >      "convertDoubleSpecialValues": "false",
> > > >
> > > >      "enableBatchLoad": "false",
> > > >
> > > >      "upsertEnabled": "false",
> > > >
> > > >      "deleteEnabled": "false",
> > > >
> > > >      "mergeIntervalMs": "60_000L",
> > > >
> > > >      "mergeRecordsThreshold": "-1",
> > > >
> > > >      "autoCreateBucket": "true",
> > > >
> > > >      "allowNewBigQueryFields": "false",
> > > >
> > > >      "allowBigQueryRequiredFieldRelaxation": "false",
> > > >
> > > >      "allowSchemaUnionization": "false",
> > > >
> > > >      "kafkaDataFieldName": "null",
> > > >
> > > >      "kafkaKeyFieldName": "null"
> > > >
> > > > }
> > > >
> > > > Run as below
> > > >
> > > >
> > > > $KAFKA_HOME/bin/connect-standalone.sh \
> > > >
> > > >
> > >
> >
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > > >
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > > >
> > > > I get this error
> > > >
> > > > [2021-03-11 11:07:58,826] ERROR Failed to create job for
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > > > (org.apache.kafka.connect.cli.ConnectStandalone:102)
> > > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> > > > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > > > java.util.concurrent.ExecutionException:
> > > > org.apache.kafka.connect.runtime.rest.errors.BadRequestException:
> > > Connector
> > > > config {"defaultDataset"="test",,
> > > >
> > > >
> > >
> >
> "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> > > > "project"="axial-glow-224522",, "autoCreateTables"="false",,
> > > > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> > > > "bigQueryMessageTimePartitioning"="false",,
> > > > "connector.type"="bigquery-connector",,
> > > > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> > > > "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> > > > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> > > > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> > > > "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> > > > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> > > > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> > > > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> > > > "enableBatchLoad"="false",,
> > > >
> > > >
> > >
> >
> "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> > > > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> > > > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> > > > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
> > > > "kafkaDataFieldName"="null",, }=,
> > > > "allowBigQueryRequiredFieldRelaxation"="false",} contains no
> connector
> > > type
> > > >
> > > > I think the problem is the wrong entry in the
> bigquery-sink.properties
> > > > file above.
> > > >
> > > > I cannot see what it is?
> > > >
> > > >
> > > > Any ideas appreciated.
> > > >
> > > >
> > > > Thanks
> > > >
> > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for
> > any
> > > > loss, damage or destruction of data or any other property which may
> > arise
> > > > from relying on this email's technical content is explicitly
> > disclaimed.
> > > > The author will in no case be liable for any monetary damages arising
> > > from
> > > > such loss, damage or destruction.
> > > >
> > >
> >
>

Re: Error in Kafka property file contains no connector type

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
I feel your CLASSPATH env var might need to include $KAFKA_HOME/libs (it
should already have the common config classes in there).
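
A minimal sketch of that suggestion, reusing the paths from earlier in the
thread (kafka-run-class.sh should append an existing CLASSPATH, and the
stock Kafka layout uses libs):

export CLASSPATH="$KAFKA_HOME/libs/*"
$KAFKA_HOME/bin/connect-standalone.sh \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties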



Re: Error in Kafka property file contains no connector type

Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks Liam for the suggestion.

This is the redone sink file (plain-text properties format):

name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60_000L
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null
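
Two things in this file are worth double-checking. connector.type does not
appear to be a property the Connect framework reads (the earlier "contains
no connector type" message referred to the missing connector.class, not to
this key), and mergeIntervalMs=60_000L plus the two ...FieldName=null
lines are literal strings here, not the Java long literal and nulls they
resemble. If those settings are wanted at all, a spelling like this is
likelier to parse:

mergeIntervalMs=60000
# and drop the kafkaDataFieldName / kafkaKeyFieldName lines entirely
# rather than setting them to the string "null"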

Now when I run the command

$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

It comes back with this error:

[2021-03-12 09:23:54,523] INFO REST server listening at
http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
(org.apache.kafka.connect.runtime.rest.RestServer:207)
[2021-03-12 09:23:54,523] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:55)
[2021-03-12 09:23:54,534] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError:
org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
        at
com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
        at
com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
        at
org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
        at
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
        at
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
        at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I downloaded common-config-6.1.0.jar and added it to the lib directory in

..../wepay-kafka-connect-bigquery-2.1.0/lib

But no joy, I am afraid.
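
For what it's worth, common-config-6.1.0.jar is a Confluent artifact whose
classes live under io.confluent.common.config, so it cannot supply
org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString; that
class ships in kafka-clients and appears to have been added well after
1.1.0. Connect's class-loading isolation also seems to resolve
org.apache.kafka.common.* from the worker's own classpath rather than from
plugin.path, so copying newer jars into the plugin's lib directory would
not help either. A sketch of the likelier fix, with the Kafka version
chosen purely for illustration:

# run Connect from a newer Kafka distribution whose kafka-clients has the class
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar xzf kafka_2.12-2.5.0.tgz
export KAFKA_HOME=$PWD/kafka_2.12-2.5.0
$KAFKA_HOME/bin/connect-standalone.sh \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties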

Cheers,

Mich



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: Error in Kafka property file contains no connector type

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Mich,

Your bigquery-sink.properties file is in JSON format, which won't work
with connect-standalone.sh. It needs to follow the usual key=value format
of a Java properties file.

Kind regards,

Liam Clarke-Hutchinson
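
As an aside, the JSON shape is what the Connect REST API expects, so an
alternative to rewriting the file is to POST the config to a running
worker, roughly as below (host and port taken from the log lines quoted
elsewhere in this thread, config trimmed for brevity; note the connector
settings must sit under a "config" key):

curl -X POST http://50.140.197.220:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{"name": "bigquery-sink", "config": {"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", "topics": "md", "defaultDataset": "test"}}'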
