You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by VIVEK KUMAR MISHRA 13BIT0066 <vi...@vit.ac.in> on 2017/03/27 13:22:28 UTC

kafka connector for mongodb as a source

Hi All,

I am creating kafka connector for mongodb as a source .My connector is
starting and connecting with kafka but it is not committing any offset.

This is output after starting connector.

[root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh
config/connect-standalone.properties config/mongodb.properties
[2017-03-27 18:32:58,019] INFO StandaloneConfig values:
        rest.advertised.host.name = null
        task.shutdown.graceful.timeout.ms = 5000
        rest.host.name = null
        rest.advertised.port = null
        bootstrap.servers = [localhost:9092]
        offset.flush.timeout.ms = 5000
        offset.flush.interval.ms = 10000
        rest.port = 8083
        internal.key.converter = class
org.apache.kafka.connect.json.JsonConverter
        access.control.allow.methods =
        access.control.allow.origin =
        offset.storage.file.filename = /tmp/connect.offsets
        internal.value.converter = class
org.apache.kafka.connect.json.JsonConverter
        value.converter = class org.apache.kafka.connect.json.JsonConverter
        key.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
[2017-03-27 18:32:58,162] INFO Logging initialized @609ms
(org.eclipse.jetty.util.log:186)
[2017-03-27 18:32:58,392] INFO Kafka Connect starting
(org.apache.kafka.connect.runtime.Connect:52)
[2017-03-27 18:32:58,392] INFO Herder starting
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-03-27 18:32:58,393] INFO Worker starting
(org.apache.kafka.connect.runtime.Worker:113)
[2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file
/tmp/connect.offsets
(org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-03-27 18:32:58,398] INFO Worker started
(org.apache.kafka.connect.runtime.Worker:118)
[2017-03-27 18:32:58,398] INFO Herder started
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-03-27 18:32:58,398] INFO Starting REST server
(org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210
(org.eclipse.jetty.server.Server:327)
[2017-03-27 18:32:59,621] INFO HV000001: Hibernate Validator 5.1.2.Final
(org.hibernate.validator.internal.util.Version:27)
Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The
(sub)resource method listConnectors in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method createConnector in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
contains empty path annotation.
WARNING: The (sub)resource method serverInfo in
org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty
path annotation.

[2017-03-27 18:33:00,015] INFO Started
o.e.j.s.ServletContextHandler@44e3760b{/,null,AVAILABLE}
(org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-03-27 18:33:00,042] INFO Started ServerConnector@7f58ad44{HTTP/1.1}{
0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-03-27 18:33:00,043] INFO Started @2492ms
(org.eclipse.jetty.server.Server:379)
[2017-03-27 18:33:00,043] INFO REST server listening at
http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/
(org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-03-27 18:33:00,043] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:58)
[2017-03-27 18:33:00,048] INFO ConnectorConfig values:
        connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
        tasks.max = 1
        name = mongodb
        value.converter = null
        key.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,048] INFO Creating connector mongodb of type
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:159)
[2017-03-27 18:33:00,051] INFO Instantiated connector mongodb with version
0.10.0.1 of type class
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:162)
[2017-03-27 18:33:00,053] INFO Finished creating connector mongodb
(org.apache.kafka.connect.runtime.Worker:173)
[2017-03-27 18:33:00,053] INFO SourceConnectorConfig values:
        connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
        tasks.max = 1
        name = mongodb
        value.converter = null
        key.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:178)
[2017-03-27 18:33:00,056] INFO Creating task mongodb-0
(org.apache.kafka.connect.runtime.Worker:252)
[2017-03-27 18:33:00,056] INFO ConnectorConfig values:
        connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
        tasks.max = 1
        name = mongodb
        value.converter = null
        key.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,057] INFO TaskConfig values:
        task.class = class
org.apache.kafka.connect.mongodb.MongodbSourceTask
 (org.apache.kafka.connect.runtime.TaskConfig:178)
[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.Worker:264)
[2017-03-27 18:33:00,066] INFO ProducerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [localhost:9092]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 9223372036854775807
        interceptor.classes = null
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 2147483647
        acks = all
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 2147483647
        ssl.truststore.location = null
        ssl.keystore.password = null
        send.buffer.bytes = 131072
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 1
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        max.request.size = 1048576
        value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 0
 (org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,103] INFO ProducerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [localhost:9092]
        ssl.keystore.type = JKS
        sasl.mechanism = GSSAPI
        max.block.ms = 9223372036854775807
        interceptor.classes = null
        ssl.truststore.password = null
        client.id = producer-1
        ssl.endpoint.identification.algorithm = null
        request.timeout.ms = 2147483647
        acks = all
        receive.buffer.bytes = 32768
        ssl.truststore.type = JKS
        retries = 2147483647
        ssl.truststore.location = null
        ssl.keystore.password = null
        send.buffer.bytes = 131072
        compression.type = none
        metadata.fetch.timeout.ms = 60000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        max.in.flight.requests.per.connection = 1
        metrics.num.samples = 2
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        batch.size = 16384
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        max.request.size = 1048576
        value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        linger.ms = 0
 (org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,104] INFO Kafka version : 0.10.0.1
