You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/09/04 22:03:13 UTC

[GitHub] [pulsar] vivekkrj commented on issue #4988: Producer create method blocked forever

vivekkrj commented on issue #4988: Producer create method blocked forever 
URL: https://github.com/apache/pulsar/issues/4988#issuecomment-528108316
 
 
   Here is the sample code base for the client and pruducer block:
   
   /**
   	 * 
   	 * @throws PulsarClientException
   	 */
   	private PulsarClient getProducerClient() throws PulsarClientException {
   		PulsarClient client = null;
   		if(producerOptionBean.isAuthenticationEnable()) {
   			Authentication authentication = AuthenticationFactory
   					.TLS(producerOptionBean.getClientCertificateFile(), producerOptionBean.getClientCertificateKey());
   			client = PulsarClient
   					.builder()
   					.serviceUrl(producerOptionBean.getPulsarBrokerUrl())
   					.tlsTrustCertsFilePath(producerOptionBean.getTrustCa())
   					.allowTlsInsecureConnection(producerOptionBean.isAllowTlsInsecureConnection())
   					.enableTlsHostnameVerification(false)
   					.authentication(authentication)
   					.build();
   		}
   		else {
   			client = PulsarClient
   					.builder()
   					.serviceUrl(producerOptionBean.getPulsarBrokerUrl())
   					.tlsTrustCertsFilePath(producerOptionBean.getTrustCa())
   					.allowTlsInsecureConnection(producerOptionBean.isAllowTlsInsecureConnection())
   					.build();
   		}
   		return client;
   	}
   
   /**
   	 * 
   	 * @throws PulsarClientException
   	 */
   	private Producer<byte[]> buildProducer(PulsarClient client, String producerName) throws PulsarClientException {
   		Producer<byte[]> producer = client.newProducer()
   				.topic(producerOptionBean.getPulsarTopic())
   				.producerName(producerName)
   				.enableBatching(producerOptionBean.isEnableBatching())
   				.batchingMaxPublishDelay(producerOptionBean.getBatchMessageMaxDelay(), TimeUnit.MILLISECONDS)
   				.sendTimeout(producerOptionBean.getMsgAckTimeout(), TimeUnit.SECONDS)
   				.compressionType(getCompressionType())
   				.maxPendingMessages(producerOptionBean.getMaxPendingMessages())
   				.batchingMaxMessages(producerOptionBean.getBatchingMaxMessages())
   				.blockIfQueueFull(producerOptionBean.isBlockIfQueueFull())
   				.create();
   		return producer;
   	}
   
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services