Posted to users@kafka.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2021/03/11 11:12:35 UTC
Error in Kafka property file contains no connector type
Hi,
Trying to stream from Kafka to Google BigQuery.
The connect-standalone.properties is as follows
key.converter=org.apache.kafka.connect.storage.StringConverter
##value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#
# Converter-specific settings can be passed in by prefixing the Converter's
# setting with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and
# must be specified, but most users will always want to use the built-in
# default. Offset and config data is never visible outside of Kafka Connect in
# this format.
##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class
# loading isolation for plugins (connectors, converters, transformations). The
# list should consist of top level directories that include any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of
# classes of plugins and their dependencies. Note: symlinks will be followed to
# discover dependencies or plugins.
# Examples:
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
And the bigquery-sink.properties file has this:
{
  "name": "bigquery-sink",
  "connector.type": "bigquery-connector",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "defaultDataset": "test",
  "project": "xyz",
  "topics": "md",
  "autoCreateTables": "false",
  "gcsBucketName": "tmp_storage_bucket",
  "queueSize": "-1",
  "bigQueryRetry": "0",
  "bigQueryRetryWait": "1000",
  "bigQueryMessageTimePartitioning": "false",
  "bigQueryPartitionDecorator": "true",
  "timePartitioningType": "DAY",
  "keySource": "FILE",
  "keyfile": "/home/hduser/xyz.json",
  "sanitizeTopics": "false",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
  "threadPoolSize": "10",
  "allBQFieldsNullable": "false",
  "avroDataCacheSize": "100",
  "batchLoadIntervalSec": "120",
  "convertDoubleSpecialValues": "false",
  "enableBatchLoad": "false",
  "upsertEnabled": "false",
  "deleteEnabled": "false",
  "mergeIntervalMs": "60_000L",
  "mergeRecordsThreshold": "-1",
  "autoCreateBucket": "true",
  "allowNewBigQueryFields": "false",
  "allowBigQueryRequiredFieldRelaxation": "false",
  "allowSchemaUnionization": "false",
  "kafkaDataFieldName": "null",
  "kafkaKeyFieldName": "null"
}
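[Editor's aside: one value above is worth flagging regardless of the error below. mergeIntervalMs is set to "60_000L", which is Java *source-literal* syntax rather than a plain number. Config values are delivered as strings, and standard long parsing rejects underscores and the trailing L; a minimal sketch:]

```java
public class MergeIntervalCheck {

    // Connector config values arrive as plain strings; numeric settings are
    // parsed with methods like Long.parseLong, which reject Java source
    // literal syntax such as digit underscores and the trailing 'L'.
    public static boolean isValidLong(String s) {
        try {
            Long.parseLong(s.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidLong("60000"));   // true
        System.out.println(isValidLong("60_000L")); // false
    }
}
```

[If a one-minute merge interval is intended, a plain 60000 is the safe spelling.]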
Run as below
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
I get this error
[2021-03-11 11:07:58,826] ERROR Failed to create job for
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
(org.apache.kafka.connect.cli.ConnectStandalone:102)
[2021-03-11 11:07:58,826] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException:
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
config {"defaultDataset"="test",,
"schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
"project"="axial-glow-224522",, "autoCreateTables"="false",,
"deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
"bigQueryMessageTimePartitioning"="false",,
"connector.type"="bigquery-connector",,
"gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
"mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
"kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
"keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
"topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
"keySource"="FILE",, "allowNewBigQueryFields"="false",,
"bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
"threadPoolSize"="10",, "timePartitioningType"="DAY",,
"enableBatchLoad"="false",,
"connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
"mergeRecordsThreshold"="-1",, "queueSize"="-1",,
"batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
"avroDataCacheSize"="100",, "upsertEnabled"="false",,
"kafkaDataFieldName"="null",, }=,
"allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
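[Editor's aside on how the doubled commas and the stray {= and }= keys in that dump arise: connect-standalone.sh loads the connector file as a Java properties file, splitting each line at the first unescaped ':' or '='. The JSON quotes and trailing commas therefore end up inside the keys and values, and the bare braces become keys of their own, so no key literally named connector.class survives. A small sketch reproducing this (the JSON fragment is illustrative):]

```java
import java.io.StringReader;
import java.util.Properties;

public class JsonAsProperties {

    // Parse text the way standalone Connect reads a connector file:
    // as a Java properties file, one key/value pair per line.
    public static Properties parse(String text) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws Exception {
        String json = "{\n"
                + "\"connector.class\": \"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector\",\n"
                + "}\n";
        Properties props = parse(json);
        // The braces become keys, and the quoted name keeps its quotes,
        // so the key connector.class itself is absent.
        System.out.println(props.containsKey("{"));               // true
        System.out.println(props.getProperty("connector.class")); // null
        // The actual key is "\"connector.class\"" (quotes included),
        // and its value still carries the quotes and trailing comma.
    }
}
```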
I think the problem is a wrong entry in the bigquery-sink.properties file
above, but I cannot see what it is.
Any ideas appreciated.
Thanks
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
Fwd: Error in Kafka property file contains no connector type
Posted by Mich Talebzadeh <mi...@gmail.com>.
@Aakarsh Gopi, check this email
--------- Forwarded message ---------
From: Mich Talebzadeh <mi...@gmail.com>
Date: Sun, 14 Mar 2021 at 19:44
Subject: Re: Error in Kafka property file contains no connector type
To: <us...@kafka.apache.org>
Thanks all.
I was using an older Kafka version.
I upgraded Kafka in the cluster from kafka_2.12-1.1.0 to the latest stable
version, kafka_2.12-2.7.0, and upgraded ZooKeeper from zookeeper-3.4.6 to
apache-zookeeper-3.6.2.
In addition, in the run file I added the following:
#!/bin/ksh
unset CLASSPATH
export CLASSPATH=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins:$KAFKA_HOME/libs
$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
where the first part of CLASSPATH comes from the plugin path setting:
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
This solved the issue.
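[Editor's aside for future readers chasing a similar NoClassDefFoundError: before reshuffling CLASSPATH, it helps to confirm which jar, if any, on the libs or plugin path actually contains the missing class. A sketch of such a check; the directory and class names in the usage example are only illustrations:]

```java
import java.io.File;
import java.util.jar.JarFile;

public class FindClassInJars {

    // Return the name of the first jar directly under dir that contains the
    // given fully qualified class (inner classes use '$'), or null if none do.
    public static String findProvider(File dir, String className) throws Exception {
        String entry = className.replace('.', '/') + ".class";
        File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
        if (jars == null) {
            return null;
        }
        for (File jar : jars) {
            try (JarFile jf = new JarFile(jar)) {
                if (jf.getJarEntry(entry) != null) {
                    return jar.getName();
                }
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 2) {
            System.out.println(findProvider(new File(args[0]), args[1]));
        }
    }
}
```

[For example, java FindClassInJars $KAFKA_HOME/libs 'org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString' — if nothing under libs provides the class, the installed Kafka predates it and an upgrade, as done above, is the fix.]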
Mich
LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
On Fri, 12 Mar 2021 at 20:33, Mich Talebzadeh <mi...@gmail.com>
wrote:
> OK, this attempt to stream a Kafka topic to BigQuery has moved on, but I am
> now getting this error
>
> [2021-03-12 20:17:41,870] ERROR Stopping due to error
> (org.apache.kafka.connect.cli.ConnectStandalone:122)
> java.lang.NoSuchMethodError:
> org.apache.kafka.common.acl.AclBindingFilter.<init>(Lorg/apache/kafka/common/resource/ResourcePatternFilter;Lorg/apache/kafka/common/acl/AccessControlEntryFilter;)V
> at
> org.apache.kafka.connect.mirror.MirrorSourceConnector.<clinit>(MirrorSourceConnector.java:67)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:302)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:280)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:208)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:177)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:154)
> at
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:56)
> at
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
>
> I googled it but could not find much.
>
>
> Any ideas please.
>
>
> Thanks
>
>
>
>
>
>
>
> On Fri, 12 Mar 2021 at 14:44, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Thanks again Liam.
>>
>> This is the error
>>
>> [2021-03-12 14:17:54,670] ERROR Stopping after connector error
>> (org.apache.kafka.connect.cli.ConnectStandalone:113)
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>> at
>> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>> at
>> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>> at
>> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>> at
>> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>> at
>> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>> at
>> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>
>> It is indeed under $KAFKA_HOME/libs
>>
>>
>> cd $KAFKA_HOME/libs
>>
>> grep -lRi org.apache.kafka.common.config.ConfigDef
>>
>> kafka-clients-1.1.0.jar
>>
>> So it is in the kafka-clients-1.1.0.jar file.
>>
>>
>> I added the $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I
>> copied over the file to
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
>>
>>
>>
>> grep -lRi
>> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>
>>
>> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar
>>
>> Still cannot find it!
>>
>> In my plugin I have specified
>>
>> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>>
>>
>> So I am not sure where it is looking.
>>
>>
>>
>>
>> Regards,
>>
>>
>> Mich
>>
>>
>>
>>
>>
>>
>>
>> On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
>> liam.clarke@adscale.co.nz> wrote:
>>
>>> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
>>> should already have the common conf lib in there).
>>>
>>>
>>> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <
>>> mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>> > Thanks Liam for the suggestion.
>>> >
>>> > This is the redone sink file (plain text)
>>> >
>>> > name=bigquery-sink
>>> > connector.type=bigquery-connector
>>> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
>>> > defaultDataset=test
>>> > project=axial-glow-224522
>>> > topics=md
>>> > autoCreateTables=false
>>> > gcsBucketName=tmp_storage_bucket
>>> > queueSize=-1
>>> > bigQueryRetry=0
>>> > bigQueryRetryWait=1000
>>> > bigQueryMessageTimePartitioning=false
>>> > bigQueryPartitionDecorator=true
>>> > timePartitioningType=DAY
>>> > keySource=FILE
>>> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
>>> > sanitizeTopics=false
>>> > schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
>>> > threadPoolSize=10
>>> > allBQFieldsNullable=false
>>> > avroDataCacheSize=100
>>> > batchLoadIntervalSec=120
>>> > convertDoubleSpecialValues=false
>>> > enableBatchLoad=false
>>> > upsertEnabled=false
>>> > deleteEnabled=false
>>> > mergeIntervalMs=60_000L
>>> > mergeRecordsThreshold=-1
>>> > autoCreateBucket=true
>>> > allowNewBigQueryFields=false
>>> > allowBigQueryRequiredFieldRelaxation=false
>>> > allowSchemaUnionization=false
>>> > kafkaDataFieldName=null
>>> > kafkaKeyFieldName=null
>>> >
>>> > Now when I run the command
>>> >
>>> > $KAFKA_HOME/bin/connect-standalone.sh \
>>> > /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>>> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> >
>>> > It comes back with this error:
>>> >
>>> > [2021-03-12 09:23:54,523] INFO REST server listening at
>>> > http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
>>> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
>>> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
>>> > (org.apache.kafka.connect.runtime.Connect:55)
>>> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
>>> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>>> > java.lang.NoClassDefFoundError:
>>> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>>> > at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>>> > at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>>> > at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>>> > at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>>> > at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>>> > at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
>>> > Caused by: java.lang.ClassNotFoundException:
>>> > org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>>> > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> > at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
>>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> >
>>> > I downloaded common-config-6.1.0.jar and added it to the lib directory in
>>> >
>>> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
>>> >
>>> > But little joy I am afraid.
>>> >
>>> > Cheers,
>>> >
>>> > Mich
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
>>> > liam.clarke@adscale.co.nz> wrote:
>>> >
>>> > > Hi Mich,
>>> > >
>>> > > Your bigquery-sink.properties file is in a JSON format - which won't
>>> > work.
>>> > > It needs to follow the usual format of a Java properties file.
>>> > >
>>> > > Kind regards,
>>> > >
>>> > > Liam Clarke-Hutchinson
>>> > >
>>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> >
>>> > I downloaded common-config-6.1.0.jar and added to lib directory in
>>> >
>>> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
>>> >
>>> > But little joy I am afraid.
>>> >
>>> > Cheers,
>>> >
>>> > Mich
>>> >
>>> >
>>> >
>>> > LinkedIn *
>>> >
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> > <
>>> >
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> > >*
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any
>>> > loss, damage or destruction of data or any other property which may
>>> arise
>>> > from relying on this email's technical content is explicitly
>>> disclaimed.
>>> > The author will in no case be liable for any monetary damages arising
>>> from
>>> > such loss, damage or destruction.
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
>>> > liam.clarke@adscale.co.nz> wrote:
>>> >
>>> > > Hi Mich,
>>> > >
>>> > > Your bigquery-sink.properties file is in a JSON format - which won't
>>> > work.
>>> > > It needs to follow the usual format of a Java properties file.
>>> > >
>>> > > Kind regards,
>>> > >
>>> > > Liam Clarke-Hutchinson
>>> > >
>>> > > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
>>> > > mich.talebzadeh@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi,
>>> > > >
>>> > > >
>>> > > > Trying to stream from Kafka to Google BigQuery.
>>> > > >
>>> > > >
>>> > > > The connect-standalone.properties is as follows
>>> > > >
>>> > > >
>>> > > > key.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > value.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > > #
>>> > > >
>>> > > > # Converter-specific settings can be passed in by prefixing the
>>> > > Converter's
>>> > > >
>>> > > > # setting with the converter we want to apply it to
>>> > > >
>>> > > > key.converter.schemas.enable=true
>>> > > >
>>> > > > value.converter.schemas.enable=false
>>> > > >
>>> > > >
>>> > > > # The internal converter used for offsets and config data is
>>> > configurable
>>> > > > and
>>> > > >
>>> > > > # must be specified, but most users will always want to use the
>>> > built-in
>>> > > >
>>> > > > # default. Offset and config data is never visible outside of Kafka
>>> > > Connect
>>> > > > in
>>> > > >
>>> > > > # this format.
>>> > > >
>>> > > >
>>> ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > >
>>> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
>>> > > >
>>> > > >
>>> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > >
>>> > >
>>> >
>>> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
>>> > > >
>>> > > > internal.key.converter.schemas.enable=false
>>> > > >
>>> > > > internal.value.converter.schemas.enable=false
>>> > > >
>>> > > >
>>> > > > offset.storage.file.filename=/tmp/connect_bq.offsets
>>> > > >
>>> > > > # Flush much faster than normal, which is useful for
>>> testing/debugging
>>> > > >
>>> > > > offset.flush.interval.ms=10000
>>> > > >
>>> > > >
>>> > > > # Set to a list of filesystem paths separated by commas (,) to
>>> enable
>>> > > class
>>> > > >
>>> > > > # loading isolation for plugins (connectors, converters,
>>> > > transformations).
>>> > > > The
>>> > > >
>>> > > > # list should consist of top level directories that include any
>>> > > combination
>>> > > > of:
>>> > > >
>>> > > > # a) directories immediately containing jars with plugins and their
>>> > > > dependencies
>>> > > >
>>> > > > # b) uber-jars with plugins and their dependencies
>>> > > >
>>> > > > # c) directories immediately containing the package directory
>>> structure
>>> > > of
>>> > > >
>>> > > > # classes of plugins and their dependencies Note: symlinks will be
>>> > > followed
>>> > > > to
>>> > > >
>>> > > > # discover dependencies or plugins.
>>> > > >
>>> > > > # Examples:
>>> > > >
>>> > > >
>>> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>>> > > >
>>> > > >
>>> > > > And bigquery-sink.properties file has this
>>> > > >
>>> > > >
>>> > > > {
>>> > > >
>>> > > > "name": "bigquery-sink",
>>> > > >
>>> > > > "connector.type": "bigquery-connector",
>>> > > >
>>> > > > "connector.class":
>>> > > > "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
>>> > > >
>>> > > > "defaultDataset": "test",
>>> > > >
>>> > > > "project": "xyz",
>>> > > >
>>> > > > "topics": "md",
>>> > > >
>>> > > > "autoCreateTables": "false",
>>> > > >
>>> > > > "gcsBucketName": "tmp_storage_bucket",
>>> > > >
>>> > > > "queueSize": "-1",
>>> > > >
>>> > > > "bigQueryRetry": "0",
>>> > > >
>>> > > > "bigQueryRetryWait": "1000",
>>> > > >
>>> > > > "bigQueryMessageTimePartitioning": "false",
>>> > > >
>>> > > > "bigQueryPartitionDecorator": "true",
>>> > > >
>>> > > > "timePartitioningType": "DAY",
>>> > > >
>>> > > > "keySource": "FILE",
>>> > > >
>>> > > > "keyfile": "/home/hduser/xyz.json",
>>> > > >
>>> > > > "sanitizeTopics": "false",
>>> > > >
>>> > > > "schemaRetriever":
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
>>> > > >
>>> > > > "threadPoolSize": "10",
>>> > > >
>>> > > > "allBQFieldsNullable": "false",
>>> > > >
>>> > > > "avroDataCacheSize": "100",
>>> > > >
>>> > > > "batchLoadIntervalSec": "120",
>>> > > >
>>> > > > "convertDoubleSpecialValues": "false",
>>> > > >
>>> > > > "enableBatchLoad": "false",
>>> > > >
>>> > > > "upsertEnabled": "false",
>>> > > >
>>> > > > "deleteEnabled": "false",
>>> > > >
>>> > > > "mergeIntervalMs": "60_000L",
>>> > > >
>>> > > > "mergeRecordsThreshold": "-1",
>>> > > >
>>> > > > "autoCreateBucket": "true",
>>> > > >
>>> > > > "allowNewBigQueryFields": "false",
>>> > > >
>>> > > > "allowBigQueryRequiredFieldRelaxation": "false",
>>> > > >
>>> > > > "allowSchemaUnionization": "false",
>>> > > >
>>> > > > "kafkaDataFieldName": "null",
>>> > > >
>>> > > > "kafkaKeyFieldName": "null"
>>> > > >
>>> > > > }
>>> > > >
>>> > > > Run as below
>>> > > >
>>> > > >
>>> > > > $KAFKA_HOME/bin/connect-standalone.sh \
>>> > > >
>>> > > >
>>> > >
>>> >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>>> > > >
>>> > > >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> > > >
>>> > > > I get this error
>>> > > >
>>> > > > [2021-03-11 11:07:58,826] ERROR Failed to create job for
>>> > > >
>>> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>>> > > > (org.apache.kafka.connect.cli.ConnectStandalone:102)
>>> > > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
>>> > > > (org.apache.kafka.connect.cli.ConnectStandalone:113)
>>> > > > java.util.concurrent.ExecutionException:
>>> > > > org.apache.kafka.connect.runtime.rest.errors.BadRequestException:
>>> > > Connector
>>> > > > config {"defaultDataset"="test",,
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
>>> > > > "project"="axial-glow-224522",, "autoCreateTables"="false",,
>>> > > > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
>>> > > > "bigQueryMessageTimePartitioning"="false",,
>>> > > > "connector.type"="bigquery-connector",,
>>> > > > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
>>> > > > "mergeIntervalMs"="60_000L",,
>>> "convertDoubleSpecialValues"="false",,
>>> > > > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
>>> > > > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
>>> > > > "topics"="md",, "bigQueryRetry"="0",,
>>> "allBQFieldsNullable"="false",,
>>> > > > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
>>> > > > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
>>> > > > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
>>> > > > "enableBatchLoad"="false",,
>>> > > >
>>> > > >
>>> > >
>>> >
>>> "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
>>> > > > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
>>> > > > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
>>> > > > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
>>> > > > "kafkaDataFieldName"="null",, }=,
>>> > > > "allowBigQueryRequiredFieldRelaxation"="false",} contains no
>>> connector
>>> > > type
>>> > > >
>>> > > > I think the problem is the wrong entry in the
>>> bigquery-sink.properties
>>> > > > file above.
>>> > > >
>>> > > > I cannot see what it is?
>>> > > >
>>> > > >
>>> > > > Any ideas appreciated.
>>> > > >
>>> > > >
>>> > > > Thanks
>>> > > >
>>> > > > *Disclaimer:* Use it at your own risk. Any and all responsibility
>>> for
>>> > any
>>> > > > loss, damage or destruction of data or any other property which may
>>> > arise
>>> > > > from relying on this email's technical content is explicitly
>>> > disclaimed.
>>> > > > The author will in no case be liable for any monetary damages
>>> arising
>>> > > from
>>> > > > such loss, damage or destruction.
>>> > > >
>>> > >
>>> >
>>>
>>
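As Liam notes in the quoted thread, the sink file was written as JSON, but connect-standalone parses each connector file as Java properties, splitting every line at the first `=` or `:`. A minimal Python sketch of that rule (a simplification, not the real `java.util.Properties` implementation) reproduces the mangled keys seen in the "contains no connector type" error:

```python
# Simplified sketch of how a Java .properties parser reads a line: split at
# the first '=' or ':'. (Real java.util.Properties also handles escapes and
# line continuations; this is enough to show the failure mode.)
def parse_properties(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(('#', '!')):
            continue
        for i, ch in enumerate(line):
            if ch in '=:':
                props[line[:i].strip()] = line[i + 1:].strip()
                break
        else:  # no separator: the whole line becomes a key with an empty value
            props[line] = ''
    return props

# A JSON-style sink file, abbreviated from the original bigquery-sink.properties
json_style = """{
  "name": "bigquery-sink",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
}"""

parsed = parse_properties(json_style)
print(parsed)
# Keys come out quoted ('"connector.class"' rather than connector.class), and
# '{' and '}' become keys of their own -- exactly the shape of the mangled
# config in the error, so the runtime finds no connector.class entry.
```

Because the parsed key is the quoted string `"connector.class"` rather than `connector.class`, standalone mode reports that the config contains no connector type.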
Re: Error in Kafka property file contains no connector type
Posted by Mich Talebzadeh <mi...@gmail.com>.
OK, this attempt to stream a Kafka topic to BigQuery has moved on, but I am now
getting this error:
[2021-03-12 20:17:41,870] ERROR Stopping due to error
(org.apache.kafka.connect.cli.ConnectStandalone:122)
java.lang.NoSuchMethodError:
org.apache.kafka.common.acl.AclBindingFilter.<init>(Lorg/apache/kafka/common/resource/ResourcePatternFilter;Lorg/apache/kafka/common/acl/AccessControlEntryFilter;)V
at
org.apache.kafka.connect.mirror.MirrorSourceConnector.<clinit>(MirrorSourceConnector.java:67)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:302)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:280)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:208)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:177)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:154)
at
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:56)
at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)
I googled it but could not find much.
Any ideas please.
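For what it's worth, a NoSuchMethodError raised while Connect scans plugin.path is typically a sign that two different kafka-clients versions are visible at once, e.g. an older jar copied into a plugin's lib directory shadowing the one in $KAFKA_HOME/libs. A hedged sketch of flagging such jars (the directory layout below is synthetic, standing in for the real plugin path):

```python
# Hedged sketch: walk a plugin.path directory and flag any kafka-clients jars
# that were copied into plugin lib directories, where they can shadow the
# runtime's own clients and cause NoSuchMethodError during the plugin scan.
import os
import tempfile

def find_shadowing_client_jars(plugin_path):
    hits = []
    for root, _dirs, files in os.walk(plugin_path):
        hits.extend(os.path.join(root, f) for f in files
                    if f.startswith('kafka-clients') and f.endswith('.jar'))
    return hits

# Demo with a synthetic plugin tree standing in for the real plugin.path
demo = tempfile.mkdtemp()
lib = os.path.join(demo, 'wepay-kafka-connect-bigquery-2.1.0', 'lib')
os.makedirs(lib)
open(os.path.join(lib, 'kafka-clients-1.1.0.jar'), 'w').close()

print(find_shadowing_client_jars(demo))
# prints the stray jar; removing it lets the plugin use the runtime's clients
```

Removing any stray kafka-clients jar from the plugin directories, so that only the broker distribution's own libs supply it, is usually the first thing to try.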
Thanks
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Fri, 12 Mar 2021 at 14:44, Mich Talebzadeh <mi...@gmail.com>
wrote:
> Thanks again Liam.
>
> This is the error
>
> [2021-03-12 14:17:54,670] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> at
> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
> at
> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> at
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> at
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> at
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> at
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> Caused by: java.lang.ClassNotFoundException:
> *org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*
>
> It is indeed under $KAFKA_HOME/libs
>
>
> cd $KAFKA_HOME/libs
>
> grep -lRi org.apache.kafka.common.config.ConfigDef
>
> kafka-clients-1.1.0.jar
>
> So it is in kafka-clients-1.1.0.jar file
>
>
> I added the $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I
> copied over the file to
> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
>
>
>
> grep -lRi
> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>
>
> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar
>
> Still cannot find it!
>
> In my plugin I have specified
>
> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>
>
> So not sure where it is looking
>
>
>
>
> Regards,
>
>
> Mich
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
Re: Error in Kafka property file contains no connector type
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks again Liam.
This is the error
[2021-03-12 14:17:54,670] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError:
org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
at
com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
at
com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
at
org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
at
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
at
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException:
*org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*
It is indeed under $KAFKA_HOME/libs:

cd $KAFKA_HOME/libs
grep -lRi org.apache.kafka.common.config.ConfigDef
kafka-clients-1.1.0.jar

So it is in the kafka-clients-1.1.0.jar file.

I added $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I copied the
jar over to share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib:

grep -lRi
org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar

Still cannot find it!
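Note that `grep -lRi` only proves the jar mentions ConfigDef somewhere, not that it contains the nested class ConfigDef$CaseInsensitiveValidString; that validator was added in a later kafka-clients release and appears to be genuinely absent from kafka-clients-1.1.0. A sketch of checking a jar for the exact class entry (using a synthetic jar in place of the real one):

```python
# Hedged sketch: check a jar for the exact nested-class entry instead of
# grepping for a substring. The synthetic jar below stands in for
# kafka-clients-1.1.0, which ships ConfigDef but (on the evidence of the
# NoClassDefFoundError) not ConfigDef$CaseInsensitiveValidString.
import os
import tempfile
import zipfile

def jar_has_class(jar_path, class_entry):
    with zipfile.ZipFile(jar_path) as jar:
        return class_entry in jar.namelist()

jar_path = os.path.join(tempfile.mkdtemp(), 'fake-kafka-clients-1.1.0.jar')
with zipfile.ZipFile(jar_path, 'w') as jar:
    jar.writestr('org/apache/kafka/common/config/ConfigDef.class', b'')

missing = ('org/apache/kafka/common/config/'
           'ConfigDef$CaseInsensitiveValidString.class')
print(jar_has_class(jar_path, missing))
# False: the nested class is absent, matching what NoClassDefFoundError reports
```

If the class really is absent from the jar, no amount of CLASSPATH tweaking or jar copying will help; the connector needs to run against a Kafka installation whose kafka-clients actually ships that validator.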
In my plugin I have specified
plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
So I am not sure where it is looking.
Regards,
Mich
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
liam.clarke@adscale.co.nz> wrote:
> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
> should already have the common conf lib in there).
>
>
> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <mich.talebzadeh@gmail.com
> >
> wrote:
>
> > Thanks Liam for the suggestion.
> >
> > This is the redone sink file (plain text)
> >
> > name=bigquery-sink
> > connector.type=bigquery-connector
> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> > defaultDataset=test
> > project=axial-glow-224522
> > topics=md
> > autoCreateTables=false
> > gcsBucketName=tmp_storage_bucket
> > queueSize=-1
> > bigQueryRetry=0
> > bigQueryRetryWait=1000
> > bigQueryMessageTimePartitioning=false
> > bigQueryPartitionDecorator=true
> > timePartitioningType=DAY
> > keySource=FILE
> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> > sanitizeTopics=false
> >
> >
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> > threadPoolSize=10
> > allBQFieldsNullable=false
> > avroDataCacheSize=100
> > batchLoadIntervalSec=120
> > convertDoubleSpecialValues=false
> > enableBatchLoad=false
> > upsertEnabled=false
> > deleteEnabled=false
> > mergeIntervalMs=60_000L
> > mergeRecordsThreshold=-1
> > autoCreateBucket=true
> > allowNewBigQueryFields=false
> > allowBigQueryRequiredFieldRelaxation=false
> > allowSchemaUnionization=false
> > kafkaDataFieldName=null
> > kafkaKeyFieldName=null
> >
> > Now when I run the command
> >
> > $KAFKA_HOME/bin/connect-standalone.sh \
> >
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> >
> > It comes back with this error:
> >
> > [2021-03-12 09:23:54,523] INFO REST server listening at
> > http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
> > (org.apache.kafka.connect.runtime.Connect:55)
> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > java.lang.NoClassDefFoundError:
> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> >         at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
> >         at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> >         at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> >         at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> >         at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> >         at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> > Caused by: java.lang.ClassNotFoundException:
> > org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
> >         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >         at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> >         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >
> > I downloaded common-config-6.1.0.jar and added to lib directory in
> >
> > ..../wepay-kafka-connect-bigquery-2.1.0/lib
> >
> > But little joy I am afraid.
> >
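A useful way to narrow this down is to check whether the missing inner class is present in the Kafka libs on this machine. `ConfigDef` (and its `CaseInsensitiveValidString` validator) ships in `kafka-clients`, so if no local `kafka-clients` jar contains it, the installed Kafka is likely older than the connector build requires, and copying `common-config` jars around will not help. A rough probe, with the jar location assumed:

```shell
# Search the kafka-clients jar(s) for the missing inner class.
# KAFKA_HOME and the libs/ layout are assumptions about this install.
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka}"
found=no
for jar in "$KAFKA_HOME"/libs/kafka-clients-*.jar; do
  [ -f "$jar" ] || continue   # skip if the glob matched nothing
  if unzip -l "$jar" 2>/dev/null | grep -q 'ConfigDef\$CaseInsensitiveValidString'; then
    found=yes
    echo "present in: $jar"
  fi
done
echo "found=$found"
```

If this prints `found=no`, upgrading the Kafka installation (or using a connector release built against the installed Kafka version) is the more likely fix than adding jars to the plugin directory.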
> > Cheers,
> >
> > Mich
> >
> >
> >
> > LinkedIn *https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
> > liam.clarke@adscale.co.nz> wrote:
> >
> > > Hi Mich,
> > >
> > > Your bigquery-sink.properties file is in a JSON format - which won't
> > work.
> > > It needs to follow the usual format of a Java properties file.
> > >
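As a sketch of the difference: a Java properties file is plain `key=value` lines, with no braces, quotes, or trailing commas. A minimal version of the sink config from this thread (keys taken from the messages above, values illustrative, written to a temporary path) would look like:

```shell
# Write a minimal sink config in Java properties form and sanity-check it.
cat > /tmp/bigquery-sink.properties <<'EOF'
name=bigquery-sink
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
topics=md
defaultDataset=test
EOF
# Every non-comment line should be a bare key=value pair.
grep -c '=' /tmp/bigquery-sink.properties   # prints 4
```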
> > > Kind regards,
> > >
> > > Liam Clarke-Hutchinson
> > >
> > > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
> > > mich.talebzadeh@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > > Trying to stream from Kafka to Google BigQuery.
> > > >
> > > >
> > > > The connect-standalone.properties is as follows
> > > >
> > > >
> > > > key.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > value.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > > #
> > > >
> > > > # Converter-specific settings can be passed in by prefixing the Converter's
> > > >
> > > > # setting with the converter we want to apply it to
> > > >
> > > > key.converter.schemas.enable=true
> > > >
> > > > value.converter.schemas.enable=false
> > > >
> > > >
> > > > # The internal converter used for offsets and config data is configurable and
> > > > # must be specified, but most users will always want to use the built-in
> > > > # default. Offset and config data is never visible outside of Kafka Connect in
> > > > # this format.
> > > >
> > > > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > > >
> > > > internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> > > >
> > > > internal.key.converter.schemas.enable=false
> > > >
> > > > internal.value.converter.schemas.enable=false
> > > >
> > > >
> > > > offset.storage.file.filename=/tmp/connect_bq.offsets
> > > >
> > > > # Flush much faster than normal, which is useful for testing/debugging
> > > >
> > > > offset.flush.interval.ms=10000
> > > >
> > > >
> > > > # Set to a list of filesystem paths separated by commas (,) to enable class
> > > > # loading isolation for plugins (connectors, converters, transformations). The
> > > > # list should consist of top level directories that include any combination of:
> > > >
> > > > # a) directories immediately containing jars with plugins and their
> > > > dependencies
> > > >
> > > > # b) uber-jars with plugins and their dependencies
> > > >
> > > > # c) directories immediately containing the package directory structure of
> > > > # classes of plugins and their dependencies. Note: symlinks will be followed
> > > > # to discover dependencies or plugins.
> > > >
> > > > # Examples:
> > > >
> > > >
> > > > plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
> > > >
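For `plugin.path` to be effective, each plugin should sit in its own subdirectory under that path, holding the connector jar and its dependency jars side by side. A sketch of the expected layout, with directory names assumed for illustration:

```shell
# Build the layout plugin.path expects: one directory per plugin,
# containing the connector jar plus its dependencies.
PLUGIN_ROOT=/tmp/share/kafka/plugins          # path assumed
mkdir -p "$PLUGIN_ROOT/wepay-kafka-connect-bigquery"
# (the connector jars and their dependencies would be copied in here)
ls "$PLUGIN_ROOT"
```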
> > > >
> > > > And bigquery-sink.properties file has this
> > > >
> > > >
> > > > {
> > > >
> > > > "name": "bigquery-sink",
> > > >
> > > > "connector.type": "bigquery-connector",
> > > >
> > > > "connector.class":
> > > > "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
> > > >
> > > > "defaultDataset": "test",
> > > >
> > > > "project": "xyz",
> > > >
> > > > "topics": "md",
> > > >
> > > > "autoCreateTables": "false",
> > > >
> > > > "gcsBucketName": "tmp_storage_bucket",
> > > >
> > > > "queueSize": "-1",
> > > >
> > > > "bigQueryRetry": "0",
> > > >
> > > > "bigQueryRetryWait": "1000",
> > > >
> > > > "bigQueryMessageTimePartitioning": "false",
> > > >
> > > > "bigQueryPartitionDecorator": "true",
> > > >
> > > > "timePartitioningType": "DAY",
> > > >
> > > > "keySource": "FILE",
> > > >
> > > > "keyfile": "/home/hduser/xyz.json",
> > > >
> > > > "sanitizeTopics": "false",
> > > >
> > > > "schemaRetriever":
> > > > "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
> > > >
> > > > "threadPoolSize": "10",
> > > >
> > > > "allBQFieldsNullable": "false",
> > > >
> > > > "avroDataCacheSize": "100",
> > > >
> > > > "batchLoadIntervalSec": "120",
> > > >
> > > > "convertDoubleSpecialValues": "false",
> > > >
> > > > "enableBatchLoad": "false",
> > > >
> > > > "upsertEnabled": "false",
> > > >
> > > > "deleteEnabled": "false",
> > > >
> > > > "mergeIntervalMs": "60_000L",
> > > >
> > > > "mergeRecordsThreshold": "-1",
> > > >
> > > > "autoCreateBucket": "true",
> > > >
> > > > "allowNewBigQueryFields": "false",
> > > >
> > > > "allowBigQueryRequiredFieldRelaxation": "false",
> > > >
> > > > "allowSchemaUnionization": "false",
> > > >
> > > > "kafkaDataFieldName": "null",
> > > >
> > > > "kafkaKeyFieldName": "null"
> > > >
> > > > }
> > > >
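A note on the JSON form used above: that shape belongs to Kafka Connect's REST interface (used in distributed mode, or against a running worker), not to a standalone `.properties` file, and even there the connector settings must be wrapped in a top-level object with `name` and `config` fields before being POSTed to `/connectors`. A minimal illustrative payload (values taken from the thread):

```json
{
  "name": "bigquery-sink",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "md",
    "defaultDataset": "test"
  }
}
```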
> > > > Run as below
> > > >
> > > >
> > > > $KAFKA_HOME/bin/connect-standalone.sh \
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > > >
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > > >
> > > > I get this error
> > > >
> > > > [2021-03-11 11:07:58,826] ERROR Failed to create job for
> > > > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > > > (org.apache.kafka.connect.cli.ConnectStandalone:102)
> > > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> > > > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > > > java.util.concurrent.ExecutionException:
> > > > org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
> > > > config {"defaultDataset"="test",,
> > > > "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> > > > "project"="axial-glow-224522",, "autoCreateTables"="false",,
> > > > "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> > > > "bigQueryMessageTimePartitioning"="false",,
> > > > "connector.type"="bigquery-connector",,
> > > > "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> > > > "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> > > > "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> > > > "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> > > > "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> > > > "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> > > > "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> > > > "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> > > > "enableBatchLoad"="false",,
> > > > "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> > > > "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> > > > "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> > > > "avroDataCacheSize"="100",, "upsertEnabled"="false",,
> > > > "kafkaDataFieldName"="null",, }=,
> > > > "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
> > > >
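The mangled keys in that error (`{=`, `}=`, `"name"=...` with stray commas) are exactly what a Java properties parser produces when fed JSON: each line is split at the first `:` or `=`, and the braces, quotes, and trailing commas become part of the key or value, so no bare `connector.class` key ever appears. A rough shell emulation of that line-splitting, purely for illustration (real `java.util.Properties` parsing also handles whitespace and escapes):

```shell
# Feed a JSON fragment through properties-style line splitting to show
# why keys like { and "name" (quotes included) appear in the error.
printf '%s\n' '{' '"name": "bigquery-sink",' '}' |
while IFS= read -r line; do
  key=${line%%[:=]*}   # everything before the first ':' or '='
  val=${line#*[:=]}    # everything after it (unchanged if none found)
  echo "key=[$key] val=[$val]"
done
```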
> > > > I think the problem is the wrong entry in the bigquery-sink.properties
> > > > file above.
> > > >
> > > > I cannot see what it is.
> > > >
> > > >
> > > > Any ideas appreciated.
> > > >
> > > >
> > > > Thanks
> > > >
> > > >
> > >
> >
>
>
>
> And bigquery-sink.properties file has this
>
>
> {
>
> "name": "bigquery-sink",
>
> "connector.type": "bigquery-connector",
>
> "connector.class":
> "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
>
> "defaultDataset": "test",
>
> "project": "xyz",
>
> "topics": "md",
>
> "autoCreateTables": "false",
>
> "gcsBucketName": "tmp_storage_bucket",
>
> "queueSize": "-1",
>
> "bigQueryRetry": "0",
>
> "bigQueryRetryWait": "1000",
>
> "bigQueryMessageTimePartitioning": "false",
>
> "bigQueryPartitionDecorator": "true",
>
> "timePartitioningType": "DAY",
>
> "keySource": "FILE",
>
> "keyfile": "/home/hduser/xyz.json",
>
> "sanitizeTopics": "false",
>
> "schemaRetriever":
>
> "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
>
> "threadPoolSize": "10",
>
> "allBQFieldsNullable": "false",
>
> "avroDataCacheSize": "100",
>
> "batchLoadIntervalSec": "120",
>
> "convertDoubleSpecialValues": "false",
>
> "enableBatchLoad": "false",
>
> "upsertEnabled": "false",
>
> "deleteEnabled": "false",
>
> "mergeIntervalMs": "60_000L",
>
> "mergeRecordsThreshold": "-1",
>
> "autoCreateBucket": "true",
>
> "allowNewBigQueryFields": "false",
>
> "allowBigQueryRequiredFieldRelaxation": "false",
>
> "allowSchemaUnionization": "false",
>
> "kafkaDataFieldName": "null",
>
> "kafkaKeyFieldName": "null"
>
> }
>
> Run as below
>
>
> $KAFKA_HOME/bin/connect-standalone.sh \
>
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>
> I get this error
>
> [2021-03-11 11:07:58,826] ERROR Failed to create job for
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> (org.apache.kafka.connect.cli.ConnectStandalone:102)
> [2021-03-11 11:07:58,826] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
> config {"defaultDataset"="test",,
>
> "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",,
> "project"="axial-glow-224522",, "autoCreateTables"="false",,
> "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",,
> "bigQueryMessageTimePartitioning"="false",,
> "connector.type"="bigquery-connector",,
> "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",,
> "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",,
> "kafkaKeyFieldName"="null", "sanitizeTopics"="false",,
> "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",,
> "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",,
> "keySource"="FILE",, "allowNewBigQueryFields"="false",,
> "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",,
> "threadPoolSize"="10",, "timePartitioningType"="DAY",,
> "enableBatchLoad"="false",,
>
> "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",,
> "mergeRecordsThreshold"="-1",, "queueSize"="-1",,
> "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=,
> "avroDataCacheSize"="100",, "upsertEnabled"="false",,
> "kafkaDataFieldName"="null",, }=,
> "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
>
> I think the problem is the wrong entry in the bigquery-sink.properties
> file above.
>
> I cannot see what it is.
>
>
> Any ideas appreciated.
>
>
> Thanks
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
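
The mangled keys in the error above (`"name"="bigquery-sink",,`, the bogus
`{=` entry) are exactly what happens when java.util.Properties reads the
JSON file: the first `=` or `:` on each line becomes the key/value
separator, so the quotes and trailing commas end up inside the keys and
values, and `connector.class` is never found. A simplified re-implementation
of that parsing rule (no escape or continuation handling) shows the effect:

```python
def parse_as_java_properties(text):
    """Mimic (simplified) java.util.Properties line parsing:
    the first '=' or ':' on a line separates key from value."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(('#', '!')):
            continue  # comments and blank lines are skipped
        idx = min((i for i in (line.find('='), line.find(':')) if i != -1),
                  default=-1)
        if idx == -1:
            props[line] = ''  # a bare '{' or '}' becomes a key with empty value
        else:
            props[line[:idx].strip()] = line[idx + 1:].strip()
    return props

sample = '{\n"name": "bigquery-sink",\n"connector.type": "bigquery-connector",\n}'
# Keys come out quoted ('"name"') and values keep their trailing commas,
# so a lookup of the plain key connector.class fails.
print(parse_as_java_properties(sample))
```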