Posted to jira@kafka.apache.org by "Yogesh BG (JIRA)" <ji...@apache.org> on 2017/07/03 10:26:00 UTC

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

    [ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072228#comment-16072228 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/3/17 10:25 AM:
-----------------------------------------------------------

Here I found that if the stream gets closed successfully, it is then able to re-establish the connection to the new broker IP and continue processing data.

But sometimes the previous stream does not get closed properly, because some threads keep trying to re-establish the connection to the old broker IP, which is no longer available, and keep logging DEBUG-level exceptions. I have attached the debug log. In this situation the stream does not process any further data.

Here is the logic used to re-establish the connection; the close timeout is 60 seconds.


{code:java}
// Runs the broker-discovery check every refreshInterval minutes; if the bootstrap
// servers have changed, the current streams instance is closed (60s close timeout),
// the topology is rebuilt and the streams application is restarted.
private ScheduledFuture<?> setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List<String> bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort,
                        useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
                logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers
                        + ", old bootstrap server are " + oldBootstrapServerString);
                boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString);
                if (isChanged) {
                    String bootstrapServerString = bootstrapServersStr(bootstrapServers);
                    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString);
                    logger.info(
                            "Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    // close() shuts down the existing KafkaStreams instance (60s timeout)
                    close();
                    streams = new KafkaStreams(buildTopology(config), config);
                    logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    streams.cleanUp();
                    start();
                    logger.info("Completed restart of kafka streams connection to new broker with configuration "
                            + config);
                }
            } catch (Throwable ex) {
                logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage()
                        + ", will retry again", ex);
            }
        }
    }, 0, refreshInterval, TimeUnit.MINUTES);
}

{code}
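
The helpers checkForChangeInBootstrapServers and bootstrapServersStr referenced above are not included in the snippet; purely as an illustration of their intent (compare the discovered broker list against the configured bootstrap.servers value and join it back into a config string), here is a minimal sketch with assumed behaviour, not the actual implementation:

{code:java}
import java.util.List;
import java.util.TreeSet;

// Illustrative sketch only: the real helpers are not part of this comment, so this
// is a guess at their behaviour (order-insensitive comparison of host:port lists).
final class BootstrapServerUtil {

    // Joins the discovered host:port entries into a bootstrap.servers string.
    static String bootstrapServersStr(final List<String> bootstrapServers) {
        return String.join(",", bootstrapServers);
    }

    // Returns true if the discovered servers differ from the currently configured ones.
    static boolean checkForChangeInBootstrapServers(final List<String> newServers,
                                                    final String oldServerString) {
        final TreeSet<String> oldSet = new TreeSet<>();
        if (oldServerString != null && !oldServerString.isEmpty()) {
            for (final String s : oldServerString.split(",")) {
                oldSet.add(s.trim());
            }
        }
        return !oldSet.equals(new TreeSet<>(newServers));
    }
}
{code}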
[^kafkastreams.log]
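
The restart above only behaves well when close() actually completes within the 60-second timeout. As a minimal sketch (not taken from the attached sources), the cleanUp()/start() sequence could be made conditional on that, assuming the pre-2.0 KafkaStreams.close(timeout, unit) overload that returns true only once every stream thread has stopped; RestartHelper and restart(...) are made-up names:

{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;

// Illustrative sketch only: restart the streams instance only after the previous
// instance has fully shut down within the close timeout.
public class RestartHelper {

    private volatile KafkaStreams streams;

    public synchronized boolean restart(final KafkaStreams newStreams) {
        final KafkaStreams old = streams;
        // Wait up to 60 seconds for every stream thread of the old instance to stop.
        final boolean closed = (old == null) || old.close(60, TimeUnit.SECONDS);
        if (!closed) {
            // Some threads are still alive (e.g. stuck retrying the old broker IP);
            // calling cleanUp() now would race with them for the task state-directory
            // locks and can produce the LockException shown in the issue below.
            return false;
        }
        streams = newStreams;
        streams.cleanUp();  // safe only once no other thread holds the state directory locks
        streams.start();
        return true;
    }
}
{code}

If close() returns false, the old instance is still holding resources and the restart is better retried on the next discovery run instead of calling cleanUp() immediately.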



> Kafka Stream not able to successfully restart over new broker ip
> ----------------------------------------------------------------
>
>                 Key: KAFKA-5545
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5545
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Yogesh BG
>            Priority: Critical
>
> Hi,
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. Then I restart the broker, and after the restart the broker is assigned a new IP.
> In the Kafka Streams application I have a thread that runs every 5 minutes and checks whether the broker IP has changed; if it has, we clean up the stream, rebuild the topology (I also tried reusing the topology) and start the stream again. I end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Creating active task 0_5 with assigned partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Creating active task 0_1 with assigned partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Creating active task 0_7 with assigned partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Creating active task 0_3 with assigned partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Creating active task 0_0 with assigned partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Creating active task 0_4 with assigned partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Creating active task 0_6 with assigned partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Creating active task 0_2 with assigned partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory for task 0_5
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.035 [StreamThread-41] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.037 [StreamThread-37] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_3. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_3] Failed to lock the state directory for task 0_3
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-34] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_7. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_7] Failed to lock the state directory for task 0_7
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-43] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_6. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_6] Failed to lock the state directory for task 0_6
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-45] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_0. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory for task 0_0
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-36] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_4. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the state directory for task 0_4
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-48] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_2. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_2] Failed to lock the state directory for task 0_2
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:13.642 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-44] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-47] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-42] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-46] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.646 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-33] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.648 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-40] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.655 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-39] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:13.660 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-35] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-42] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-46] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-47] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.663 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-44] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.671 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-33] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.676 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-40] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.677 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-39] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:23.682 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-35] Committing all tasks because the commit interval 10000ms has elapsed
> 11:04:29.025 [pool-4-thread-1]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)