Posted to dev@kafka.apache.org by "Yang Ye (Created) (JIRA)" <ji...@apache.org> on 2012/02/17 23:42:57 UTC

[jira] [Created] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
--------------------------------------------------------------------------------------------------------------------

                 Key: KAFKA-277
                 URL: https://issues.apache.org/jira/browse/KAFKA-277
             Project: Kafka
          Issue Type: Bug
            Reporter: Yang Ye


A shallow iterator traverses only the first-level messages of a ByteBufferMessageSet; compressed messages are not decompressed, so the messages nested inside them are not visited individually.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Assigned] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

Posted by "Jun Rao (Assigned) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao reassigned KAFKA-277:
-----------------------------

    Assignee: Yang Ye
    
> Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-277
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yang Ye
>            Assignee: Yang Ye
>         Attachments: internal_iterator_with_unit_test.patch, shallow_iterator.patch
>
>
> A shallow iterator traverses only the first-level messages of a ByteBufferMessageSet; compressed messages are not decompressed, so the messages nested inside them are not visited individually.


        

[jira] [Resolved] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

Posted by "Jun Rao (Resolved) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-277.
---------------------------

       Resolution: Fixed
    Fix Version/s: 0.7.1

Thanks for the patch. It looks good. Just committed to trunk.
                
> Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-277
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yang Ye
>            Assignee: Yang Ye
>             Fix For: 0.7.1
>
>         Attachments: internal_iterator_with_unit_test.patch, shallow_iterator.patch
>
>
> A shallow iterator traverses only the first-level messages of a ByteBufferMessageSet; compressed messages are not decompressed, so the messages nested inside them are not visited individually.


        

[jira] [Commented] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13212133#comment-13212133 ] 

Jun Rao commented on KAFKA-277:
-------------------------------

Some comments:
1. The variable event just binds to every item in the sequence by the foreach method. There is no need to rename it since each item in processedEvents is supposed to be a single event.
2. In ByteBufferMessageSet, instead of duplicating code in shallowIterator, could we rename deepIterator to internalIterator and add a flag to control whether we want to do shallow iteration or deep iteration? In general, we don't want to expose the shallow iterator externally. So, it's better if we just add a verifyMessageSize method in ByteBufferMessageSet that uses shallow iterator.
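
The design suggested in comment 2 can be sketched roughly as follows. This is a toy model, not Kafka's actual ByteBufferMessageSet: the Msg, Plain, Wrapper and ToyMessageSet types are hypothetical, a Wrapper stands in for a compressed message, and its size is simply the sum of its contents. The point is only that one private iterator takes a flag, the public iterator stays deep, and the size check runs on the shallow view.

```scala
// Toy model (assumption): a message is either a plain payload or a wrapper
// holding nested messages, standing in for a compressed message.
sealed trait Msg { def size: Int }
final case class Plain(payload: String) extends Msg {
  def size: Int = payload.length
}
final case class Wrapper(inner: List[Msg]) extends Msg {
  def size: Int = inner.map(_.size).sum
}

final class ToyMessageSet(messages: List[Msg]) {
  // Single private iterator; the flag selects shallow or deep traversal.
  private def internalIterator(isShallow: Boolean): Iterator[Msg] =
    if (isShallow) messages.iterator
    else messages.iterator.flatMap(flatten)

  // Recursively unwraps wrappers so that only plain messages are yielded.
  private def flatten(m: Msg): Iterator[Msg] = m match {
    case Wrapper(inner) => inner.iterator.flatMap(flatten)
    case plain          => Iterator(plain)
  }

  // External callers only ever get deep iteration.
  def iterator: Iterator[Msg] = internalIterator(isShallow = false)

  // The size check walks the shallow view, so a wrapper is measured as a whole.
  def verifyMessageSize(maxMessageSize: Int): Unit =
    internalIterator(isShallow = true).foreach { m =>
      if (m.size > maxMessageSize)
        throw new IllegalArgumentException(
          s"message of size ${m.size} exceeds limit $maxMessageSize")
    }
}
```

Keeping the flag on a private method means the shallow view never leaks to callers, which is the concern raised in the comment.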
 
                
> Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-277
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yang Ye
>         Attachments: shallow_iterator.patch
>
>
> A shallow iterator traverses only the first-level messages of a ByteBufferMessageSet; compressed messages are not decompressed, so the messages nested inside them are not visited individually.


        

[jira] [Updated] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

Posted by "Yang Ye (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Ye updated KAFKA-277:
--------------------------

    Attachment: shallow_iterator.patch

The lastConnectionTime adjustment is also in this patch.

The following file is also affected. Curiously, in the repository copy the variable "event" appears to be undefined; the build succeeds only when I change it to "events".

--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala  (revision 1245727)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala  (working copy)
@@ -38,8 +38,8 @@
       processedEvents = cbkHandler.beforeSendingData(events)

     if(logger.isTraceEnabled)
-      processedEvents.foreach(event => trace("Handling event for Topic: %s, Partition: %d"
-        .format(event.getTopic, event.getPartition)))
+      processedEvents.foreach(events => trace("Handling event for Topic: %s, Partition: %d"
+        .format(events.getTopic, events.getPartition)))

     send(serialize(collate(processedEvents), serializer), syncProducer)
   }
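
For reference, in standalone Scala the name to the left of `=>` is a fresh binding introduced by the lambda itself, so both spellings in the diff above are expected to compile and behave identically. A minimal sketch, assuming a hypothetical Event case class in place of the producer's real event type:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the producer's event type (assumption).
final case class Event(getTopic: String, getPartition: Int)

object ForeachBinding {
  // `event` is bound by the lambda; it needs no definition outside it.
  def describe(events: Seq[Event]): Seq[String] = {
    val out = ArrayBuffer.empty[String]
    events.foreach(event =>
      out += "Topic: %s, Partition: %d".format(event.getTopic, event.getPartition))
    out.toSeq
  }

  // Renaming the parameter to `events` only shadows the outer sequence
  // inside the lambda body; the result is the same.
  def describeRenamed(events: Seq[Event]): Seq[String] = {
    val out = ArrayBuffer.empty[String]
    events.foreach(events =>
      out += "Topic: %s, Partition: %d".format(events.getTopic, events.getPartition))
    out.toSeq
  }
}
```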

                
> Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-277
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yang Ye
>         Attachments: shallow_iterator.patch
>
>
> A shallow iterator traverses only the first-level messages of a ByteBufferMessageSet; compressed messages are not decompressed, so the messages nested inside them are not visited individually.


        

[jira] [Updated] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

Posted by "Yang Ye (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Ye updated KAFKA-277:
--------------------------

    Attachment: internal_iterator_with_unit_test.patch

In ByteBufferMessageSet, internal_iterator() now takes a flag that controls deep or shallow behavior, and verifyMessageSize() has been moved there as a member function.

The unit test is added as a separate function in SynchProducerTest.scala.
                
> Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-277
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yang Ye
>         Attachments: internal_iterator_with_unit_test.patch, shallow_iterator.patch
>
>
> A shallow iterator traverses only the first-level messages of a ByteBufferMessageSet; compressed messages are not decompressed, so the messages nested inside them are not visited individually.


        

[jira] [Commented] (KAFKA-277) Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13212738#comment-13212738 ] 

Jun Rao commented on KAFKA-277:
-------------------------------

Also, please add a unit test for this. Use a max_message size larger than each individual uncompressed message, but smaller than the compressed message.
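
One way to pick sizes for such a test is sketched below. It does not use Kafka's classes: the SizeBoundsSketch object and its members are hypothetical, and a gzip blob built with java.util.zip stands in for the compressed wrapper message. Random payloads are used on the assumption that they barely compress, so the wrapper stays larger than the chosen limit while each individual payload stays below it.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
import scala.util.Random

object SizeBoundsSketch {
  // Gzip a batch of payloads into one blob, standing in for a compressed
  // wrapper message (assumption: not Kafka's real wire format).
  def gzipAll(payloads: Seq[Array[Byte]]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bytes)
    payloads.foreach(p => gzip.write(p))
    gzip.close()
    bytes.toByteArray
  }

  private val random = new Random(42L)

  // Ten payloads of 100 random bytes each.
  val payloads: Seq[Array[Byte]] = Seq.fill(10) {
    val buf = new Array[Byte](100)
    random.nextBytes(buf)
    buf
  }

  val wrapper: Array[Byte] = gzipAll(payloads)

  // Larger than any single payload, smaller than the wrapper.
  val maxMessageSize: Int = 500

  // A deep check looks at each inner payload and would pass.
  def deepCheckPasses: Boolean = payloads.forall(_.length <= maxMessageSize)

  // A shallow check looks at the wrapper as a whole and should fail.
  def shallowCheckPasses: Boolean = wrapper.length <= maxMessageSize
}
```

With these bounds, a size check that iterates deeply would accept the batch, while one that iterates shallowly rejects it, which is the behavior the requested test is meant to pin down.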
                
> Add a shallow iterator to the ByteBufferMessageSet, which is only used in SynchProducer.verifyMessageSize() function
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-277
>                 URL: https://issues.apache.org/jira/browse/KAFKA-277
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yang Ye
>         Attachments: shallow_iterator.patch
>
>
> A shallow iterator traverses only the first-level messages of a ByteBufferMessageSet; compressed messages are not decompressed, so the messages nested inside them are not visited individually.
