You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/26 09:33:57 UTC

[GitHub] yush1ga closed pull request #1282: Fixed merge issues with DestinationName renaming

yush1ga closed pull request #1282: Fixed merge issues with DestinationName renaming
URL: https://github.com/apache/incubator-pulsar/pull/1282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise not show it once the pull request is merged):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 092dc3617..2083cc786 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -31,7 +31,7 @@
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
@@ -70,7 +70,7 @@ protected boolean isConsumersExceededOnTopic() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -88,7 +88,7 @@ protected boolean isConsumersExceededOnSubscription() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index d8510df7e..2ec804228 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,7 +34,6 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
@@ -50,8 +49,6 @@
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
-import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
@@ -268,7 +265,7 @@ private boolean isProducersExceeded() {
         Policies policies;
         try {
             policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -591,7 +588,7 @@ protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic
         }
         return isReplicatorStarted.get();
     }
-    
+
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -976,7 +973,7 @@ public void markBatchMessagePublished() {
         this.hasBatchMessagePublished = true;
     }
 
-    
-    
+
+
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2e59811b8..54e40b6f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -40,13 +40,13 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
@@ -139,7 +139,7 @@ private boolean isConsumersExceededOnTopic() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -157,7 +157,7 @@ private boolean isConsumersExceededOnSubscription() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bd197e84c..cc30836d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -23,9 +23,8 @@
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -42,7 +41,7 @@
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
@@ -53,7 +52,7 @@
     private final PersistentTopic topic;
     private final ManagedCursor cursor;
     private final String name;
-    
+
     private boolean havePendingRead = false;
 
     private static final int MaxReadBatchSize = 100;
@@ -122,7 +121,7 @@ protected boolean isConsumersExceededOnTopic() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -140,7 +139,7 @@ protected boolean isConsumersExceededOnSubscription() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -201,7 +200,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
                 if (future.isSuccess()) {
                     // acquire message-dispatch permits for already delivered messages
                     if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                        topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);    
+                        topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);
                     }
                     // Schedule a new read batch operation only after the previous batch has been written to the socket
                     synchronized (PersistentDispatcherSingleActiveConsumer.this) {
@@ -303,7 +302,7 @@ protected void readMoreEntries(Consumer consumer) {
             }
 
             int messagesToRead = Math.min(availablePermits, readBatchSize);
-            
+
             // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
             // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
             // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d5d9746c..62017f0f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -351,7 +351,7 @@ private boolean isProducersExceeded() {
         Policies policies;
         try {
             policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -911,7 +911,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                 boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
                 if (isReplicatorStarted) {
-                    future.complete(null);    
+                    future.complete(null);
                 } else {
                     future.completeExceptionally(new NamingException(
                             PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d0277ef73..c1b44c0b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -39,7 +39,6 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.collect.ImmutableMap;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -89,12 +88,12 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
@@ -111,6 +110,8 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -405,7 +406,7 @@ public void testMaxProducersForNamespace() throws Exception {
         Policies policies = new Policies();
         policies.max_producers_per_topic = 2;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
         testMaxProducers();
     }
@@ -576,7 +577,7 @@ public void testMaxConsumersSharedForNamespace() throws Exception {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
 
         testMaxConsumersShared();
@@ -667,7 +668,7 @@ public void testMaxConsumersFailoverForNamespace() throws Exception {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
 
         testMaxConsumersFailover();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services