Posted to dev@curator.apache.org by "Jordan Zimmerman (Jira)" <ji...@apache.org> on 2021/11/14 10:14:00 UTC

[jira] [Comment Edited] (CURATOR-623) DistributedQueue stops filling after long disconnect from cluster

    [ https://issues.apache.org/jira/browse/CURATOR-623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443282#comment-17443282 ] 

Jordan Zimmerman edited comment on CURATOR-623 at 11/14/21, 10:13 AM:
----------------------------------------------------------------------

The test is faulty. There's no guarantee which queue will service a message. In this case, {{aliveQueue}} always receives the queued message, and because its consumer is a no-op, {{done}} never gets set. If you give {{aliveQueue}} the same handler that is set for {{dyingQueue}}, the test passes. Alternatively, you could make {{aliveQueue}} producer-only and it will work.
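
To illustrate the reasoning, here is a minimal stand-alone sketch that models "each item is delivered to exactly one consumer". It does not use Curator at all; the class {{CompetingConsumersSketch}}, the enum {{AliveMode}} and the method {{alwaysCompletes}} are hypothetical names introduced only for this illustration, and the real delivery order in a cluster is not something this model predicts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical model, not Curator code: one item, one or two competing consumers.
public class CompetingConsumersSketch {
    // How the second ("alive") queue is configured in the test.
    public enum AliveMode { NO_OP_CONSUMER, SAME_HANDLER, PRODUCER_ONLY }

    // Returns true only if `done` completes no matter which consumer receives the item.
    public static boolean alwaysCompletes(AliveMode mode) {
        // A producer-only queue registers no consumer, so only one candidate remains.
        int consumerCount = (mode == AliveMode.PRODUCER_ONLY) ? 1 : 2;
        for (int winner = 0; winner < consumerCount; winner++) {
            CompletableFuture<Void> done = new CompletableFuture<>();
            // Same logic as the handler given to dyingQueue in the reported test.
            Consumer<String> handler = item -> {
                if (item.equals("0")) {
                    done.complete(null);
                }
            };

            List<Consumer<String>> consumers = new ArrayList<>();
            consumers.add(handler); // stands in for dyingQueue's consumer
            if (mode == AliveMode.NO_OP_CONSUMER) {
                consumers.add(item -> { }); // stands in for aliveQueue as written in the test
            } else if (mode == AliveMode.SAME_HANDLER) {
                consumers.add(handler); // first suggested fix
            }

            // Deliver the single item to exactly one consumer.
            consumers.get(winner).accept("0");
            if (!done.isDone()) {
                return false; // at least one delivery outcome leaves the test hanging
            }
        }
        return true;
    }

    public static void main(String[] args) {
        for (AliveMode mode : AliveMode.values()) {
            System.out.println(mode + " -> " + alwaysCompletes(mode));
        }
    }
}
```

In this model only {{NO_OP_CONSUMER}} can leave {{done}} incomplete, which matches the two fixes described above. For the producer-only variant in Curator itself, the {{QueueBuilder.builder}} Javadoc describes passing {{null}} as the consumer; please check that against the Curator version you use.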


was (Author: randgalt):
The test is faulty. There's no guarantee which queue will service a message. In this case, {{aliveQueue}} always receives the queued message, and because its consumer is a no-op, {{done}} never gets set. If you give {{aliveQueue}} the same handler that is set for {{dyingQueue}}, the test passes.

> DistributedQueue stops filling after long disconnect from cluster
> -----------------------------------------------------------------
>
>                 Key: CURATOR-623
>                 URL: https://issues.apache.org/jira/browse/CURATOR-623
>             Project: Apache Curator
>          Issue Type: Bug
>            Reporter: Никита Соколов
>            Assignee: Jordan Zimmerman
>            Priority: Major
>
> One of our VMs lost its network connection for 12 minutes. After the network came back up, external processes stopped filling the queues because Curator had given up on all watchers. Here is a test reproducing the issue:
> {code:java}
> import junit.framework.TestCase;
> import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueBuilder;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.recipes.queue.QueueSerializer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.retry.ExponentialBackoffRetry;
> import org.apache.curator.test.TestingCluster;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.function.Consumer;
> public class DistributedQueueTest extends TestCase {
>     public void test() throws Exception {
>         final var done = new CompletableFuture<>();
>         try (
>             final var testingCluster = started(new TestingCluster(1));
>             final var dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
>             final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
>                 if (item.equals("0")) {
>                     done.complete(null);
>                 }
>             })
>         ) {
>             dyingQueue.start();
>             testingCluster.killServer(testingCluster.getInstances().iterator().next());
>             Thread.sleep(2 * 60_000);
>             testingCluster.restartServer(testingCluster.getInstances().iterator().next());
>             try (
>                 final var aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
>                 final var aliveQueue = newQueue(aliveCuratorFramework, __ -> {})
>             ) {
>                 aliveQueue.start();
>                 aliveQueue.put("0");
>                 done.get(1, TimeUnit.MINUTES);
>             }
>         }
>     }
>     private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) {
>         curatorFramework.start();
>         return QueueBuilder.builder(
>             curatorFramework,
>             new QueueConsumer<String>() {
>                 @Override
>                 public void consumeMessage(String o) {
>                     consumer.accept(o);
>                 }
>                 @Override
>                 public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
>                 }
>             },
>             new QueueSerializer<>() {
>                 @Override
>                 public byte[] serialize(String item) {
>                     return item.getBytes();
>                 }
>                 @Override
>                 public String deserialize(byte[] bytes) {
>                     return new String(bytes);
>                 }
>             },
>             "/MyChildrenCacheTest/queue"
>         ).buildQueue();
>     }
>     private static TestingCluster started(TestingCluster testingCluster) throws Exception {
>         try {
>             testingCluster.start();
>             return testingCluster;
>         } catch (Throwable throwable) {
>             try (testingCluster) {
>                 throw throwable;
>             }
>         }
>     }
>     private static CuratorFramework getCuratorFramework(String connectString) {
>         return CuratorFrameworkFactory.builder()
>             .ensembleProvider(new FixedEnsembleProvider(connectString, true))
>             .retryPolicy(new ExponentialBackoffRetry(1000, 3))
>             .build();
>     }
> }
> {code}
>  


