Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/06/30 11:41:20 UTC

[GitHub] [pulsar] sijie opened a new issue #4642: producer#flush doesn't work as expected

URL: https://github.com/apache/pulsar/issues/4642
 
 
   **Describe the bug**
   
   The following code should produce two message batches, but only one batch is produced.
   
   ```
   producer.sendAsync("message1");
   producer.sendAsync("message2");
   producer.flush();
   
   producer.sendAsync("message3");
   producer.sendAsync("message4");
   producer.flush();
   ```
   
   **To Reproduce**
   
   Run the example code above against a producer with batching enabled.
   
   **Expected behavior**
   
   When `flush` is called, it should send every message buffered so far, so each `flush` in the example should produce its own batch.
   
   **Additional context**
   
   The problem is caused by the following logic: if the producer is not connected, the batch is not sent.
   
   ```
   private void processOpSendMsg(OpSendMsg op) {
       try {
           batchMessageContainer.clear();
           pendingMessages.put(op);
           ClientCnx cnx = cnx();
           if (isConnected()) {
               // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
               // connection is established
               op.cmd.retain();
               cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
               stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
           } else {
               if (log.isDebugEnabled()) {
                   log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName,
                       op.sequenceId);
               }
           }
       } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           semaphore.release(op.numMessagesInBatch);
           if (op != null) {
               op.callback.sendComplete(new PulsarClientException(ie));
           }
       } catch (Throwable t) {
           semaphore.release(op.numMessagesInBatch);
           log.warn("[{}] [{}] error while closing out batch -- {}", topic, producerName, t);
           if (op != null) {
               op.callback.sendComplete(new PulsarClientException(t));
           }
       }
   }
   ```
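
The branch above can be illustrated with a small toy model. This is a sketch, not Pulsar code: `ToyProducer`, `connectionOpened`, `sentCount`, and `pendingCount` are hypothetical names, and the model simplifies the real method by treating a batch as either written or pending (the real code puts every op into `pendingMessages`). It only shows that a `flush` issued while disconnected closes the batch but writes nothing until a connection is established. It does not claim to reproduce the single-batch outcome reported above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only; every name here is hypothetical and none of it is Pulsar's API.
final class ToyProducer {
    private final List<String> currentBatch = new ArrayList<>();
    private final Queue<List<String>> pendingBatches = new ArrayDeque<>();
    private final List<List<String>> sentBatches = new ArrayList<>();
    private boolean connected;

    ToyProducer(boolean connected) {
        this.connected = connected;
    }

    // Buffer a message into the batch that is currently being built.
    void sendAsync(String message) {
        currentBatch.add(message);
    }

    // Close out the current batch. Mirrors the if/else in processOpSendMsg:
    // the batch is written only when a connection exists, otherwise it waits.
    void flush() {
        if (currentBatch.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(currentBatch);
        currentBatch.clear();
        if (connected) {
            sentBatches.add(batch);
        } else {
            pendingBatches.add(batch);
        }
    }

    // Assumed reconnect hook: write everything that was left pending.
    void connectionOpened() {
        connected = true;
        while (!pendingBatches.isEmpty()) {
            sentBatches.add(pendingBatches.poll());
        }
    }

    int sentCount() {
        return sentBatches.size();
    }

    int pendingCount() {
        return pendingBatches.size();
    }
}

final class FlushSketch {
    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer(false); // start disconnected
        producer.sendAsync("message1");
        producer.sendAsync("message2");
        producer.flush();
        producer.sendAsync("message3");
        producer.sendAsync("message4");
        producer.flush();
        System.out.println("sent after flush: " + producer.sentCount()
                + ", pending: " + producer.pendingCount());
        producer.connectionOpened();
        System.out.println("sent after connect: " + producer.sentCount()
                + ", pending: " + producer.pendingCount());
    }
}
```

In this model both `flush` calls return with nothing written while the producer is disconnected, which is the behavior the `else` branch describes. A `flush` that honors the expected behavior would need to wait for, or otherwise account for, the batches left pending.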