(org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-27 18:33:00,104] INFO Kafka commitId : a7a17cdec9eaa6c5
(org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-27 18:33:00,121] INFO Created connector mongodb
(org.apache.kafka.connect.cli.ConnectStandalone:93)
[2017-03-27 18:33:00,319] INFO Cluster created with settings
{hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN,
serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by
ReadPreferenceServerSelector{readPreference=primary} from cluster
description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
state=CONNECTING}]}. Waiting for 30000 ms before timing out
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection
[connectionId{localValue:1, serverValue:4}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
server with description ServerDescription{address=localhost:27017,
type=STANDALONE, state=CONNECTED, ok=true,
version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection
[connectionId{localValue:2, serverValue:5}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:02,162] WARN could not create Dir using directory from
url
file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib.
skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
        at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,170] WARN could not create Vfs.Dir from url. ignoring
the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no
matching UrlType was found
[file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
static setDefaultURLTypes(final List<UrlType> urlTypes) or
addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,471] WARN could not create Dir using directory from
url file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib.
skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
        at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,473] WARN could not create Vfs.Dir from url. ignoring
the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no
matching UrlType was found
[file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
static setDefaultURLTypes(final List<UrlType> urlTypes) or
addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
        at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
        at org.reflections.Reflections.scan(Reflections.java:237)
        at org.reflections.Reflections.scan(Reflections.java:204)
        at org.reflections.Reflections.<init>(Reflections.java:129)
        at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
        at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
        at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:04,338] INFO Reflections took 5826 ms to scan 187 urls,
producing 6162 keys and 38674 values  (org.reflections.Reflections:229)



After that it is not doing anything.

This is my config file.

name=mongodb
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max=1
host=localhost
port=27017
batch.size=100
schema.name=mongodbschema
topic.prefix=mongo-prefix
databases=sampledb.hero



Please do suggest me the error ASAP.

Thanks

Re: kafka connector for mongodb as a source

Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
There is some log noise in there from Reflections, but it does look like
your connector & task are being created:

[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.Worker:264)

And I see the producer configs for the source task's underlying producer
being logged. Then we see the following, suggesting some sort of connection
is being made successfully:

