You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2022/04/01 21:00:35 UTC

Fwd: Error in Kafka property file contains no connector type

@Aakarsh Gopi, check this email


--------- Forwarded message ---------
From: Mich Talebzadeh <mi...@gmail.com>
Date: Sun, 14 Mar 2021 at 19:44
Subject: Re: Error in Kafka property file contains no connector type
To: <us...@kafka.apache.org>


Thanks all.

I was using an older Kafka version.

I upgraded Kafka in the cluster from kafka_2.12-1.1.0 to the latest stable
version kafka_2.12-2.7.0. I also upgraded zookeeper from
zookeeper-3.4.6 to apache-zookeeper-3.6.2-bin
version.

In addition in the run file I added the following:


#!/bin/ksh
unset CLASSPATH
export
CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins:$KAFKA_HOME/libs
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

where the first part of CLASSPATH comes from plugins

plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins

This solved the issue.

Mich


LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 20:33, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Ok this try to stream kafka topic to BigQuery has moved on but I am now
> getting this error
>
> [2021-03-12 20:17:41,870] ERROR Stopping due to error
> (org.apache.kafka.connect.cli.ConnectStandalone:122)
> j*ava.lang.NoSuchMethodError:
> org.apache.kafka.common.acl.AclBindingFilter.<init>(Lorg/apache/kafka/common/resource/ResourcePatternFilter;Lorg/apache/kafka/common/acl/AccessControlEntryFilter;)V*
>         at
> org.apache.kafka.connect.mirror.MirrorSourceConnector.<clinit>(MirrorSourceConnector.java:67)
>         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>         at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>         at java.lang.Class.newInstance(Class.java:442)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:302)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:280)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:208)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:177)
>         at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:154)
>         at
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:56)
>         at
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
>
> I googled it but could not get much.
>
>
> Any ideas please.
>
>
> Thanks
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 12 Mar 2021 at 14:44, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Thanks again Liam.
>>
>> This is the error
>>
>> [2021-03-12 14:17:54,670] ERROR Stopping after connector error
>> (org.apache.kafka.connect.cli.ConnectStandalone:113)
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>>         at
>> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>>        at
>> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>>         at
>> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>>         at
>> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>>         at
>> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>>         at
>> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>> Caused by: java.lang.ClassNotFoundException:
>> *org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*
>>
>> It is indeed under $KAFKA_HOME/libs
>>
>>
>> cd $KAFKA_HOME/libs
>>
>>  grep -lRi org.apache.kafka.common.config.ConfigDef
>>
>> kafka-clients-1.1.0.jar
>>
>> So it is in kafka-clients-1.1.0.jar file
>>
>>
>> I added the $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I
>> copied over the file to
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
>>
>>
>>
>> grep -lRi
>> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>
>>
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar
>>
>> Still cannot find it!
>>
>> In my plugin I have specified
>>
>> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>>
>>
>> So not sure where it is looking
>>
>>
>>
>>
>> Regards,
>>
>>
>> Mich
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
>> liam.clarke@adscale.co.nz> wrote:
>>
>>> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
>>> should already have the common conf lib in there).
>>>
>>>
>>> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <
>>> mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>> > Thanks Liam for the suggestion.
>>> >
>>> > This is the redone sink file (plain text)
>>> >
>>> > name=bigquery-sink
>>> > connector.type=bigquery-connector
>>> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
>>> > defaultDataset=test
>>> > project=axial-glow-224522
>>> > topics=md
>>> > autoCreateTables=false
>>> > gcsBucketName=tmp_storage_bucket
>>> > queueSize=-1
>>> > bigQueryRetry=0
>>> > bigQueryRetryWait=1000
>>> > bigQueryMessageTimePartitioning=false
>>> > bigQueryPartitionDecorator=true
>>> > timePartitioningType=DAY
>>> > keySource=FILE
>>> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
>>> > sanitizeTopics=false
>>> >
>>> >
>>> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
>>> > threadPoolSize=10
>>> > allBQFieldsNullable=false
>>> > avroDataCacheSize=100
>>> > batchLoadIntervalSec=120
>>> > convertDoubleSpecialValues=false
>>> > enableBatchLoad=false
>>> > upsertEnabled=false
>>> > deleteEnabled=false
>>> > mergeIntervalMs=60_000L
>>> > mergeRecordsThreshold=-1
>>> > autoCreateBucket=true
>>> > allowNewBigQueryFields=false
>>> > allowBigQueryRequiredFieldRelaxation=false
>>> > allowSchemaUnionization=false
>>> > kafkaDataFieldName=null
>>> > kafkaKeyFieldName=null
>>> >
>>> > Now when I run the command
>>> >
>>> > $KAFKA_HOME/bin/connect-standalone.sh \
>>> >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>>> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> >
>>> > It comes back with this error:
>>> >
>>> > [2021-03-12 09:23:54,523] INFO REST server listening at
>>> > http://50.140.197.220:8083/, advertising URL
>>> http://50.140.197.220:8083/
>>> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
>>> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
>>> > (org.apache.kafka.connect.runtime.Connect:55)
>>> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
>>> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>>> > java.lang.NoClassDefFoundError:
>>> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>>> >         at
>>> >
>>> >
>>> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>>> >         at
>>> >
>>> >
>>> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>>> >         at
>>> >
>>> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> >         at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
>>> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> >
>>> > I downloaded common-config-6.1.0.jar and added to lib directory in
>>> >
>>> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
>>> >
>>> > But little joy I am afraid.
>>> >
>>> > Cheers,
>>> >
>>> > Mich
>>> >
>>> >
>>> >
>>> > LinkedIn *
>>> >
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> > <
>>> >
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> > >*
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any
>>> > loss, damage or destruction of data or any other property which may
>>> arise
>>> > from relying on this email's technical content is explicitly
>>> disclaimed.
>>> > The author will in no case be liable for any monetary damages arising
>>> from
>>> > such loss, damage or destruction.
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
>>> > liam.clarke@adscale.co.nz> wrote:
>>> >
>>> > > Hi Mich,
>>> > >
>>> > > Your bigquery-sink.properties file is in a JSON format - which won't
>>> > work.
>>> > > It needs to follow the usual format of a Java properties file.
>>> > >
>>> > > Kind regards,
>>> > >
>>> > > Liam Clarke-Hutchinson
>>> > >
>>> > > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
>>> > > mich.talebzadeh@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi,
>>> > > >
>>> > > >
>>> > > > Trying to stream from Kafka to Google BigQuery.
>>> > > >
>>> > > >
>>> > > >  The connect-standalone.properties is as follows
>>> > > >
>>> > > >
>>> > > > key.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > value.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > > #
>>> > > >
>>> > > > # Converter-specific settings can be passed in by prefixing the
>>> > > Converter's
>>> > > >
>>> > > > # setting with the converter we want to apply it to
>>> > > >
>>> > > > key.converter.schemas.enable=true
>>> > > >
>>> > > > value.converter.schemas.enable=false
>>> > > >
>>> > > >
>>> > > > # The internal converter used for offsets and config data is
>>> > configurable
>>> > > > and
>>> > > >
>>> > > > # must be specified, but most users will always want to use the
>>> > built-in
>>> > > >
>>> > > > # default. Offset and config data is never visible outside of Kafka
>>> > > Connect
>>> > > > in
>>> > > >
>>> > > > # this format.
>>> > > >
>>> > > >
>>> ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > >
>>> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > >
>>> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > >
>>> > >
>>> >
>>> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > internal.key.converter.schemas.enable=false
>>> > > >
>>> > > > internal.value.converter.schemas.enable=false
>>> > > >
>>> > > >
>>> > > > offset.storage.file.filename=/tmp/connect_bq.offsets
>>> > > >
>>> > > > # Flush much faster than normal, which is useful for
>>> testing/debugging
>>> > > >
>>> > > > offset.flush.interval.ms=10000
>>> > > >
>>> > > >
>>> > > > # Set to a list of filesystem paths separated by commas (,) to
>>> enable
>>> > > class
>>> > > >
>>> > > > # loading isolation for plugins (connectors, converters,
>>> > > transformations).
>>> > > > The
>>> > > >
>>> > > > # list should consist of top level directories that include any
>>> > > combination
>>> > > > of:
>>> > > >
>>> > > > # a) directories immediately containing jars with plugins and their
>>> > > > dependencies
>>> > > >
>>> > > > # b) uber-jars with plugins and their dependencies
>>> > > >
>>> > > > # c) directories immediately containing the package directory
>>> structure
>>> > > of
>>> > > >
>>> > > > # classes of plugins and their dependencies Note: symlinks will be
>>> > > followed
>>> > > > to
>>> > > >
>>> > > > # discover dependencies or plugins.
>>> > > >
>>> > > > # Examples:
>>> > > >
>>> > > >
>>> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>>> > > >
>>> > > >
>>> > > > And bigquery-sink.properties file has this
>>> > > >
>>> > > >
>>> > > > {
>>> > > >
>>> > > >      "name": "bigquery-sink",
>>> > > >
>>> > > >      "connector.type": "bigquery-connector",
>>> > > >
>>> > > >      "connector.class":
>>> > > > "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
>>> > > >
>>> > > >      "defaultDataset": "test",
>>> > > >
>>> > > >      "project": "xyz",
>>> > > >
>>> > > >      "topics": "md",
>>> > > >
>>> > > >      "autoCreateTables": "false",
>>> > > >
>>> > > >      "gcsBucketName": "tmp_storage_bucket",
>>> > > >
>>> > > >      "queueSize": "-1",
>>> > > >
>>> > > >      "bigQueryRetry": "0",
>>> > > >
>>> > > >      "bigQueryRetryWait": "1000",
>>> > > >
>>> > > >      "bigQueryMessageTimePartitioning": "false",
>>> > > >
>>> > > >      "bigQueryPartitionDecorator": "true",
>>> > > >
>>> > > >      "timePartitioningType": "DAY",
>>> > > >
>>> > > >      "keySource": "FILE",
>>> > > >
>>> > > >      "keyfile": "/home/hduser/xyz.json",
>>> > > >
>>> > > >      "sanitizeTopics": "false",
>>> > > >
>>> > > >      "schemaRetriever":
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
>>> > > >
>>> > > >      "threadPoolSize": "10",
>>> > > >
>>> > > >      "allBQFieldsNullable": "false",
>>> > > >
>>> > > >      "avroDataCacheSize": "100",
>>> > > >
>>> > > >      "batchLoadIntervalSec": "120",
>>> > > >
>>> > > >      "convertDoubleSpecialValues": "false",
>>> > > >
>>> > > >      "enableBatchLoad": "false",
>>> > > >
>>> > > >      "upsertEnabled": "false",
>>> > > >
>>> > > >      "deleteEnabled": "false",
>>> > > >
>>> > > >      "mergeIntervalMs": "60_000L",
>>> > > >
>>> > > >      "mergeRecordsThreshold": "-1",
>>> > > >
>>> > > >      "autoCreateBucket": "true",
>>> > > >
>>> > > >      "allowNewBigQueryFields": "false",
>>> > > >
>>> > > >      "allowBigQueryRequiredFieldRelaxation": "false",
>>> > > >
>>> > > >      "allowSchemaUnionization": "false",
>>> > > >
>>> > > >      "kafkaDataFieldName": "null",
>>> > > >
>>> > > >      "kafkaKeyFieldName": "null"
>>> > > >
>>> > > > }
>>> > > >
>>> > > > Run as below
>>> > > >
>>> > > >
>>> > > > $KAFKA_HOME/bin/connect-standalone.sh \
>>> > > >
>>> > > >
>>> > >
>>> >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>>> > > >
>>> > > >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> > > >
>>> > > > I get this error
>>> > > >
>>> > > > [2021-03-11 11:07:58,826] ERROR Failed to create job for
>>> > > >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> > > > (org.apache.kafka.connect.cli.ConnectStandalone:102)
>>> > > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
>>> > > > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>>> > > > java.util.concurrent.ExecutionException:
>>> > > > org.apache.kafka.connect.runtime.rest.errors.BadRequestException:
>>> > > Connector
>>> > > > config {"defaultDataset"="test",,
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
>>> > > > "project"="axial-glow-224522",, "autoCreateTables"="false",,
>>> > > > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
>>> > > > "bigQueryMessageTimePartitioning"="false",,
>>> > > > "connector.type"="bigquery-connector",,
>>> > > > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
>>> > > > "mergeIntervalMs"="60_000L",,
>>> "convertDoubleSpecialValues"="false",,
>>> > > > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
>>> > > > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
>>> > > > "topics"="md",, "bigQueryRetry"="0",,
>>> "allBQFieldsNullable"="false",,
>>> > > > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
>>> > > > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
>>> > > > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
>>> > > > "enableBatchLoad"="false",,
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
>>> > > > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
>>> > > > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
>>> > > > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
>>> > > > "kafkaDataFieldName"="null",, }=,
>>> > > > "allowBigQueryRequiredFieldRelaxation"="false",} contains no
>>> connector
>>> > > type
>>> > > >
>>> > > > I think the problem is the wrong entry in the
>>> bigquery-sink.properties
>>> > > > file above.
>>> > > >
>>> > > > I cannot see what it is?
>>> > > >
>>> > > >
>>> > > > Any ideas appreciated.
>>> > > >
>>> > > >
>>> > > > Thanks
>>> > > >
>>> > > > *Disclaimer:* Use it at your own risk. Any and all responsibility
>>> for
>>> > any
>>> > > > loss, damage or destruction of data or any other property which may
>>> > arise
>>> > > > from relying on this email's technical content is explicitly
>>> > disclaimed.
>>> > > > The author will in no case be liable for any monetary damages
>>> arising
>>> > > from
>>> > > > such loss, damage or destruction.
>>> > > >
>>> > >
>>> >
>>>
>>