Posted to users@kafka.apache.org by VIVEK KUMAR MISHRA 13BIT0066 <vi...@vit.ac.in> on 2017/03/27 13:22:28 UTC
kafka connector for mongodb as a source
Hi all,
I am writing a Kafka connector that uses MongoDB as a source. The connector
starts and connects to Kafka, but it is not committing any offsets.
This is the output after starting the connector:
[root@localhost kafka_2.11-0.10.1.1]# bin/connect-standalone.sh
config/connect-standalone.properties config/mongodb.properties
[2017-03-27 18:32:58,019] INFO StandaloneConfig values:
rest.advertised.host.name = null
task.shutdown.graceful.timeout.ms = 5000
rest.host.name = null
rest.advertised.port = null
bootstrap.servers = [localhost:9092]
offset.flush.timeout.ms = 5000
offset.flush.interval.ms = 10000
rest.port = 8083
internal.key.converter = class
org.apache.kafka.connect.json.JsonConverter
access.control.allow.methods =
access.control.allow.origin =
offset.storage.file.filename = /tmp/connect.offsets
internal.value.converter = class
org.apache.kafka.connect.json.JsonConverter
value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.standalone.StandaloneConfig:178)
[2017-03-27 18:32:58,162] INFO Logging initialized @609ms
(org.eclipse.jetty.util.log:186)
[2017-03-27 18:32:58,392] INFO Kafka Connect starting
(org.apache.kafka.connect.runtime.Connect:52)
[2017-03-27 18:32:58,392] INFO Herder starting
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-03-27 18:32:58,393] INFO Worker starting
(org.apache.kafka.connect.runtime.Worker:113)
[2017-03-27 18:32:58,393] INFO Starting FileOffsetBackingStore with file
/tmp/connect.offsets
(org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2017-03-27 18:32:58,398] INFO Worker started
(org.apache.kafka.connect.runtime.Worker:118)
[2017-03-27 18:32:58,398] INFO Herder started
(org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-03-27 18:32:58,398] INFO Starting REST server
(org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-03-27 18:32:58,493] INFO jetty-9.2.15.v20160210
(org.eclipse.jetty.server.Server:327)
[2017-03-27 18:32:59,621] INFO HV000001: Hibernate Validator 5.1.2.Final
(org.hibernate.validator.internal.util.Version:27)
Mar 27, 2017 6:32:59 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The
(sub)resource method listConnectors in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method createConnector in
org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains
empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
contains empty path annotation.
WARNING: The (sub)resource method serverInfo in
org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty
path annotation.
[2017-03-27 18:33:00,015] INFO Started
o.e.j.s.ServletContextHandler@44e3760b{/,null,AVAILABLE}
(org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-03-27 18:33:00,042] INFO Started ServerConnector@7f58ad44{HTTP/1.1}{
0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-03-27 18:33:00,043] INFO Started @2492ms
(org.eclipse.jetty.server.Server:379)
[2017-03-27 18:33:00,043] INFO REST server listening at
http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/
(org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-03-27 18:33:00,043] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:58)
[2017-03-27 18:33:00,048] INFO ConnectorConfig values:
connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max = 1
name = mongodb
value.converter = null
key.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,048] INFO Creating connector mongodb of type
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:159)
[2017-03-27 18:33:00,051] INFO Instantiated connector mongodb with version
0.10.0.1 of type class
org.apache.kafka.connect.mongodb.MongodbSourceConnector
(org.apache.kafka.connect.runtime.Worker:162)
[2017-03-27 18:33:00,053] INFO Finished creating connector mongodb
(org.apache.kafka.connect.runtime.Worker:173)
[2017-03-27 18:33:00,053] INFO SourceConnectorConfig values:
connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max = 1
name = mongodb
value.converter = null
key.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:178)
[2017-03-27 18:33:00,056] INFO Creating task mongodb-0
(org.apache.kafka.connect.runtime.Worker:252)
[2017-03-27 18:33:00,056] INFO ConnectorConfig values:
connector.class =
org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max = 1
name = mongodb
value.converter = null
key.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2017-03-27 18:33:00,057] INFO TaskConfig values:
task.class = class
org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.TaskConfig:178)
[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.Worker:264)
[2017-03-27 18:33:00,066] INFO ProducerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 9223372036854775807
interceptor.classes = null
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 2147483647
acks = all
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 2147483647
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 1
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0
(org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,103] INFO ProducerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 9223372036854775807
interceptor.classes = null
ssl.truststore.password = null
client.id = producer-1
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 2147483647
acks = all
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 2147483647
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 1
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class
org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0
(org.apache.kafka.clients.producer.ProducerConfig:178)
[2017-03-27 18:33:00,104] INFO Kafka version : 0.10.0.1
(org.apache.kafka.common.utils.AppInfoParser:83)
[2017-03-27 18:33:00,104] INFO Kafka commitId : a7a17cdec9eaa6c5
(org.apache.kafka.common.utils.AppInfoParser:84)
[2017-03-27 18:33:00,121] INFO Created connector mongodb
(org.apache.kafka.connect.cli.ConnectStandalone:93)
[2017-03-27 18:33:00,319] INFO Cluster created with settings
{hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN,
serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by
ReadPreferenceServerSelector{readPreference=primary} from cluster
description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
state=CONNECTING}]}. Waiting for 30000 ms before timing out
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection
[connectionId{localValue:1, serverValue:4}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
server with description ServerDescription{address=localhost:27017,
type=STANDALONE, state=CONNECTED, ok=true,
version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection
[connectionId{localValue:2, serverValue:5}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:02,162] WARN could not create Dir using directory from
url
file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib.
skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
at org.reflections.Reflections.scan(Reflections.java:237)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,170] WARN could not create Vfs.Dir from url. ignoring
the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no
matching UrlType was found
[file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/rdbms/jlib]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
static setDefaultURLTypes(final List<UrlType> urlTypes) or
addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
at org.reflections.Reflections.scan(Reflections.java:237)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,471] WARN could not create Dir using directory from
url file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib.
skipping. (org.reflections.Reflections:104)
java.lang.NullPointerException
at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:98)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
at org.reflections.Reflections.scan(Reflections.java:237)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:02,473] WARN could not create Vfs.Dir from url. ignoring
the exception and continuing (org.reflections.Reflections:208)
org.reflections.ReflectionsException: could not create Vfs.Dir from url, no
matching UrlType was found
[file:/home/oracle/app/u01/app/oracle/product/12.2.0/dbhome_1/jlib]
either use fromURL(final URL url, final List<UrlType> urlTypes) or use the
static setDefaultURLTypes(final List<UrlType> urlTypes) or
addDefaultURLTypes(UrlType urlType) with your specialized UrlType.
at org.reflections.vfs.Vfs.fromURL(Vfs.java:109)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
at org.reflections.Reflections.scan(Reflections.java:237)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
at
org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
at
org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
at java.lang.Thread.run(Thread.java:745)
[2017-03-27 18:33:04,338] INFO Reflections took 5826 ms to scan 187 urls,
producing 6162 keys and 38674 values (org.reflections.Reflections:229)
After that, it does not do anything.
This is my config file:
name=mongodb
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max=1
host=localhost
port=27017
batch.size=100
schema.name=mongodbschema
topic.prefix=mongo-prefix
databases=sampledb.hero
Please let me know what the error might be as soon as possible.
Thanks
Re: kafka connector for mongodb as a source
Posted by Ewen Cheslack-Postava <ew...@confluent.io>.
There is some log noise in there from Reflections, but it does look like
your connector & task are being created:
[2017-03-27 18:33:00,057] INFO Instantiated task mongodb-0 with version
0.10.0.1 of type org.apache.kafka.connect.mongodb.MongodbSourceTask
(org.apache.kafka.connect.runtime.Worker:264)
And I see the producer configs for the source task's underlying producer
being logged. Then we see the following, suggesting some sort of connection
is being made successfully:
[2017-03-27 18:33:00,397] INFO Source task WorkerSourceTask{id=mongodb-0}
finished initialization and start
(org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-03-27 18:33:00,442] INFO No server chosen by
ReadPreferenceServerSelector{readPreference=primary} from cluster
description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE,
all=[ServerDescription{address=localhost:27017, type=UNKNOWN,
state=CONNECTING}]}. Waiting for 30000 ms before timing out
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,455] INFO Opened connection
[connectionId{localValue:1, serverValue:4}] to localhost:27017
(org.mongodb.driver.connection:71)
[2017-03-27 18:33:00,457] INFO Monitor thread successfully connected to
server with description ServerDescription{address=localhost:27017,
type=STANDALONE, state=CONNECTED, ok=true,
version=ServerVersion{versionList=[3, 2, 12]}, minWireVersion=0,
maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=536169}
(org.mongodb.driver.cluster:71)
[2017-03-27 18:33:00,491] INFO Opened connection
[connectionId{localValue:2, serverValue:5}] to localhost:27017
(org.mongodb.driver.connection:71)
But then the logs stop. The framework should just be calling poll() on your
source task. Perhaps you could add some logging to your code to give some
hint as to where it is getting stuck? You could also try increasing the log
level for the framework to DEBUG or even TRACE.
-Ewen
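As a rough illustration of the logging suggestion above, here is a minimal sketch of a poll() method that logs on entry and on every exit path and waits only a bounded time for data. It is not the poster's connector: the class name PollLoggingSketch, the in-memory buffer, and the offer() helper are hypothetical, and plain JDK types stand in for Kafka Connect's SourceTask and SourceRecord so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical stand-in for a source task. A real task would extend
// org.apache.kafka.connect.source.SourceTask and return List<SourceRecord>.
class PollLoggingSketch {
    private static final Logger LOG =
            Logger.getLogger(PollLoggingSketch.class.getName());

    // Assumed buffer that a MongoDB reader thread would fill.
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    // Mirrors batch.size=100 from the posted connector config.
    private final int batchSize = 100;

    // Hypothetical helper: hands one document to the task.
    void offer(String document) {
        buffer.add(document);
    }

    // Returns a batch, or null when nothing arrived within the wait window.
    List<String> poll() {
        LOG.info("poll() entered, buffered documents: " + buffer.size());
        String first;
        try {
            // Bounded wait, so a quiet source cannot block the caller forever.
            first = buffer.poll(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warning("poll() interrupted while waiting for data");
            return null;
        }
        if (first == null) {
            LOG.info("poll() found no data, returning null");
            return null;
        }
        List<String> batch = new ArrayList<>();
        batch.add(first);
        // Take whatever else is already buffered, up to the batch size.
        buffer.drainTo(batch, batchSize - 1);
        LOG.info("poll() returning " + batch.size() + " document(s)");
        return batch;
    }

    public static void main(String[] args) {
        PollLoggingSketch task = new PollLoggingSketch();
        task.poll();                          // logs the "no data" path
        task.offer("{\"name\": \"hero-1\"}");
        task.poll();                          // logs a batch of one
    }
}
```

If the entry message never shows up in the worker log, the task is probably stuck before poll() is reached, for example in start(). If it appears once and nothing follows, the blocking call inside poll() is the likely culprit. For the framework side, adding a line such as log4j.logger.org.apache.kafka.connect=DEBUG to config/connect-log4j.properties should raise the Connect runtime's log level, assuming the stock log4j setup that ships with this Kafka release.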