Posted to dev@pulsar.apache.org by gi...@git.apache.org on 2017/07/25 18:33:15 UTC

[GitHub] rdhabalia commented on a change in pull request #538: PIP-2: Introduce non-persistent topics

rdhabalia commented on a change in pull request #538: PIP-2: Introduce non-persistent topics
URL: https://github.com/apache/incubator-pulsar/pull/538#discussion_r129388496
 
 

 ##########
 File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
 ##########
 @@ -553,6 +568,155 @@ public void testReplicator() throws Exception {
 
     }
 
+    /**
+     * verifies load manager assigns topic only if broker started in non-persistent mode
+     * <pre>
+     * 1. Start broker with disable non-persistent topic mode
+     * 2. Create namespace with non-persistency set
+     * 3. Create non-persistent topic
+     * 4. Load-manager should not be able to find broker
+     * 5. Create producer on that topic should fail
+     * </pre>
+     */
+    @Test(dataProvider = "loadManager")
+    public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadManagerName) throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final String topicName = "non-persistent://" + namespace + "/loadManager";
+        final String defaultLoadManagerName = conf.getLoadManagerClassName();
+        final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
+        try {
+            // start broker to not own non-persistent namespace and create non-persistent namespace
+            stopBroker();
+            conf.setEnableNonPersistentTopics(false);
+            conf.setLoadManagerClassName(loadManagerName);
+            startBroker();
+
+            Field field = PulsarService.class.getDeclaredField("loadManager");
+            field.setAccessible(true);
+            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar);
+            LoadManager manager = LoadManager.create(pulsar);
+            manager.start();
+            loadManagerRef.set(manager);
+
+            NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+            LoadManager loadManager = pulsar.getLoadManager().get();
+            ResourceUnit broker = null;
+            try {
+                broker = loadManager.getLeastLoaded(fdqn);
+            } catch (Exception e) {
+                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker)
+            }
+            assertNull(broker);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+            try {
+                Producer producer = pulsarClient.createProducerAsync(topicName, producerConf).get(1, TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+            NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
+            assertNull(topicRef);
+
+        } finally {
+            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+            conf.setLoadManagerClassName(defaultLoadManagerName);
+        }
+
+    }
+
+    /**
+     * verifies: broker should reject non-persistent topic loading if broker is not enable for non-persistent topic
+     * 
+     * @param loadManagerName
+     * @throws Exception
+     */
+    @Test
+    public void testNonPersistentTopicUnderPersistentNamespace() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final String topicName = "non-persistent://" + namespace + "/persitentNamespace";
+
+        final boolean defaultENableNonPersistentTopic = conf.isEnableNonPersistentTopics();
+        try {
+            conf.setEnableNonPersistentTopics(false);
+            stopBroker();
+            startBroker();
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+            try {
+                Producer producer = pulsarClient.createProducerAsync(topicName, producerConf).get(1, TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
+            NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
+            assertNull(topicRef);
+        } finally {
+            conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic);
+        }
+    }
+
+    /**
+     * verifies that broker started with onlyNonPersistent mode doesn't own persistent-topic
+     * 
+     * @param loadManagerName
+     * @throws Exception
+     */
+    @Test(dataProvider = "loadManager")
+    public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerName) throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final String topicName = "persistent://" + namespace + "/loadManager";
+        final String defaultLoadManagerName = conf.getLoadManagerClassName();
+        final boolean defaultEnablePersistentTopic = conf.isEnablePersistentTopics();
+        final boolean defaultEnableNonPersistentTopic = conf.isEnableNonPersistentTopics();
+        try {
+            // start broker to not own non-persistent namespace and create non-persistent namespace
+            stopBroker();
+            conf.setEnableNonPersistentTopics(true);
+            conf.setEnablePersistentTopics(false);
+            conf.setLoadManagerClassName(loadManagerName);
+            startBroker();
+
+            Field field = PulsarService.class.getDeclaredField("loadManager");
+            field.setAccessible(true);
+            AtomicReference<LoadManager> loadManagerRef = (AtomicReference<LoadManager>) field.get(pulsar);
+            LoadManager manager = LoadManager.create(pulsar);
+            manager.start();
+            loadManagerRef.set(manager);
+
+            NamespaceBundle fdqn = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+            LoadManager loadManager = pulsar.getLoadManager().get();
+            ResourceUnit broker = null;
+            try {
+                broker = loadManager.getLeastLoaded(fdqn);
+            } catch (Exception e) {
+                // Ok. (ModulearLoadManagerImpl throws RuntimeException incase don't find broker)
+            }
+            assertNull(broker);
+
+            ProducerConfiguration producerConf = new ProducerConfiguration();
+            try {
+                Producer producer = pulsarClient.createProducerAsync(topicName, producerConf).get(1, TimeUnit.SECONDS);
+                producer.close();
+                fail("topic loading should have failed");
+            } catch (Exception e) {
+                // Ok
+            }
 
 Review comment:
   Actually, in this test we start the broker in `non-persistent` mode and try to load a `persistent` topic, which fails because the load manager cannot find any broker that supports persistent topics.
   
   However, the producer keeps retrying the lookup and re-creation, so here we put a timeout on producer creation. That call times out, which completes the test:
   `pulsarClient.createProducerAsync(topicName, producerConf).get(1, TimeUnit.SECONDS);`
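   
   To illustrate the timeout behavior in isolation, here is a minimal sketch that uses a plain `CompletableFuture` as a stand-in for the producer-creation future. The class name `TimeoutOnPendingFuture` and the helper `timesOut` are hypothetical and not part of Pulsar. The sketch assumes the pending future never completes, which approximates a client that keeps retrying the lookup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in: shows how a bounded get() ends the wait on a
// future that would otherwise stay pending indefinitely.
public class TimeoutOnPendingFuture {

    // Returns true only if the future is still incomplete when the timeout elapses.
    static boolean timesOut(CompletableFuture<?> future, long timeout, TimeUnit unit)
            throws InterruptedException {
        try {
            future.get(timeout, unit);
            return false; // completed normally within the timeout
        } catch (TimeoutException e) {
            return true;  // still pending, e.g. the operation keeps retrying
        } catch (ExecutionException e) {
            return false; // completed exceptionally, which is not a timeout
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: this future is never completed, like a producer
        // creation that retries without ever finding an owning broker.
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        System.out.println(timesOut(neverCompletes, 1, TimeUnit.SECONDS));
    }
}
```

   The test in this diff catches a generic `Exception`, so it does not distinguish a timeout from an immediate failure. Either outcome passes, as long as no producer is created.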
   
 