Posted to user@gobblin.apache.org by Rohit Kalhans <ro...@gmail.com> on 2018/01/29 18:11:23 UTC

Query about using gobblin streaming.

Hello, sorry if my previous, similar email was already delivered; I am resending it just to be sure.

We have recently started evaluating Gobblin for ingestion and have hit the following roadblocks.

1. When streaming from Kafka to Kafka, I keep hitting the following error, after which streaming ingestion stops:

ERROR  [22:54:35.306] [kafka-producer-network-thread | gobblin] o.a.k.c.u.KafkaThread [KafkaThread.java:30]  -  Uncaught exception in kafka-producer-network-thread | gobblin:
java.lang.AssertionError: The acknowledgement counter for this watermark went negative. Please file a bug!
at org.apache.gobblin.writer.AcknowledgableWatermark.ack(AcknowledgableWatermark.java:42)
at org.apache.gobblin.stream.StreamEntity.ack(StreamEntity.java:82)
at org.apache.gobblin.writer.AsyncWriterManager$1.onSuccess(AsyncWriterManager.java:321)
at org.apache.gobblin.writer.AsyncWriterManager$1.onSuccess(AsyncWriterManager.java:316)
at org.apache.gobblin.kafka.writer.Kafka09DataWriter$2.onCompletion(Kafka09DataWriter.java:124)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:97)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:299)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:260)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:56)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:342)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:748)
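The assertion message suggests that the watermark is reference-counted: each in-flight record holds one pending acknowledgement, and each successful write callback releases one. The minimal Java sketch below illustrates, under that assumption, how a single extra acknowledgement (for example, the same record being acked by two callbacks) drives such a counter below zero. The class `RefCountedWatermark` is hypothetical; it is not Gobblin's `AcknowledgableWatermark`, whose actual logic may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted watermark, for illustration only.
// Each pending record holds one ack slot; the watermark can be committed
// once every expected ack has arrived.
class RefCountedWatermark {
    private final AtomicInteger pending;

    RefCountedWatermark(int expectedAcks) {
        this.pending = new AtomicInteger(expectedAcks);
    }

    // Releases one ack slot and fails loudly if more acks arrive than expected.
    void ack() {
        int remaining = pending.decrementAndGet();
        if (remaining < 0) {
            throw new AssertionError(
                "The acknowledgement counter for this watermark went negative.");
        }
    }

    // True once exactly the expected number of acks has been received.
    boolean isCommittable() {
        return pending.get() == 0;
    }
}
```

With `new RefCountedWatermark(1)`, the first `ack()` makes the watermark committable and a second `ack()` throws, which is the shape of the failure in the trace above. In a real pipeline, the question to investigate would be which code path acknowledges a record more than once.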


Here are the key points:
1. Gobblin is running in embedded mode, as part of a larger application.
2. Here is the job config in JSON format; the JSON is converted to properties internally:
"config": {
  "job.lock.enabled": "false",
  "task.executionMode": "STREAMING",
  "gobblin.streaming.kafka.topic.key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "gobblin.streaming.kafka.topic.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
  "source.class": "org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleStreamingSource",
  "gobblin.streaming.kafka.topic.singleton": "archive",
  "kafka.brokers": "localhost:9092",
  "streaming.watermarkStateStore.type": "mysql",
  "state.store.db.url": "jdbc:mysql://localhost:3306/Saboo",
  "state.store.db.user": "saboo_root",
  "state.store.db.password": "sabootest",
  "streaming.watermarkStateStore.config.state.store.zk.connectString": "localhost:2181",
  "streaming.watermark.commitIntervalMillis": "2000",
  "converter.classes": "org.apache.gobblin.converter.SamplingConverter",
  "converter.sample.ratio": "0.10",
  "writer.builder.class": "org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder",
  "writer.kafka.topic": "archive2",
  "writer.kafka.producerConfig.bootstrap.servers": "localhost:9092",
  "writer.kafka.producerConfig.value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
  "data.publisher.type": "org.apache.gobblin.publisher.NoopPublisher"
}


2. When using ZooKeeper (zk) as the state store, I keep getting the following error, which terminates the job:

o.a.g.r.AbstractJobLauncher [AbstractJobLauncher.java:468]  -  Failed to launch and run job job_kafkaStreaming_1517248081731: java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.LogManager
java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.LogManager
at org.apache.log4j.Logger.getLogger(Logger.java:117)
at org.apache.helix.manager.zk.ZkCacheBaseDataAccessor.<clinit>(ZkCacheBaseDataAccessor.java:50)
at org.apache.gobblin.metastore.ZkStateStore.<init>(ZkStateStore.java:89)
at org.apache.gobblin.metastore.ZkStateStoreFactory.createStateStore(ZkStateStoreFactory.java:38)
at org.apache.gobblin.runtime.StateStoreBasedWatermarkStorage.<init>(StateStoreBasedWatermarkStorage.java:101)
at org.apache.gobblin.runtime.TaskContext.getWatermarkStorage(TaskContext.java:389)
at org.apache.gobblin.runtime.Task.<init>(Task.java:234)
at org.apache.gobblin.runtime.GobblinMultiTaskAttempt.createTaskRunnable(GobblinMultiTaskAttempt.java:363)
at org.apache.gobblin.runtime.GobblinMultiTaskAttempt.runWorkUnits(GobblinMultiTaskAttempt.java:344)
at org.apache.gobblin.runtime.GobblinMultiTaskAttempt.run(GobblinMultiTaskAttempt.java:134)
at org.apache.gobblin.runtime.GobblinMultiTaskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.java:369)
at org.apache.gobblin.runtime.GobblinMultiTaskAttempt.runWorkUnits(GobblinMultiTaskAttempt.java:391)
at org.apache.gobblin.runtime.local.LocalJobLauncher.runWorkUnitStream(LocalJobLauncher.java:142)
at org.apache.gobblin.runtime.AbstractJobLauncher.launchJob(AbstractJobLauncher.java:443)
at org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver$DriverRunnable.call(JobLauncherExecutionDriver.java:159)
at org.apache.gobblin.runtime.job_exec.JobLauncherExecutionDriver$DriverRunnable.call(JobLauncherExecutionDriver.java:147)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
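A `NoClassDefFoundError` with the message "Could not initialize class" does not mean that the class is missing from the classpath. It means that the class's static initializer already failed once earlier in the same JVM. The first failure is reported as an `ExceptionInInitializerError`, possibly in an earlier log line, and every later use of the class gets this `NoClassDefFoundError`. In an embedded deployment, a conflicting or misconfigured log4j setup on the application classpath is one plausible cause, but the trace alone does not confirm this. The self-contained Java sketch below reproduces the JVM behaviour with a hypothetical class `BrokenInit`. It also adds a small helper that lists every classpath location providing a given class, which can reveal duplicate copies of `org/apache/log4j/LogManager.class`. All names here are illustrative and not part of Gobblin.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

class ClassInitDiagnostics {

    // Hypothetical class whose static initializer always fails, standing in for
    // a class such as org.apache.log4j.LogManager whose initialization failed.
    static class BrokenInit {
        static final int VALUE = compute();

        private static int compute() {
            throw new IllegalStateException("simulated failure in static initializer");
        }
    }

    // Reads BrokenInit.VALUE, which triggers class initialization, and reports
    // the error type and message instead of propagating it.
    static String touchBrokenInit() {
        try {
            return "value=" + BrokenInit.VALUE;
        } catch (Throwable t) {
            return t.getClass().getSimpleName() + ": " + t.getMessage();
        }
    }

    // Lists every classpath location that supplies the given class file.
    // More than one entry means that several jars ship the same class.
    static List<URL> findClassSources(String className) {
        String resource = className.replace('.', '/') + ".class";
        ClassLoader loader = ClassInitDiagnostics.class.getClassLoader();
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `touchBrokenInit()` twice yields an `ExceptionInInitializerError` the first time and a `NoClassDefFoundError: Could not initialize class ...` the second time, which is the pattern seen in the trace above. In the embedded application, logging `findClassSources("org.apache.log4j.LogManager")` at startup would show which jar or jars supply that class. Searching the logs for the first `ExceptionInInitializerError` is likely to reveal the underlying cause.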



-- 
Cheerio!

Rohit

Re: Query about using gobblin streaming.

Posted by Sudarshan Vasudevan <su...@linkedin.com>.
Hi Rohit,
Could you set the following configuration in your JobConfig file:

task.execution.synchronousExecutionModel=false

By default, streaming mode runs with the synchronous execution model, which has been deprecated. Until we change this default, please try running your streaming job with the above config.

Thanks,
Sudarshan
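Because the job config in this thread is kept as JSON and converted to properties inside an embedded application, the suggested override can be applied at that conversion step. This is a minimal Java sketch that assumes the JSON has already been parsed into a flat `Map<String, String>`. The class `StreamingJobConfig` and its method `toJobProperties` are hypothetical helpers, and only the key `task.execution.synchronousExecutionModel` comes from this thread.

```java
import java.util.Map;
import java.util.Properties;

class StreamingJobConfig {

    // Key suggested on this thread to turn off the deprecated synchronous execution model.
    static final String SYNC_EXECUTION_KEY = "task.execution.synchronousExecutionModel";

    // Copies a flat key/value job config into Properties, then forces the
    // asynchronous execution model regardless of what the input map said.
    static Properties toJobProperties(Map<String, String> config) {
        Properties props = new Properties();
        config.forEach(props::setProperty);
        props.setProperty(SYNC_EXECUTION_KEY, "false");
        return props;
    }
}
```

The override is applied last so that it wins even if the source JSON already contains the key with a different value. How the resulting `Properties` are then handed to the embedded Gobblin job depends on the application and is not shown here.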

From: Sudarshan Vasudevan <su...@linkedin.com>
Reply-To: "user@gobblin.incubator.apache.org" <us...@gobblin.incubator.apache.org>
Date: Monday, January 29, 2018 at 11:06 AM
To: "user@gobblin.incubator.apache.org" <us...@gobblin.incubator.apache.org>
Subject: Re: Query about using gobblin streaming.

Hi Rohit,
Thanks for bringing this up! The gobblin team is aware of this issue and is working on a fix. There was a major overhaul of the gobblin internals and I think we broke the streaming mode in the process. Please do file a JIRA for this issue, so that we can track its progress.

Thanks again,
Sudarshan

From: Rohit Kalhans <ro...@gmail.com>
Reply-To: "user@gobblin.incubator.apache.org" <us...@gobblin.incubator.apache.org>
Date: Monday, January 29, 2018 at 10:12 AM
To: "user@gobblin.incubator.apache.org" <us...@gobblin.incubator.apache.org>
Subject: Query about using gobblin streaming.

Re: Query about using gobblin streaming.

Posted by Sudarshan Vasudevan <su...@linkedin.com>.
Hi Rohit,
Thanks for bringing this up! The gobblin team is aware of this issue and is working on a fix. There was a major overhaul of the gobblin internals and I think we broke the streaming mode in the process. Please do file a JIRA for this issue, so that we can track its progress.

Thanks again,
Sudarshan

From: Rohit Kalhans <ro...@gmail.com>
Reply-To: "user@gobblin.incubator.apache.org" <us...@gobblin.incubator.apache.org>
Date: Monday, January 29, 2018 at 10:12 AM
To: "user@gobblin.incubator.apache.org" <us...@gobblin.incubator.apache.org>
Subject: Query about using gobblin streaming.