You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/06/05 11:24:38 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #4435: Introduce batch message container framework and support key based batching container

codelipenghui commented on a change in pull request #4435: Introduce batch message container framework and support key based batching container
URL: https://github.com/apache/pulsar/pull/4435#discussion_r290691493
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 ##########
 @@ -146,9 +114,51 @@ void clear() {
         batchedMessageMetadataAndPayload = null;
     }
 
-    boolean isEmpty() {
+    @Override
+    public boolean isEmpty() {
         return messages.isEmpty();
     }
 
-    private static final Logger log = LoggerFactory.getLogger(BatchMessageContainer.class);
+    @Override
+    public void handleException(Exception ex) {
+        try {
+            // Need to protect ourselves from any exception being thrown in the future handler from the application
+            firstCallback.sendComplete(ex);
+        } catch (Throwable t) {
+            log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
+                sequenceId, t);
+        }
+        ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
+        clear();
+    }
+
+    private OpSendMsg createOpSendMsg() throws IOException {
+        ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
+        messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+        ByteBufPair cmd = producer.sendMessage(producer.producerId, sequenceId, numMessagesInBatch,
+            messageMetadata.build(), encryptedPayload);
+
+        OpSendMsg op = OpSendMsg.create(messages, cmd, sequenceId, firstCallback);
+
+        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+            cmd.release();
+            if (op != null) {
+                op.callback.sendComplete(new PulsarClientException.InvalidMessageException(
+                    "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
+                op.recycle();
+            }
+            return null;
+        }
+
+        op.setNumMessagesInBatch(numMessagesInBatch);
+        op.setBatchSizeByte(currentBatchSizeBytes);
+        return op;
+    }
+
+    @Override
+    public List<OpSendMsg> createOpSendMsgs() throws IOException {
 
 Review comment:
   Good point. I think we can add a method `isMultiBatch`: if `isMultiBatch` is true, call `createOpSendMsgs()`; otherwise, call `createOpSendMsg()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services