You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by "sigikjohn-vargheese (via GitHub)" <gi...@apache.org> on 2023/04/05 13:59:39 UTC

[GitHub] [pulsar] sigikjohn-vargheese commented on issue #19970: [Bug] Return value of getPartitionedStats doesn't contain subscription type

sigikjohn-vargheese commented on issue #19970:
URL: https://github.com/apache/pulsar/issues/19970#issuecomment-1497537294

   @tisonkun Here is a pared-down snippet. I have a service that connects to the Pulsar admin API to create partitioned topics and then fetch their PartitionedTopicStats. Details from PartitionedTopicStats are then mapped to my POJO. I have a condition that checks whether the subscription type is empty. With PartitionedTopicStats it always comes back empty because that field is missing from the Admin API response. If I replace PartitionedTopicStats with TopicStats, I can see that the type is not empty.
   
   Please note that this behavior can be verified using the CLI as well:
   pulsar-admin topics partitioned-stats <topicName> --> missing subscription type
   pulsar-admin topics stats <partitionedTopicName> --> has subscription type
   
   // Interface to connect to pulsar admin Java client
   /**
    * Contract for a service that wraps the Pulsar admin Java client.
    */
   public interface PulsarAdminService {
       /**
        * @return the {@code PulsarAdmin} client used by this service
        */
       PulsarAdmin getPulsarAdmin();

       /**
        * @return the {@code Topics} admin API handle
        */
       Topics getTopics();

       /**
        * Creates a partitioned topic and returns a {@code CustomTopic} describing it.
        *
        * @param namespace     namespace recorded on the returned {@code CustomTopic}
        * @param topicPath     topic name passed to the Pulsar admin API
        * @param createdBy     creator recorded with the topic
        * @param description   description recorded with the topic
        * @param numPartitions number of partitions to create
        * @return the topic description built by the implementation
        */
       CustomTopic createPartitionedTopic(String namespace, String topicPath, String createdBy, String description, int numPartitions);
   }
   
   // Service to connect to pulsar admin Java client
   @Service
   public class PulsarAdminServiceImpl implements PulsarAdminService {
   
   	private PulsarAdmin pulsarAdmin;
   
   	@Override
       	public Topics getTopics() {
               return getPulsarAdmin().topics();
       	}
   
        	@Override
       	public CustomTopic createPartitionedTopic(String namespace, String topicPath, String createdBy, String description, int numPartitions) {
               Map<String, String> properties = new HashMap<>();
               properties.put(CREATED_ON, new SimpleDateFormat(DATE_FORMAT).format(new Date()));
               properties.put(DESCRIPTION, description);
               properties.put(CREATED_BY, createdBy);
   	    try {
                   getTopics().createPartitionedTopic(topicPath, numPartitions, properties);
   
                   PartitionedTopicStats pts = getTopics().getPartitionedStats(topicPath, false);
   		return buildTopic(namespace, pts); 
   	    } catch (PulsarAdminException.ConflictException e) {
                   LOG.warn("Topic already exists", e);
               } 
         	}
   
   	@Override
       	public PulsarAdmin getPulsarAdmin() {
               if(pulsarAdmin == null){
               	pulsarAdmin = createPulsarAdmin();
               }
               return pulsarAdmin;
           }
   
   	private PulsarAdmin createPulsarAdmin() {
               try {
               	PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
               	pulsarAdminBuilder.serviceHttpUrl("http://localhost:8080");
               	pulsarAdminBuilder.authentication(AuthenticationFactory.token("real JWT goes here"));
               	pulsarAdminBuilder.allowTlsInsecureConnection(false);
               	pulsarAdminBuilder.tlsTrustCertsFilePath("some path");
               	pulsarAdminBuilder.enableTlsHostnameVerification(false);
               	return pulsarAdminBuilder.build();
               } catch (RuntimeException | PulsarClientException e) {
               	LOG.error("Failed to create Pulsar Admin instance., e);
               }
       	}
   
   	private CustomTopic buildTopic(String namespace, PartitionedTopicStats pts) {
   	    CustomTopic topic = new CustomTopic();
   	    Map<String, String> topicProps = partitionedTopicStats.getMetadata().properties;
   	    topic.setName(topicName);
   	    topic.setNamespace(namespace);
               topic.setPartitions(partitionedTopicStats.getMetadata().partitions);
               topic.setPersistent(true);
   	    topic.setDescription(topicProps.get("description"));
               topic.setCreatedBy(topicProps.get("createdBy"));
   	    List<CustomSubscription> subscriptionList = new ArrayList<>();
   	    for (Map.Entry<String, ? extends SubscriptionStats> entry : pts.getSubscriptions().entrySet()) {
               	CustomSubscription subscription = new CustomSubscription();
               	subscription.setName(entry.getKey());
   		SubscriptionStats subscriptionStats = entry.getValue();
   		if (null != subscriptionStats.getType() && !subscriptionStats.getType().trim().isEmpty()) { // DEFECT - type is missing
   		    subscription.setType(subscriptionStats.getType()); 
   		}
                   subscriptionList.add(subscription);
               }
   	    topic.setSubscriptions(subscriptionList);
   	    return topic;
   	}
   }
   
   // Custom POJOs
   public class CustomTopic implements Serializable {
   
       public static final String PROP_NAME = "name";
       public static final String PROP_NAMESPACE = "namespace";
       public static final String PROP_PERSISTENT = "persistent";
       public static final String PROP_SUBSCRIPTIONS = "subscriptions";
       public static final String PROP_DESCRIPTION = "description";
       public static final String PROP_CREATED_BY = "createdBy";
       public static final String PROP_PARTITIONS = "partitions";
   
       @NotNull
       private String name;
       @NotNull
       private String namespace;
       @NotNull
       private Boolean persistent;
       private List<CustomSubscription> subscriptions;
       private String description;
       private String createdBy;
       @NotNull
       private Integer partitions;
   
       public CustomTopic() {
       }
   
       public CustomTopic(@NotNull String name, @NotNull String namespace, @NotNull Boolean persistent, @NotNull Integer partitions) {
           this.name = name;
           this.environment = environment;
           this.persistent = persistent;
           this.partitions = partitions;
       }
   
       @NotNull
       public String getName() {
           return this.name;
       }
   
       public void setName(@NotNull String name) {
           this.name = name;
       }
   
       @NotNull
       public String getNamespace() {
           return this.namespace;
       }
   
       public void setNamespace(@NotNull String namespace) {
           this.namespace = namespace;
       }
   
       @NotNull
       public Boolean isPersistent() {
           return this.persistent;
       }
   
       public void setPersistent(@NotNull Boolean persistent) {
           this.persistent = persistent;
       }
   
       public List<CustomSubscription> getSubscriptions() {
           return this.subscriptions;
       }
   
       public void setSubscriptions(List<CustomSubscription> subscriptions) {
           this.subscriptions = subscriptions;
   
       public String getDescription() {
           return this.description;
       }
   
       public void setDescription(String description) {
           this.description = description;
       }
   
       public String getCreatedBy() {
           return this.createdBy;
       }
   
       public void setCreatedBy(String createdBy) {
           this.createdBy = createdBy;
       }
   
       @NotNull
       public Integer getPartitions() {
           return this.partitions;
       }
   
       public void setPartitions(@NotNull Integer partitions) {
           this.partitions = partitions;
       }
   }
   
   /**
    * Serializable POJO describing a subscription on a topic: its name and, when known,
    * its subscription type.
    */
   public class CustomSubscription implements Serializable {

       public static final String PROP_NAME = "name";
       public static final String PROP_TYPE = "type";

       @NotNull
       private String name;
       private String type;

       /** Creates an empty subscription; fields are populated through the setters. */
       public CustomSubscription() {
       }

       /**
        * Creates a subscription with the given name and no type.
        *
        * @param name subscription name
        */
       public CustomSubscription(@NotNull String name) {
           this.name = name;
       }

       /** @return the subscription name */
       @NotNull
       public String getName() {
           return this.name;
       }

       /** @param name the subscription name */
       public void setName(@NotNull String name) {
           this.name = name;
       }

       /** @return the subscription type; {@code null} if it was never set */
       public String getType() {
           return this.type;
       }

       /** @param type the subscription type */
       public void setType(String type) {
           this.type = type;
       }
   }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org