[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by
ReadPreferenceServerSelector{readPreference=primary} from cluster
description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
state=CONNECTING}]}. Waiting for 30000 ms before timing out
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection
[connectionId{localValue:1, serverValue:4}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
server with description ServerDescription{address=localhost:27017,
type=STANDALONE, state=CONNECTED, ok=true,
version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection
[connectionId{localValue:2, serverValue:5}] to localhost:27017
(org.mongodb.driver.connection:71)

But then the logs stop. The framework should just be calling poll() on your
source task. Perhaps you could add some logging to your code to give some
hint as to where it is getting stuck? You could also try increasing the log
level for the framework to DEBUG or even TRACE.

-Ewen

On Mon, Mar 27, 2017 at 6:22 AM, VIVEK KUMAR MISHRA 13BIT0066 <
vivekkumar.mishra2013@vit.ac.in> wrote:

> Hi All,
>
> I am creating kafka connector for mongodb as a source .My connector is
> starting and connecting with kafka but it is not committing any offset.
>
> This is output after starting connector.
>
> [root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh
> config/connect-standalone.properties config/mongodb.properties
> [2017-03-27 18:32:58,019] INFO StandaloneConfig values:
>         rest.advertised.host.name = null
>         task.shutdown.graceful.timeout.ms = 5000
>         rest.host.name = null
>         rest.advertised.port = null
>         bootstrap.servers = [localhost:9092]
>         offset.flush.timeout.ms = 5000
>         offset.flush.interval.ms = 10000
>         rest.port = 8083
>         internal.key.converter = class
> org.apache.kafka.connect.json.JsonConverter
>         access.control.allow.methods =
>         access.control.allow.origin =
>         offset.storage.file.filename = /tmp/connect.offsets
>         internal.value.converter = class
> org.apache.kafka.connect.json.JsonConverter
>         value.converter = class org.apache.kafka.connect.json.
> JsonConverter
>         key.converter = class org.apache.kafka.connect.json.JsonConverter
>  (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
> [2017-03-27 18:32:58,162] INFO Logging initialized @609ms
> (org.eclipse.jetty.util.log:186)
> [2017-03-27 18:32:58,392] INFO Kafka Connect starting
> (org.apache.kafka.connect.runtime.Connect:52)
> [2017-03-27 18:32:58,392] INFO Herder starting
> (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
> [2017-03-27 18:32:58,393] INFO Worker starting
> (org.apache.kafka.connect.runtime.Worker:113)
> [2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file
> /tmp/connect.offsets
> (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
> [2017-03-27 18:32:58,398] INFO Worker started
> (org.apache.kafka.connect.runtime.Worker:118)
> [2017-03-27 18:32:58,398] INFO Herder started
> (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
> [2017-03-27 18:32:58,398] INFO Starting REST server
> (org.apache.kafka.connect.runtime.rest.RestServer:98)
> [2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210
> (org.eclipse.jetty.server.Server:327)
> [2017-03-27 18:32:59,621] INFO HV000001: Hibernate Validator 5.1.2.Final
> (org.hibernate.validator.internal.util.Version:27)
> Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The
> (sub)resource method listConnectors in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
> contains
> empty path annotation.
> WARNING: The (sub)resource method createConnector in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource
> contains
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains
> empty
> path annotation.
>
> [2017-03-27 18:33:00,015] INFO Started
> o.e.j.s.ServletContextHandler@44e3760b{/,null,AVAILABLE}
> (org.eclipse.jetty.server.handler.ContextHandler:744)
> [2017-03-27 18:33:00,042] INFO Started ServerConnector@7f58ad44{HTTP/1.1}{
> 0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
> [2017-03-27 18:33:00,043] INFO Started @2492ms
> (org.eclipse.jetty.server.Server:379)
> [2017-03-27 18:33:00,043] INFO REST server listening at
> http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/
> (org.apache.kafka.connect.runtime.rest.RestServer:150)
> [2017-03-27 18:33:00,043] INFO Kafka Connect started
> (org.apache.kafka.connect.runtime.Connect:58)
> [2017-03-27 18:33:00,048] INFO ConnectorConfig values:
>         connector.class =
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
>         tasks.max = 1
>         name = mongodb
>         value.converter = null
>         key.converter = null
>  (org.apache.kafka.connect.runtime.ConnectorConfig:178)
> [2017-03-27 18:33:00,048] INFO Creating connector mongodb of type
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
> (org.apache.kafka.connect.runtime.Worker:159)
> [2017-03-27 18:33:00,051] INFO Instantiated connector mongodb with version
> 0.10.0.1 of type class
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
> (org.apache.kafka.connect.runtime.Worker:162)
> [2017-03-27 18:33:00,053] INFO Finished creating connector mongodb
> (org.apache.kafka.connect.runtime.Worker:173)
> [2017-03-27 18:33:00,053] INFO SourceConnectorConfig values:
>         connector.class =
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
>         tasks.max = 1
>         name = mongodb
>         value.converter = null
>         key.converter = null
>  (org.apache.kafka.connect.runtime.SourceConnectorConfig:178)
> [2017-03-27 18:33:00,056] INFO Creating task mongodb-0
> (org.apache.kafka.connect.runtime.Worker:252)
> [2017-03-27 18:33:00,056] INFO ConnectorConfig values:
>         connector.class =
> org.apache.kafka.connect.mongodb.MongodbSourceConnector
>         tasks.max = 1
>         name = mongodb
>         value.converter = null
>         key.converter = null
>  (org.apache.kafka.connect.runtime.ConnectorConfig:178)
> [2017-03-27 18:33:00,057] INFO TaskConfig values:
>         task.class = class
> org.apache.kafka.connect.mongodb.MongodbSourceTask
>  (org.apache.kafka.connect.runtime.TaskConfig:178)
> [2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
> 0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
> (org.apache.kafka.connect.runtime.Worker:264)
> [2017-03-27 18:33:00,066] INFO ProducerConfig values:
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         bootstrap.servers = [localhost:9092]
>         ssl.keystore.type = JKS
>         sasl.mechanism = GSSAPI
>         max.block.ms = 9223372036854775807
>         interceptor.classes = null
>         ssl.truststore.password = null
>         client.id =
>         ssl.endpoint.identification.algorithm = null
>         request.timeout.ms = 2147483647
>         acks = all
>         receive.buffer.bytes = 32768
>         ssl.truststore.type = JKS
>         retries = 2147483647
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         send.buffer.bytes = 131072
>         compression.type = none
>         metadata.fetch.timeout.ms = 60000
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         buffer.memory = 33554432
>         timeout.ms = 30000
>         key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.trustmanager.algorithm = PKIX
>         block.on.buffer.full = false
>         ssl.key.password = null
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         max.in.flight.requests.per.connection = 1
>         metrics.num.samples = 2
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         batch.size = 16384
>         ssl.keystore.location = null
>         ssl.cipher.suites = null
>         security.protocol = PLAINTEXT
>         max.request.size = 1048576
>         value.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>         linger.ms = 0
>  (org.apache.kafka.clients.producer.ProducerConfig:178)
> [2017-03-27 18:33:00,103] INFO ProducerConfig values:
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         bootstrap.servers = [localhost:9092]
>         ssl.keystore.type = JKS
>         sasl.mechanism = GSSAPI
>         max.block.ms = 9223372036854775807
>         interceptor.classes = null
>         ssl.truststore.password = null
>         client.id = producer-1
>         ssl.endpoint.identification.algorithm = null
>         request.timeout.ms = 2147483647
>         acks = all
>         receive.buffer.bytes = 32768
>         ssl.truststore.type = JKS
>         retries = 2147483647
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         send.buffer.bytes = 131072
>         compression.type = none
>         metadata.fetch.timeout.ms = 60000
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         buffer.memory = 33554432
>         timeout.ms = 30000
>         key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.trustmanager.algorithm = PKIX
>         block.on.buffer.full = false
>         ssl.key.password = null
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         max.in.flight.requests.per.connection = 1
>         metrics.num.samples = 2
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         batch.size = 16384
>         ssl.keystore.location = null
>         ssl.cipher.suites = null
>         security.protocol = PLAINTEXT
>         max.request.size = 1048576
>         value.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>         linger.ms = 0
>  (org.apache.kafka.clients.producer.ProducerConfig:178)
> [2017-03-27 18:33:00,104] INFO Kafka version : 0.10.0.1
> (org.apache.kafka.common.utils.AppInfoParser:83)
> [2017-03-27 18:33:00,104] INFO Kafka commitId : a7a17cdec9eaa6c5
> (org.apache.kafka.common.utils.AppInfoParser:84)
> [2017-03-27 18:33:00,121] INFO Created connector mongodb
> (org.apache.kafka.connect.cli.ConnectStandalone:93)
> [2017-03-27 18:33:00,319] INFO Cluster created with settings
> {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN,
> serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
> (org.mongodb.driver.cluster:71)
> [2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
> finished initialization and start
> (org.apache.kafka.connect.runtime.WorkerSourceTask:138)
> [2017-03-27 18:33:00,442] INFO No server chosen by
> ReadPreferenceServerSelector{readPreference=primary} from cluster
> description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
> all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
> state=CONNECTING}]}. Waiting for 30000 ms before timing out
> (org.mongodb.driver.cluster:71)
> [2017-03-27 18:33:00,455] INFO Opened connection
> [connectionId{localValue:1, serverValue:4}] to localhost:27017
> (org.mongodb.driver.connection:71)
> [2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
> server with description ServerDescription{address=localhost:27017,
> type=STANDALONE, state=CONNECTED, ok=true,
> version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
> maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
> (org.mongodb.driver.cluster:71)
> [2017-03-27 18:33:00,491] INFO Opened connection
> [connectionId{localValue:2, serverValue:5}] to localhost:27017
> (org.mongodb.driver.connection:71)
> [2017-03-27 18:33:02,162] WARN could not create Dir using directory from
> url
> file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib.
> skipping. (org.reflections.Reflections:104)
> java.lang.NullPointerException
>         at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:02,170] WARN could not create Vfs.Dir from url. ignoring
> the exception and continuing (org.reflections.Reflections:208)
> org.reflections.ReflectionsException: could not create Vfs.Dir from url,
> no
> matching UrlType was found
> [file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib]
> either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
> static setDefaultURLTypes(final List<UrlType> urlTypes) or
> addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:02,471] WARN could not create Dir using directory from
> url file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib.
> skipping. (org.reflections.Reflections:104)
> java.lang.NullPointerException
>         at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:02,473] WARN could not create Vfs.Dir from url. ignoring
> the exception and continuing (org.reflections.Reflections:208)
> org.reflections.ReflectionsException: could not create Vfs.Dir from url,
> no
> matching UrlType was found
> [file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib]
> either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
> static setDefaultURLTypes(final List<UrlType> urlTypes) or
> addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
>         at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
>         at org.reflections.Reflections.scan(Reflections.java:237)
>         at org.reflections.Reflections.scan(Reflections.java:204)
>         at org.reflections.Reflections.<init>(Reflections.java:129)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(
> AbstractHerder.java:275)
>         at
> org.apache.kafka.connect.runtime.AbstractHerder$1.run(
> AbstractHerder.java:384)
>         at java.lang.Thread.run(Thread.java:745)
> [2017-03-27 18:33:04,338] INFO Reflections took 5826 ms to scan 187 urls,
> producing 6162 keys and 38674 values  (org.reflections.Reflections:229)
>
>
>
> After that it is not doing anything.
>
> This is my config file.
>
> name=mongodb
> connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
> tasks.max=1
> host=localhost
> port=27017
> batch.size=100
> schema.name=mongodbschema
> topic.prefix=mongo-prefix
> databases=sampledb.hero
>
>
>
> Please do suggest me the error ASAP.
>
> Thanks
>