Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/02 09:44:37 UTC
[GitHub] [pulsar] 180254 opened a new issue #8783: non-persistent topic unsubscribing hangs and makes topic unusable
180254 opened a new issue #8783:
URL: https://github.com/apache/pulsar/issues/8783
**Describe the bug**
A non-persistent topic stops accepting new subscriptions after one of its subscriptions is removed due to expiration.
This has happened several times over the past weeks.
Restarting the affected broker fixes the issue.
**To Reproduce**
Our non-persistent topic lives in a namespace configured as follows:
```
bin/pulsar-admin namespaces create ad200we/batch_frontnotifier
bin/pulsar-admin namespaces set-message-ttl -ttl 60 ad200we/batch_frontnotifier
bin/pulsar-admin namespaces set-inactive-topic-policies --disable-delete-while-inactive --delete-mode delete_when_no_subscriptions --max-inactive-duration 30m ad200we/batch_frontnotifier
bin/pulsar-admin namespaces set-subscription-expiration-time --time 30 ad200we/batch_frontnotifier
```
Issue timeline:
1. Topic was working correctly.
2. We see the following pulsar logs:
```
19:05:04.518 pulsar1-dev pulsar-broker {"logLevel":"INFO","logThread":"pulsar-inactivity-monitor-25-1","logger":"org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription","message":"[non-persistent://ad200we/batch_frontnotifier/pulsar1_batch_completed][pulsar15f47f2b3-fef2-4ca9-968e-17c4b685ff0f] Unsubscribing","stack_trace":null}
19:05:04.518 pulsar1-dev pulsar-broker {"logLevel":"INFO","logThread":"pulsar-inactivity-monitor-25-1","logger":"org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription","message":"[non-persistent://ad200we/batch_frontnotifier/pulsar1_batch_completed][pulsar15f47f2b3-fef2-4ca9-968e-17c4b685ff0f] Successfully deleted subscription","stack_trace":null}
19:05:04.518 pulsar1-dev pulsar-broker {"logLevel":"INFO","logThread":"pulsar-inactivity-monitor-25-1","logger":"org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic","message":"[non-persistent://ad200we/batch_frontnotifier/pulsar1_batch_completed][pulsar15f47f2b3-fef2-4ca9-968e-17c4b685ff0f] The subscription was deleted due to expiration","stack_trace":null}
19:05:04.519 pulsar1-dev pulsar-broker {"logLevel":"INFO","logThread":"pulsar-inactivity-monitor-25-1","logger":"org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription","message":"[non-persistent://ad200we/batch_frontnotifier/pulsar1_batch_completed][pulsar10215b463-e4d7-46f5-b586-a131e3217e29] Unsubscribing","stack_trace":null}
```
3. After the last "Unsubscribing" line, no "Successfully deleted subscription" confirmation is logged.
4. From then on, no new subscription to the topic can be created. The client logs show:
```
{"@timestamp":"2020-12-02T08:05:43.608Z","logLevel":"WARN","pid":"6","logThread":"pulsar-client-io-1-1","logger":"o.apache.pulsar.client.impl.ConsumerImpl","message":"[non-persistent://ad200we/batch_frontnotifier/pulsar1_batch_completed][pulsar1ad53b6cf-ecaa-48b9-833e-a1d7d0fcf872] Failed to subscribe to topic on pulsar1-westeurope.dev.example.com/20.x.x.40x6651"}
{"@timestamp":"2020-12-02T08:05:43.610Z","logLevel":"ERROR","pid":"6","logThread":"pulsar-client-io-1-1","logger":"c.t.l.b.p.PulsarProducerConsumerFactory","message":"Cannot create consumer for topic: pulsar1_batch_completed","stack_trace":"org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 5 request timedout after ms 30000
... 11 common frames omitted
Wrapped by: java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 5 request timedout after ms 30000
```
**Expected behavior**
1. It should be possible to create a new subscription.
2. We see expiration logs, but we believe there were no inactive subscriptions at that point.
3. The subscribe operation should not hang.
4. Despite our efforts, we cannot locate the problem in the code. Perhaps the fence operation should have a timeout.
**Desktop (please complete the following information):**
- Pulsar version: 2.6.2
- OS: Azure AKS service (Ubuntu 18)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] mkozioro commented on issue #8783: non-persistent topic unsubscribing hangs and makes topic unusable
Posted by GitBox <gi...@apache.org>.
mkozioro commented on issue #8783:
URL: https://github.com/apache/pulsar/issues/8783#issuecomment-737411170
We've added some logging to NonPersistentTopic.java and rebuilt the Docker image:
```
log.debug("[{}] [{}] [{}] Creating NonPersistentSubscription", topic, subscriptionName, consumerName);
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName));
log.debug("[{}] [{}] [{}] Created NonPersistentSubscription", topic, subscriptionName, consumerName);
```
The logs now show that subscribing hangs at:
```
19:10:53.951 pulsar0-dev pulsar-broker {"logLevel":"DEBUG","logThread":"Thread-73","logger":"org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic","message":"[non-persistent://ad101we/batch_frontnotifier/pulsar0_batch_completed] [pulsar034c8c302-c07f-4ce8-bfe3-43399e2c006e] [fbba3] Creating NonPersistentSubscription","stack_trace":null}
```
and the matching "Created NonPersistentSubscription" line never appears.
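One way to read this hang, offered as an assumption rather than a confirmed diagnosis: if ConcurrentOpenHashMap guards each section with a `java.util.concurrent.locks.StampedLock`, and `computeIfAbsent` needs that section's write lock to insert, then a thread that is stuck while holding the section's read lock blocks every later `computeIfAbsent` on that section indefinitely. The standalone sketch below uses only the JDK (no Pulsar classes; `StuckReaderDemo` and its method are hypothetical names). It simulates a stuck reader and uses a bounded `tryWriteLock` so the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

// Hypothetical illustration, not Pulsar code: a reader that never releases
// its read lock starves every writer on the same StampedLock.
public class StuckReaderDemo {

    static boolean writerGetsLockWhileReaderStuck(long waitMillis) throws InterruptedException {
        StampedLock sectionLock = new StampedLock();
        CountDownLatch readerHoldsLock = new CountDownLatch(1);
        CountDownLatch releaseReader = new CountDownLatch(1);

        // Stands in for a thread stuck while holding the section read lock.
        Thread stuckReader = new Thread(() -> {
            long stamp = sectionLock.readLock();
            readerHoldsLock.countDown();
            try {
                releaseReader.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                sectionLock.unlockRead(stamp);
            }
        });
        stuckReader.start();
        readerHoldsLock.await();

        // Stands in for computeIfAbsent() on the subscribe path (assumed to need the
        // write lock). A bounded wait is used here so that the demo terminates.
        long stamp = sectionLock.tryWriteLock(waitMillis, TimeUnit.MILLISECONDS);
        boolean acquired = stamp != 0L;
        if (acquired) {
            sectionLock.unlockWrite(stamp);
        }

        // Let the simulated stuck reader go so the JVM can exit.
        releaseReader.countDown();
        stuckReader.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "writer acquired lock: false"
        System.out.println("writer acquired lock: " + writerGetsLockWhileReaderStuck(200));
    }
}
```

In the broker there is no timeout on this path, so the subscribe request simply waits until the client gives up after 30 seconds.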
[GitHub] [pulsar] mkozioro commented on issue #8783: non-persistent topic unsubscribing hangs and makes topic unusable
Posted by GitBox <gi...@apache.org>.
mkozioro commented on issue #8783:
URL: https://github.com/apache/pulsar/issues/8783#issuecomment-737866196
Hi,
NonPersistentTopic has a checkInactiveSubscriptions method that iterates over the subscriptions with forEach:
```
...
subscriptions.forEach((subName, sub) -> {
if (sub.getDispatcher() != null && sub.getDispatcher().isConsumerConnected() || sub.isReplicated()) {
return;
}
if (System.currentTimeMillis() - sub.getLastActive() > expirationTimeMillis) {
sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration",
topic, subName));
}
});
...
```
The NonPersistentSubscription is then removed by calling unsubscribe on NonPersistentTopic:
```
topic.unsubscribe(subName)
```
```
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
log.debug("[{}][{}] Unsubscribing subscription from topic", getName(), subscriptionName);
subscriptions.remove(subscriptionName);
log.debug("[{}][{}] Unsubscribed subscription from topic", getName(), subscriptionName);
return CompletableFuture.completedFuture(null);
}
```
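Because sub.delete() is called from inside the forEach callback, subscriptions.remove() runs while forEach is still iterating over the same ConcurrentOpenHashMap, and this causes a locking failure in the map.
I've written a simple test that reproduces a similar locking failure: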
```
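import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.testng.annotations.Test;

public class ConcurrentOpenHashMapForEachRemoveTest {

    @Test
    public void foreachAndRemoveTest() throws InterruptedException {
        // concurrencyLevel = 1 gives a single section, so every key shares one lock.
        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(16, 1);
        ExecutorService executor = Executors.newCachedThreadPool();
        final int nThreads = 2;
        Random random = new Random();
        for (int i = 0; i < nThreads; i++) {
            final int threadIdx = i;
            executor.submit(() -> {
                // Each thread owns 50 keys: [threadIdx * 1000, threadIdx * 1000 + 50).
                List<Long> keys = LongStream.range(threadIdx * 1000, threadIdx * 1000 + 50)
                        .boxed().collect(Collectors.toList());
                System.out.println(Thread.currentThread().getName() + " Before adding elements");
                keys.forEach(aLong -> map.put(aLong, String.valueOf(aLong)));
                System.out.println(Thread.currentThread().getName() + " After adding elements");
                while (map.size() > 0) {
                    long key = keys.get(random.nextInt(keys.size()));
                    System.out.println(Thread.currentThread().getName() + " Before foreach");
                    // Remove a key from inside the forEach callback, as checkInactiveSubscriptions
                    // does indirectly through sub.delete() -> topic.unsubscribe().
                    map.forEach((aLong, s) -> {
                        System.out.println(Thread.currentThread().getName() + " Got foreach element key: " + aLong);
                        if (aLong.equals(key)) {
                            System.out.println(Thread.currentThread().getName() + " Before removing " + key);
                            map.remove(key);
                            System.out.println(Thread.currentThread().getName() + " Removed " + key);
                        }
                    });
                }
                System.out.println(Thread.currentThread().getName() + " Finished");
            });
        }
        // Stop accepting tasks, then wait; the workers never finish if the map deadlocks.
        executor.shutdown();
        executor.awaitTermination(500, TimeUnit.SECONDS);
    }
}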
```
The result is similar to what we see in the broker:
```
pool-1-thread-2 Before adding elements
pool-1-thread-1 Before adding elements
pool-1-thread-2 After adding elements
pool-1-thread-1 After adding elements
pool-1-thread-2 Before foreach
pool-1-thread-1 Before foreach
pool-1-thread-1 Got foreach element key: 0
pool-1-thread-2 Got foreach element key: 0
pool-1-thread-1 Got foreach element key: 1047
pool-1-thread-2 Got foreach element key: 1047
pool-1-thread-1 Got foreach element key: 1049
pool-1-thread-1 Got foreach element key: 25
pool-1-thread-2 Got foreach element key: 1049
pool-1-thread-1 Got foreach element key: 1
pool-1-thread-2 Got foreach element key: 25
pool-1-thread-1 Got foreach element key: 49
pool-1-thread-2 Got foreach element key: 1
pool-1-thread-1 Got foreach element key: 16
pool-1-thread-2 Got foreach element key: 49
pool-1-thread-1 Got foreach element key: 1040
pool-1-thread-2 Got foreach element key: 16
pool-1-thread-1 Got foreach element key: 2
pool-1-thread-2 Got foreach element key: 1040
pool-1-thread-1 Got foreach element key: 29
pool-1-thread-2 Got foreach element key: 2
pool-1-thread-1 Got foreach element key: 18
pool-1-thread-2 Got foreach element key: 29
pool-1-thread-1 Got foreach element key: 1034
pool-1-thread-2 Got foreach element key: 18
pool-1-thread-1 Got foreach element key: 1007
pool-1-thread-2 Got foreach element key: 1034
pool-1-thread-1 Got foreach element key: 46
pool-1-thread-2 Got foreach element key: 1007
pool-1-thread-1 Got foreach element key: 1027
pool-1-thread-2 Got foreach element key: 46
pool-1-thread-1 Got foreach element key: 3
pool-1-thread-2 Got foreach element key: 1027
pool-1-thread-1 Got foreach element key: 1021
pool-1-thread-2 Got foreach element key: 3
pool-1-thread-1 Got foreach element key: 1004
pool-1-thread-2 Got foreach element key: 1021
pool-1-thread-1 Got foreach element key: 1037
pool-1-thread-2 Got foreach element key: 1004
pool-1-thread-1 Got foreach element key: 1013
pool-1-thread-2 Got foreach element key: 1037
pool-1-thread-1 Got foreach element key: 9
pool-1-thread-2 Got foreach element key: 1013
pool-1-thread-1 Got foreach element key: 1015
pool-1-thread-2 Got foreach element key: 9
pool-1-thread-1 Got foreach element key: 13
pool-1-thread-2 Got foreach element key: 1015
pool-1-thread-1 Got foreach element key: 1026
pool-1-thread-2 Got foreach element key: 13
pool-1-thread-1 Got foreach element key: 1043
pool-1-thread-2 Got foreach element key: 1026
pool-1-thread-1 Got foreach element key: 1019
pool-1-thread-2 Got foreach element key: 1043
pool-1-thread-1 Got foreach element key: 1025
pool-1-thread-2 Got foreach element key: 1019
pool-1-thread-1 Got foreach element key: 1030
pool-1-thread-2 Got foreach element key: 1025
pool-1-thread-1 Got foreach element key: 1046
pool-1-thread-2 Got foreach element key: 1030
pool-1-thread-1 Got foreach element key: 1035
pool-1-thread-2 Got foreach element key: 1046
pool-1-thread-1 Got foreach element key: 1001
pool-1-thread-2 Got foreach element key: 1035
pool-1-thread-1 Got foreach element key: 17
pool-1-thread-2 Got foreach element key: 1001
pool-1-thread-1 Got foreach element key: 1009
pool-1-thread-2 Got foreach element key: 17
pool-1-thread-1 Got foreach element key: 1032
pool-1-thread-2 Got foreach element key: 1009
pool-1-thread-1 Got foreach element key: 1011
pool-1-thread-2 Got foreach element key: 1032
pool-1-thread-1 Got foreach element key: 1010
pool-1-thread-2 Got foreach element key: 1011
pool-1-thread-1 Got foreach element key: 35
pool-1-thread-2 Got foreach element key: 1010
pool-1-thread-1 Before removing 35
pool-1-thread-2 Got foreach element key: 35
pool-1-thread-1 Removed 35
pool-1-thread-2 Got foreach element key: 1008
pool-1-thread-1 Got foreach element key: 1008
pool-1-thread-2 Got foreach element key: 48
pool-1-thread-1 Got foreach element key: 48
pool-1-thread-2 Got foreach element key: 1012
pool-1-thread-1 Got foreach element key: 1012
pool-1-thread-2 Got foreach element key: 1029
pool-1-thread-1 Got foreach element key: 1029
pool-1-thread-2 Got foreach element key: 1041
pool-1-thread-1 Got foreach element key: 1041
pool-1-thread-2 Got foreach element key: 38
pool-1-thread-1 Got foreach element key: 38
pool-1-thread-2 Got foreach element key: 1045
pool-1-thread-1 Got foreach element key: 1045
pool-1-thread-2 Got foreach element key: 20
pool-1-thread-1 Got foreach element key: 20
pool-1-thread-2 Got foreach element key: 1042
pool-1-thread-1 Got foreach element key: 1042
pool-1-thread-2 Got foreach element key: 1023
pool-1-thread-1 Got foreach element key: 1023
pool-1-thread-2 Got foreach element key: 1000
pool-1-thread-1 Got foreach element key: 1000
pool-1-thread-2 Got foreach element key: 7
pool-1-thread-1 Got foreach element key: 7
pool-1-thread-2 Got foreach element key: 39
pool-1-thread-1 Got foreach element key: 39
pool-1-thread-2 Got foreach element key: 1018
pool-1-thread-1 Got foreach element key: 1018
pool-1-thread-2 Got foreach element key: 42
pool-1-thread-1 Got foreach element key: 42
pool-1-thread-2 Got foreach element key: 8
pool-1-thread-1 Got foreach element key: 8
pool-1-thread-2 Got foreach element key: 1017
pool-1-thread-1 Got foreach element key: 1017
pool-1-thread-2 Got foreach element key: 1006
pool-1-thread-1 Got foreach element key: 1006
pool-1-thread-2 Got foreach element key: 12
pool-1-thread-1 Got foreach element key: 12
pool-1-thread-2 Got foreach element key: 26
pool-1-thread-1 Got foreach element key: 26
pool-1-thread-2 Got foreach element key: 1036
pool-1-thread-1 Got foreach element key: 1036
pool-1-thread-2 Got foreach element key: 5
pool-1-thread-1 Got foreach element key: 5
pool-1-thread-1 Got foreach element key: 23
pool-1-thread-2 Got foreach element key: 23
pool-1-thread-1 Got foreach element key: 1002
pool-1-thread-2 Got foreach element key: 1002
pool-1-thread-1 Got foreach element key: 43
pool-1-thread-2 Before removing 1002
pool-1-thread-1 Got foreach element key: 22
pool-1-thread-1 Got foreach element key: 34
pool-1-thread-1 Got foreach element key: 1039
pool-1-thread-1 Got foreach element key: 15
pool-1-thread-1 Got foreach element key: 24
pool-1-thread-1 Got foreach element key: 1048
pool-1-thread-1 Got foreach element key: 1044
pool-1-thread-1 Got foreach element key: 10
pool-1-thread-1 Got foreach element key: 32
pool-1-thread-1 Got foreach element key: 33
pool-1-thread-1 Got foreach element key: 1005
pool-1-thread-1 Got foreach element key: 21
pool-1-thread-1 Got foreach element key: 4
pool-1-thread-1 Got foreach element key: 1014
pool-1-thread-1 Got foreach element key: 1031
pool-1-thread-1 Got foreach element key: 31
pool-1-thread-1 Got foreach element key: 47
pool-1-thread-1 Got foreach element key: 45
pool-1-thread-1 Got foreach element key: 11
pool-1-thread-1 Got foreach element key: 1003
pool-1-thread-1 Got foreach element key: 1020
pool-1-thread-1 Got foreach element key: 36
pool-1-thread-1 Got foreach element key: 1028
pool-1-thread-1 Got foreach element key: 41
pool-1-thread-1 Got foreach element key: 37
pool-1-thread-1 Got foreach element key: 28
pool-1-thread-1 Got foreach element key: 44
pool-1-thread-1 Got foreach element key: 1022
pool-1-thread-1 Got foreach element key: 1024
pool-1-thread-1 Got foreach element key: 19
pool-1-thread-1 Got foreach element key: 40
pool-1-thread-1 Got foreach element key: 1033
pool-1-thread-1 Got foreach element key: 1016
pool-1-thread-1 Got foreach element key: 14
pool-1-thread-1 Got foreach element key: 30
pool-1-thread-1 Got foreach element key: 1038
pool-1-thread-1 Got foreach element key: 27
pool-1-thread-1 Got foreach element key: 6
pool-1-thread-1 Before foreach
pool-1-thread-1 Got foreach element key: 0
pool-1-thread-1 Got foreach element key: 1047
pool-1-thread-1 Got foreach element key: 1049
pool-1-thread-1 Got foreach element key: 25
pool-1-thread-1 Got foreach element key: 1
pool-1-thread-1 Got foreach element key: 49
pool-1-thread-1 Got foreach element key: 16
pool-1-thread-1 Got foreach element key: 1040
pool-1-thread-1 Got foreach element key: 2
pool-1-thread-1 Got foreach element key: 29
pool-1-thread-1 Got foreach element key: 18
pool-1-thread-1 Got foreach element key: 1034
pool-1-thread-1 Got foreach element key: 1007
pool-1-thread-1 Got foreach element key: 46
pool-1-thread-1 Got foreach element key: 1027
pool-1-thread-1 Got foreach element key: 3
pool-1-thread-1 Got foreach element key: 1021
pool-1-thread-1 Got foreach element key: 1004
pool-1-thread-1 Got foreach element key: 1037
pool-1-thread-1 Got foreach element key: 1013
pool-1-thread-1 Got foreach element key: 9
pool-1-thread-1 Before removing 9
```
The code hangs while removing an element from the map.
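A likely reason a thread hangs on its own remove(), under assumptions about the 2.6.x code that we have not verified line by line: each ConcurrentOpenHashMap section is a `StampedLock`; forEach starts with an optimistic read but falls back to holding the section read lock while it runs the callbacks once a write invalidates the optimistic stamp; and remove() takes the section write lock. `StampedLock` is not reentrant, so a thread that already holds the read lock can never acquire the write lock through `writeLock()`, and remove() waits forever. If this reading is right, the fallback can be triggered by another thread's writes (as in the test) or by an earlier remove() in the same forEach pass, which would fit the broker logs in the issue description: the first expired subscription is deleted and the hang occurs on the second. The plain-JDK sketch below (`StampedLockReentrancyDemo` is a hypothetical name) shows the non-reentrancy directly, using `tryWriteLock()` so it returns instead of blocking.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical illustration, not Pulsar code: StampedLock is not reentrant.
public class StampedLockReentrancyDemo {

    /** Tries to take the write lock while the same thread still holds a read lock. */
    static boolean writeWhileReading() {
        StampedLock lock = new StampedLock();
        long readStamp = lock.readLock(); // analogous to forEach holding the section read lock
        try {
            // remove() would call writeLock() here and block forever;
            // tryWriteLock() returns 0 instead of blocking, so the outcome is observable.
            long writeStamp = lock.tryWriteLock();
            if (writeStamp != 0L) {
                lock.unlockWrite(writeStamp);
                return true;
            }
            return false;
        } finally {
            lock.unlockRead(readStamp);
        }
    }

    /** The same attempt after the read lock has been released. */
    static boolean writeAfterReadReleased() {
        StampedLock lock = new StampedLock();
        long readStamp = lock.readLock();
        lock.unlockRead(readStamp);
        long writeStamp = lock.tryWriteLock();
        if (writeStamp != 0L) {
            lock.unlockWrite(writeStamp);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("write lock while holding read lock: " + writeWhileReading());       // false
        System.out.println("write lock after releasing read lock: " + writeAfterReadReleased()); // true
    }
}
```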
[GitHub] [pulsar] mkozioro commented on issue #8783: non-persistent topic unsubscribing hangs and makes topic unusable
Posted by GitBox <gi...@apache.org>.
mkozioro commented on issue #8783:
URL: https://github.com/apache/pulsar/issues/8783#issuecomment-737773985
Hi.
I've checked again on the broker with more debug logging, and the first time the issue appears is:
```
19:03:12.206 pulsar0-dev pulsar-broker {"logLevel":"DEBUG","logThread":"pulsar-inactivity-monitor-25-1","logger":"org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic","message":"[non-persistent://ad101we/batch_frontnotifier/pulsar0_batch_completed][pulsar0f59a8139-f4f5-42f3-b9a9-a8f854d12a99] Unsubscribing subscription from topic","stack_trace":null}
```
This matches the code from NonPersistentTopic:
```
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
log.debug("[{}][{}] Unsubscribing subscription from topic", getName(), subscriptionName);
subscriptions.remove(subscriptionName);
log.debug("[{}][{}] Unsubscribed subscription from topic", getName(), subscriptionName);
return CompletableFuture.completedFuture(null);
}
```
There is no "Unsubscribed subscription from topic" log line, so the subscriptions.remove() call on the ConcurrentOpenHashMap somehow hangs.
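A minimal sketch of one way to avoid removing entries from inside forEach: collect the expired names during iteration and delete them only after forEach has returned and released its lock. This is not the upstream fix. `SectionMap` is a hypothetical, simplified stand-in for a single ConcurrentOpenHashMap section that always holds the read lock during forEach (we assume the real map first tries an optimistic read, which makes the real deadlock intermittent rather than deterministic), and `expireInactive` is a hypothetical analogue of checkInactiveSubscriptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;

public class CollectThenRemoveSketch {

    // Hypothetical, simplified map section: forEach holds the read lock while running
    // the callback, and remove() takes the (non-reentrant) write lock.
    static final class SectionMap<K, V> {
        private final StampedLock lock = new StampedLock();
        private final Map<K, V> entries = new HashMap<>();

        void put(K key, V value) {
            long stamp = lock.writeLock();
            try { entries.put(key, value); } finally { lock.unlockWrite(stamp); }
        }

        V remove(K key) {
            long stamp = lock.writeLock(); // would self-deadlock if called from inside forEach
            try { return entries.remove(key); } finally { lock.unlockWrite(stamp); }
        }

        void forEach(BiConsumer<K, V> action) {
            long stamp = lock.readLock();
            try { entries.forEach(action); } finally { lock.unlockRead(stamp); }
        }

        int size() {
            long stamp = lock.readLock();
            try { return entries.size(); } finally { lock.unlockRead(stamp); }
        }
    }

    /** Removes entries idle for longer than expirationMillis, never calling remove() inside forEach. */
    static List<String> expireInactive(SectionMap<String, Long> lastActiveByName,
                                       long nowMillis, long expirationMillis) {
        List<String> expired = new ArrayList<>();
        // Phase 1: only read while forEach holds the read lock.
        lastActiveByName.forEach((name, lastActive) -> {
            if (nowMillis - lastActive > expirationMillis) {
                expired.add(name);
            }
        });
        // Phase 2: forEach has returned and released the read lock, so remove() can take the write lock.
        expired.forEach(lastActiveByName::remove);
        return expired;
    }

    public static void main(String[] args) {
        SectionMap<String, Long> subs = new SectionMap<>();
        subs.put("sub-a", 0L);
        subs.put("sub-b", 9_000L);
        List<String> removed = expireInactive(subs, 10_000L, 5_000L);
        System.out.println("removed " + removed + ", remaining " + subs.size()); // removed [sub-a], remaining 1
    }
}
```

Moving the `lastActiveByName.remove(name)` call into the forEach lambda would make this sketch block forever on `writeLock()`, because the same thread still holds the read lock at that point.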
[GitHub] [pulsar] sijie closed issue #8783: non-persistent topic unsubscribing hangs and makes topic unusable
Posted by GitBox <gi...@apache.org>.
sijie closed issue #8783:
URL: https://github.com/apache/pulsar/issues/8783
[GitHub] [pulsar] mkozioro commented on issue #8783: non-persistent topic unsubscribing hangs and makes topic unusable
Posted by GitBox <gi...@apache.org>.
mkozioro commented on issue #8783:
URL: https://github.com/apache/pulsar/issues/8783#issuecomment-737184787
EDIT:
After enabling debug logging for org.apache.pulsar.broker.service.nonpersistent, we've seen that adding a new consumer and subscription hangs after logging:
```
12:49:15.534 pulsar1-dev pulsar-broker {"logLevel":"DEBUG","logThread":"Thread-1644","logger":"org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic","message":"[non-persistent://ad200we/batch_frontnotifier/pulsar1_batch_completed] [pulsar1e747acc1-59ae-4d94-bdd5-2104519a0c34] [2e1f8] Added consumer -- count: 443","stack_trace":null}
```
This matches the following code in NonPersistentTopic:
```
lock.readLock().lock();
try {
if (isFenced) {
log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable"));
return future;
}
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, consumerName,
USAGE_COUNT_UPDATER.get(this));
}
} finally {
lock.readLock().unlock();
}
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName));
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
addConsumerToSubscription(subscription, consumer);
if (!cnx.isActive()) {
consumer.close();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), USAGE_COUNT_UPDATER.get(NonPersistentTopic.this));
}
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
future.complete(consumer);
}
```
Is it possible that something is wrong with the subscriptions collection (ConcurrentOpenHashMap<String, NonPersistentSubscription>)?
After the "Unsubscribing..." log we saw earlier, the next call should be:
```
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
subscriptions.remove(subscriptionName);
return CompletableFuture.completedFuture(null);
}
```