Posted to dev@samza.apache.org by Thunder Stumpges <ts...@ntent.com> on 2018/07/06 18:43:43 UTC

Samza 0.14.1 : OffsetOutOfRangeException even with auto.offset.reset=smallest

Hi all,


We've just run into a strange problem with Samza 0.14.1. We had a job down for a while, long enough for Kafka's retention to clean past our saved offsets. When starting the job now, we get repeated org.apache.kafka.common.errors.OffsetOutOfRangeException, and it just retries over and over again. We HAVE set

systems.kafka.consumer.auto.offset.reset=smallest as well. Has anyone else seen this? Our understanding from the documentation is that this setting determines what to do when the offset is out of range:



systems.system-name.consumer.auto.offset.reset : This setting determines what happens if a consumer attempts to read an offset that is outside of the current valid range. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.
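
For reference, that setting sits alongside the rest of the Kafka system definition in the job properties; a minimal sketch (with the broker and zookeeper details trimmed) looks like:

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=...
systems.kafka.producer.bootstrap.servers=...
systems.kafka.consumer.auto.offset.reset=smallest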



This is the set of messages that keeps repeating:



2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Verifying properties

2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property client.id is overridden to samza_consumer-stg_apollo_crawler_stream_task-1

2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to kafka-server.ntent.com:9092

2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000

2018-07-06 18:32:15 INFO  kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,kafka-server,9092) with correlation id 12 for 1 topic(s) Set(my-topic)

2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Connected to kafka-server:9092 for producing

2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Disconnecting from kafka-server:9092

2018-07-06 18:32:15 INFO  o.a.samza.system.kafka.GetOffset - Validating offset 6883929 for topic and partition [my-topic,10]

2018-07-06 18:32:15 WARN  o.a.s.s.kafka.KafkaSystemConsumer - While refreshing brokers for [my-topic,10]: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
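
(As an aside, the broker's actual oldest retained offset for this partition can be checked with Kafka's GetOffsetShell; this is a sketch assuming the stock 0.11.x tooling and the broker above:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-server.ntent.com:9092 --topic my-topic --partitions 10 --time -2

Anything it reports above 6883929 would confirm the checkpointed offset has aged out of retention.)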



Thanks!

Thunder



RE: Samza 0.14.1 : OffsetOutOfRangeException even with auto.offset.reset=smallest

Posted by Thunder Stumpges <ts...@ntent.com>.
Hi, 

Unfortunately we have already reset our offsets in order to continue processing, so I'm unable to reproduce this right now.

Here is the log output with DEBUG level enabled for KafkaSystemConsumer. You'll notice there is no stack trace even in the debug line that should include it; it's just an echo of the same OffsetOutOfRangeException message.
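
For anyone wanting the same level of detail, DEBUG logging for this class can be enabled with a logback logger entry along these lines (a sketch, assuming a standard logback.xml):

<logger name="org.apache.samza.system.kafka.KafkaSystemConsumer" level="DEBUG"/>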

Thanks,
Thunder


2018-07-06 18:57:01 logback 5545 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 7598450 for SystemStreamPartition [kafka, my-topic-backfill, 3]
2018-07-06 18:57:01 logback 5545 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 671804914 for SystemStreamPartition [kafka, my-topic-new, 3]
2018-07-06 18:57:01 logback 5545 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 663279798 for SystemStreamPartition [kafka, my-topic-new, 9]
2018-07-06 18:57:01 logback 5545 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 7598982 for SystemStreamPartition [kafka, my-topic-backfill, 9]
2018-07-06 18:57:01 logback 5545 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 645599258 for SystemStreamPartition [kafka, my-topic-new, 1]
2018-07-06 18:57:01 logback 5545 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 7595444 for SystemStreamPartition [kafka, my-topic-backfill, 1]
2018-07-06 18:57:01 logback 5545 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 7596127 for SystemStreamPartition [kafka, my-topic-backfill, 7]
2018-07-06 18:57:01 logback 5546 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 671082000 for SystemStreamPartition [kafka, my-topic-new, 7]
2018-07-06 18:57:01 logback 5546 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 7596438 for SystemStreamPartition [kafka, my-topic-backfill, 5]
2018-07-06 18:57:01 logback 5546 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 675851134 for SystemStreamPartition [kafka, my-topic-new, 5]
2018-07-06 18:57:01 logback 5546 [main] INFO  o.a.samza.checkpoint.OffsetManager - Checkpointed offset is currently 7595266 for SystemStreamPartition [kafka, my-topic-backfill, 11]
2018-07-06 18:57:01 logback 5559 [main] INFO  o.a.samza.checkpoint.OffsetManager - Successfully loaded last processed offsets: {Partition 1={SystemStreamPartition [kafka, my-topic-new, 1]=645599258, SystemStreamPartition [kafka, my-topic-backfill, 1]=7595444}, Partition 3={SystemStreamPartition [kafka, my-topic-new, 3]=671804914, SystemStreamPartition [kafka, my-topic-backfill, 3]=7598450}, Partition 5={SystemStreamPartition [kafka, my-topic-backfill, 5]=7596438, SystemStreamPartition [kafka, my-topic-new, 5]=675851134}, Partition 7={SystemStreamPartition [kafka, my-topic-backfill, 7]=7596127, SystemStreamPartition [kafka, my-topic-new, 7]=671082000}, Partition 9={SystemStreamPartition [kafka, my-topic-new, 9]=663279798, SystemStreamPartition [kafka, my-topic-backfill, 9]=7598982}, Partition 11={SystemStreamPartition [kafka, my-topic-backfill, 11]=7595266}}
2018-07-06 18:57:01 logback 5561 [main] INFO  o.a.samza.checkpoint.OffsetManager - Successfully loaded starting offsets: Map(Partition 9 -> Map(SystemStreamPartition [kafka, my-topic-backfill, 9] -> 7598983, SystemStreamPartition [kafka, my-topic-new, 9] -> 663279799), Partition 3 -> Map(SystemStreamPartition [kafka, my-topic-backfill, 3] -> 7598451, SystemStreamPartition [kafka, my-topic-new, 3] -> 671804915), Partition 5 -> Map(SystemStreamPartition [kafka, my-topic-backfill, 5] -> 7596439, SystemStreamPartition [kafka, my-topic-new, 5] -> 675851135), Partition 7 -> Map(SystemStreamPartition [kafka, my-topic-backfill, 7] -> 7596128, SystemStreamPartition [kafka, my-topic-new, 7] -> 671082001), Partition 1 -> Map(SystemStreamPartition [kafka, my-topic-backfill, 1] -> 7595445, SystemStreamPartition [kafka, my-topic-new, 1] -> 645599259), Partition 11 -> Map(SystemStreamPartition [kafka, my-topic-backfill, 11] -> 7595267))
2018-07-06 18:57:01 logback 5562 [main] INFO  o.a.samza.container.SamzaContainer - Starting stores in task instance Partition 3
2018-07-06 18:57:01 logback 5566 [main] INFO  o.a.s.storage.TaskStorageManager - Validating change log streams: Map()
2018-07-06 18:57:01 logback 5568 [main] INFO  o.a.s.storage.TaskStorageManager - Got change log stream metadata: Map()
2018-07-06 18:57:01 logback 5571 [main] INFO  o.a.s.storage.TaskStorageManager - Assigning oldest change log offsets for taskName Partition 3: Map()
2018-07-06 18:57:01 logback 5577 [main] INFO  o.a.samza.container.SamzaContainer - Starting stores in task instance Partition 9
2018-07-06 18:57:01 logback 5577 [main] INFO  o.a.s.storage.TaskStorageManager - Validating change log streams: Map()
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.s.storage.TaskStorageManager - Got change log stream metadata: Map()
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.s.storage.TaskStorageManager - Assigning oldest change log offsets for taskName Partition 9: Map()
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.samza.container.SamzaContainer - Starting stores in task instance Partition 1
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.s.storage.TaskStorageManager - Validating change log streams: Map()
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.s.storage.TaskStorageManager - Got change log stream metadata: Map()
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.s.storage.TaskStorageManager - Assigning oldest change log offsets for taskName Partition 1: Map()
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.samza.container.SamzaContainer - Starting stores in task instance Partition 7
2018-07-06 18:57:01 logback 5578 [main] INFO  o.a.s.storage.TaskStorageManager - Validating change log streams: Map()
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.s.storage.TaskStorageManager - Got change log stream metadata: Map()
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.s.storage.TaskStorageManager - Assigning oldest change log offsets for taskName Partition 7: Map()
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.samza.container.SamzaContainer - Starting stores in task instance Partition 5
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.s.storage.TaskStorageManager - Validating change log streams: Map()
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.s.storage.TaskStorageManager - Got change log stream metadata: Map()
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.s.storage.TaskStorageManager - Assigning oldest change log offsets for taskName Partition 5: Map()
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.samza.container.SamzaContainer - Starting stores in task instance Partition 11
2018-07-06 18:57:01 logback 5579 [main] INFO  o.a.s.storage.TaskStorageManager - Validating change log streams: Map()
2018-07-06 18:57:01 logback 5580 [main] INFO  o.a.s.storage.TaskStorageManager - Got change log stream metadata: Map()
2018-07-06 18:57:01 logback 5580 [main] INFO  o.a.s.storage.TaskStorageManager - Assigning oldest change log offsets for taskName Partition 11: Map()
2018-07-06 18:57:01 logback 5581 [main] INFO  o.a.samza.container.SamzaContainer - Starting table manager in task instance Partition 3
2018-07-06 18:57:01 logback 5584 [main] INFO  o.a.samza.container.SamzaContainer - Starting table manager in task instance Partition 9
2018-07-06 18:57:01 logback 5584 [main] INFO  o.a.samza.container.SamzaContainer - Starting table manager in task instance Partition 1
2018-07-06 18:57:01 logback 5584 [main] INFO  o.a.samza.container.SamzaContainer - Starting table manager in task instance Partition 7
2018-07-06 18:57:01 logback 5584 [main] INFO  o.a.samza.container.SamzaContainer - Starting table manager in task instance Partition 5
2018-07-06 18:57:01 logback 5584 [main] INFO  o.a.samza.container.SamzaContainer - Starting table manager in task instance Partition 11
2018-07-06 18:57:01 logback 5585 [main] INFO  o.a.samza.container.SamzaContainer - Starting host statistics monitor
2018-07-06 18:57:01 logback 5587 [main] INFO  o.a.samza.container.SamzaContainer - Registering task instances with producers.
2018-07-06 18:57:01 logback 5592 [main] INFO  o.a.samza.container.SamzaContainer - Starting producer multiplexer.
...
2018-07-06 18:57:01 logback 5607 [main] INFO  o.a.k.common.utils.AppInfoParser - Kafka version : 0.11.0.2
2018-07-06 18:57:01 logback 5607 [main] INFO  o.a.k.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
2018-07-06 18:57:01 logback 5607 [main] WARN  o.a.k.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-apollo_crawler_stream_task-1
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:410)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:263)
        at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:91)
        at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:91)
        at org.apache.samza.system.kafka.KafkaSystemProducer.start(KafkaSystemProducer.scala:53)
        at org.apache.samza.system.SystemProducers$$anonfun$start$2.apply(SystemProducers.scala:41)
        at org.apache.samza.system.SystemProducers$$anonfun$start$2.apply(SystemProducers.scala:41)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.system.SystemProducers.start(SystemProducers.scala:41)
        at org.apache.samza.container.SamzaContainer.startProducers(SamzaContainer.scala:952)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:732)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:101)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:148)
2018-07-06 18:57:01 logback 5608 [main] INFO  o.a.samza.container.SamzaContainer - Initializing stream tasks.
2018-07-06 18:57:01 logback 5610 [main] INFO  o.a.samza.container.SamzaContainer - Registering task instances with consumers.
2018-07-06 18:57:01 logback 5631 [main] INFO  o.a.samza.container.SamzaContainer - Starting consumer multiplexer.
2018-07-06 18:57:01 logback 5637 [main] INFO  o.a.s.s.kafka.KafkaSystemConsumer - Refreshing brokers for: Map([my-topic-backfill,9] -> 7598983, [my-topic-backfill,3] -> 7598451, [my-topic-backfill,7] -> 7596128, [my-topic-backfill,1] -> 7595445, [my-topic-backfill,5] -> 7596439, [my-topic-backfill,11] -> 7595267)
2018-07-06 18:57:01 logback 5638 [main] INFO  o.a.samza.system.kafka.BrokerProxy - Creating new SimpleConsumer for host kafka-server.lv.ntent.com:9092 for system kafka
2018-07-06 18:57:01 logback 5640 [main] INFO  o.a.samza.system.kafka.GetOffset - Validating offset 7598983 for topic and partition [my-topic-backfill,9]
2018-07-06 18:57:01 logback 5644 [main] WARN  o.a.s.s.kafka.KafkaSystemConsumer - While refreshing brokers for [my-topic-backfill,9]: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
2018-07-06 18:57:01 logback 5646 [main] DEBUG o.a.s.s.kafka.KafkaSystemConsumer - Exception detail:
org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.
2018-07-06 18:57:02 logback 5746 [main] INFO  o.a.samza.system.kafka.GetOffset - Validating offset 7598983 for topic and partition [my-topic-backfill,9]
2018-07-06 18:57:02 logback 5747 [main] WARN  o.a.s.s.kafka.KafkaSystemConsumer - While refreshing brokers for [my-topic-backfill,9]: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
2018-07-06 18:57:02 logback 5747 [main] DEBUG o.a.s.s.kafka.KafkaSystemConsumer - Exception detail:
org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.
2018-07-06 18:57:02 logback 5948 [main] INFO  o.a.samza.system.kafka.GetOffset - Validating offset 7598983 for topic and partition [my-topic-backfill,9]
2018-07-06 18:57:02 logback 5949 [main] WARN  o.a.s.s.kafka.KafkaSystemConsumer - While refreshing brokers for [my-topic-backfill,9]: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
...

-----Original Message-----
From: Prateek Maheshwari [mailto:prateekmi2@gmail.com] 
Sent: Monday, July 9, 2018 9:39
To: dev@samza.apache.org
Subject: Re: Samza 0.14.1 : OffsetOutOfRangeException even with auto.offset.reset=smallest

Hi Thunder,

Can you provide debug level logs from KafkaSystemConsumer with the stack trace for the exception? It'll help figure out why the auto.offset.reset property isn't taking effect.

If this error is due to an older checkpoint for the stream, you can try resetting the checkpoint using the following two configurations:
streams.stream-id.samza.reset.offset: If set to true, when a Samza container starts up, it ignores any checkpointed offset for this particular input stream. Its behavior is thus determined by the samza.offset.default setting. Note that the reset takes effect every time a container is started, which may be every time you restart your job, or more frequently if a container fails and is restarted by the framework.

streams.stream-id.samza.offset.default: If a container starts up without a checkpoint, this property determines where in the input stream we should start consuming. The value must be an OffsetType, one of the following:
  upcoming: Start processing messages that are published after the job starts. Any messages published while the job was not running are not processed.
  oldest: Start processing at the oldest available message in the system, and reprocess the entire available message history.

I.e., set 'samza.reset.offset' = true, and 'samza.offset.default' = oldest for your stream. Let us know if this doesn't help.

Thanks,
Prateek

On Fri, Jul 6, 2018 at 11:43 AM, Thunder Stumpges <ts...@ntent.com> wrote:
> Hi all,
>
>
> We've just run into a strange problem with samza 0.14.1. We had a job 
> down for a bit, while kafka cleaned past our saved offsets. When 
> starting the job now, we get repeated 
> org.apache.kafka.common.errors.OffsetOutOfRangeException. And it just 
> retries over and over again. We HAVE set
>
> systems.kafka.consumer.auto.offset.reset=smallest as well. Has anyone else seen this? Our understanding from the documentation is that this setting says what to do if the offset is out of range.
>
>
>
> systems.system-name.consumer.auto.offset.reset : This setting determines what happens if a consumer attempts to read an offset that is outside of the current valid range. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.
>
>
>
> This is the set of messages that keeps repeating:
>
>
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Verifying 
> properties
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property 
> client.id is overridden to 
> samza_consumer-stg_apollo_crawler_stream_task-1
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property 
> metadata.broker.list is overridden to kafka-server.ntent.com:9092
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property 
> request.timeout.ms is overridden to 30000
>
> 2018-07-06 18:32:15 INFO  kafka.client.ClientUtils$ - Fetching 
> metadata from broker BrokerEndPoint(0,kafka-server,9092) with 
> correlation id 12 for 1 topic(s) Set(my-topic)
>
> 2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Connected to 
> kafka-server:9092 for producing
>
> 2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Disconnecting 
> from kafka-server:9092
>
> 2018-07-06 18:32:15 INFO  o.a.samza.system.kafka.GetOffset - 
> Validating offset 6883929 for topic and partition [my-topic,10]
>
> 2018-07-06 18:32:15 WARN  o.a.s.s.kafka.KafkaSystemConsumer - While refreshing brokers for [my-topic,10]: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
>
>
>
> Thanks!
>
> Thunder
>
>

Re: Samza 0.14.1 : OffsetOutOfRangeException even with auto.offset.reset=smallest

Posted by Prateek Maheshwari <pr...@gmail.com>.
Hi Thunder,

Can you provide debug level logs from KafkaSystemConsumer with the
stack trace for the exception? It'll help figure out why the
auto.offset.reset property isn't taking effect.

If this error is due to an older checkpoint for the stream, you can
try resetting the checkpoint using the following two configurations:
streams.stream-id.samza.reset.offset: If set to true, when a Samza
container starts up, it ignores any checkpointed offset for this
particular input stream. Its behavior is thus determined by the
samza.offset.default setting. Note that the reset takes effect every
time a container is started, which may be every time you restart your
job, or more frequently if a container fails and is restarted by the
framework.

streams.stream-id.samza.offset.default: If a container starts up
without a checkpoint, this property determines where in the input
stream we should start consuming. The value must be an OffsetType, one
of the following:
  upcoming: Start processing messages that are published after the job
starts. Any messages published while the job was not running are not
processed.
  oldest: Start processing at the oldest available message in the
system, and reprocess the entire available message history.

I.e., set 'samza.reset.offset' = true, and 'samza.offset.default' =
oldest for your stream. Let us know if this doesn't help.
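
(For a hypothetical stream-id "my-topic" defined in the job config, that would look something like:

streams.my-topic.samza.reset.offset=true
streams.my-topic.samza.offset.default=oldest

Remember to remove the reset.offset line once the job is caught up, since the reset applies on every container start.)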

Thanks,
Prateek

On Fri, Jul 6, 2018 at 11:43 AM, Thunder Stumpges <ts...@ntent.com> wrote:
> Hi all,
>
>
> We've just run into a strange problem with samza 0.14.1. We had a job down for a bit, while kafka cleaned past our saved offsets. When starting the job now, we get repeated org.apache.kafka.common.errors.OffsetOutOfRangeException. And it just retries over and over again. We HAVE set
>
> systems.kafka.consumer.auto.offset.reset=smallest as well. Has anyone else seen this? Our understanding from the documentation is that this setting says what to do if the offset is out of range.
>
>
>
> systems.system-name.consumer.auto.offset.reset : This setting determines what happens if a consumer attempts to read an offset that is outside of the current valid range. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.
>
>
>
> This is the set of messages that keeps repeating:
>
>
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Verifying properties
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property client.id is overridden to samza_consumer-stg_apollo_crawler_stream_task-1
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to kafka-server.ntent.com:9092
>
> 2018-07-06 18:32:15 INFO  kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
>
> 2018-07-06 18:32:15 INFO  kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,kafka-server,9092) with correlation id 12 for 1 topic(s) Set(my-topic)
>
> 2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Connected to kafka-server:9092 for producing
>
> 2018-07-06 18:32:15 INFO  kafka.producer.SyncProducer - Disconnecting from kafka-server:9092
>
> 2018-07-06 18:32:15 INFO  o.a.samza.system.kafka.GetOffset - Validating offset 6883929 for topic and partition [my-topic,10]
>
> 2018-07-06 18:32:15 WARN  o.a.s.s.kafka.KafkaSystemConsumer - While refreshing brokers for [my-topic,10]: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
>
>
>
> Thanks!
>
> Thunder
>
>