Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2012/09/06 18:11:07 UTC

[jira] [Created] (KAFKA-496) high level producer send should return a response

Jun Rao created KAFKA-496:
-----------------------------

             Summary: high level producer send should return a response
                 Key: KAFKA-496
                 URL: https://issues.apache.org/jira/browse/KAFKA-496
             Project: Kafka
          Issue Type: Bug
          Components: core
            Reporter: Jun Rao
             Fix For: 0.8


Currently, Producer.send() doesn't return any value. In 0.8, since each produce request will be acked, we should pass the response back. If the producer is in sync mode, we can return a map of (topic, partitionId) -> (errorcode, offset). If the producer is in async mode, we can just return null.
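A minimal sketch of what the sync-mode return value could look like, assuming a map keyed by (topic, partitionId). The names PartitionKey, PartitionStatus and failedPartitions are hypothetical, and treating error code 0 as success is an assumption of this sketch rather than something stated in the ticket.

```scala
// Hypothetical types for illustration; not taken from the Kafka code base.
case class PartitionKey(topic: String, partitionId: Int)
case class PartitionStatus(errorCode: Short, offset: Long)

object SendResultSketch {
  // The proposed sync-mode result: (topic, partitionId) -> (errorcode, offset).
  type SendResult = Map[PartitionKey, PartitionStatus]

  // Assumed convention for this sketch: error code 0 means success.
  val NoError: Short = 0

  // Returns the partitions whose produce request came back with an error.
  def failedPartitions(result: SendResult): Set[PartitionKey] =
    result.collect { case (key, status) if status.errorCode != NoError => key }.toSet
}
```

A caller in sync mode could then inspect the map right after send() and retry or report only the partitions that failed.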

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-496) high level producer send should return a response

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13458030#comment-13458030 ] 

Jay Kreps commented on KAFKA-496:
---------------------------------

I would propose we work back from what the user code would look like.

One point I would like to bring up is that the current producer only allows a single request at a time. This is a huge hit on throughput, since a single producer can utilize only one partition on one broker at any given time. For some uses, where the number of producers is huge compared to the number of brokers, this is fine, but this is not universally true. The fix for this is to make the send() call non-blocking and support multiplexing requests over the connection. This is not hard to do--we have the correlation id in the requests, so the only change is in the network layer, to avoid reordering requests from the same connection. If we made this change, the producer request would ALWAYS be a sort of future.
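As a rough illustration of multiplexing by correlation id, the sketch below keeps a table of in-flight requests and routes each response to the handler registered for its id. InFlightRequests, register and complete are hypothetical names, and this is not how the Kafka network layer is actually structured.

```scala
import scala.collection.mutable

// Hypothetical in-flight request table; all names are illustrative only.
class InFlightRequests[R] {
  private var nextCorrelationId = 0
  private val pending = mutable.Map.empty[Int, R => Unit]

  // Registers a handler and returns the correlation id to put in the request.
  def register(handler: R => Unit): Int = synchronized {
    val id = nextCorrelationId
    nextCorrelationId += 1
    pending(id) = handler
    id
  }

  // Routes a response to its handler; returns false if the id is unknown.
  def complete(correlationId: Int, response: R): Boolean = {
    val handler = synchronized { pending.remove(correlationId) }
    handler.foreach(h => h(response)) // run outside the lock
    handler.isDefined
  }

  // Number of requests still waiting for a response.
  def size: Int = synchronized { pending.size }
}
```

With a table like this, send() can return as soon as the request is registered and written, and several requests can be outstanding on one connection at once.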

The state of futures in Java and Scala seems to be a little complex. Java has a Future, but it doesn't allow registering a callback. Finagle and Akka both have custom versions of Future. There is a proposal to unify all these, though I don't know the status (http://docs.scala-lang.org/sips/pending/futures-promises.html).

For our purpose I recommend we just make our own. It supports three methods:
trait Future[T] {
  /* returns true if the result is ready */
  def complete: Boolean
  /* add a function to be called when the result is ready. The function takes the result of the execution--either an exception or an object of type T. Note you can call this more than once to register multiple actions. */
  def onComplete(f: Either[T, Exception] => Unit): Future[T]
  /* await completion and return the result or throw the exception */
  def result: T
}

So the function prototype would be
   def send(data: ProduceData*): Future[ProduceResponse]

In the current code the future would immediately be satisfied for the sync producer. When we have fully implemented the non-blocking client it wouldn't. But this change would be transparent to the user.

I think there are a couple of use cases
1. You don't really care what the result is (basically "fire and forget"), in which case you use this api as you do today:
     send(...)
2. You want to make sure the send succeeded or do some follow up action but you don't mind blocking the current thread:
    val response = send(...).result
3. You want to do something more complicated. This could be sending out lots of requests without blocking then handling responses or asynchronously handling failures or whatever. In this case you would use
    send(...).onComplete { result: Either[ProduceResponse, Exception] =>
        result match {
          case Left(response) => .. do something
          case Right(e) => handle exception
        }
    }
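The proposed trait could be backed by something like the sketch below. SettableFuture and its satisfy method are hypothetical names for the piece the network layer would complete when the response arrives; the trait is restated with a named callback parameter so that the block compiles on its own.

```scala
import scala.collection.mutable.ListBuffer

trait Future[T] {
  def complete: Boolean
  def onComplete(f: Either[T, Exception] => Unit): Future[T]
  def result: T
}

// Hypothetical implementation; `satisfy` is an assumed name for the hook
// that the sender would call once the outcome is known.
class SettableFuture[T] extends Future[T] {
  private var outcome: Option[Either[T, Exception]] = None
  private val callbacks = ListBuffer.empty[Either[T, Exception] => Unit]

  def complete: Boolean = synchronized { outcome.isDefined }

  def onComplete(f: Either[T, Exception] => Unit): Future[T] = {
    val ready = synchronized {
      if (outcome.isEmpty) callbacks += f
      outcome
    }
    ready.foreach(f) // already satisfied: run the callback right away
    this
  }

  // Blocks until satisfied, then returns the value or rethrows the exception.
  def result: T = synchronized {
    while (outcome.isEmpty) wait()
    outcome.get match {
      case Left(value) => value
      case Right(e)    => throw e
    }
  }

  def satisfy(value: Either[T, Exception]): Unit = {
    val toRun = synchronized {
      require(outcome.isEmpty, "future already satisfied")
      outcome = Some(value)
      notifyAll() // wake up any thread blocked in result
      callbacks.toList
    }
    toRun.foreach(cb => cb(value)) // run callbacks outside the lock
  }
}
```

A sync producer would call satisfy before returning from send(), so the three use cases above behave the same whether or not the client is non-blocking underneath.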
      


  
                
> high level producer send should return a response
> -------------------------------------------------
>
>                 Key: KAFKA-496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-496
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Priority: Blocker
>              Labels: features
>             Fix For: 0.8
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Currently, Producer.send() doesn't return any value. In 0.8, since each produce request will be acked, we should pass the response back. What we can do is that if the producer is in sync mode, we can return a map of (topic,partitionId) -> (errorcode, offset). If the producer is in async mode, we can just return a null.


[jira] [Commented] (KAFKA-496) high level producer send should return a response

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13457997#comment-13457997 ] 

Jun Rao commented on KAFKA-496:
-------------------------------

The following is a straw-man proposal:

Define the following class and trait.

class ProducerCallbackResponse (val response: ProducerResponse) {
}

trait ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable)
  def onSendResponse(response: ProducerResponse)

  def getCallbackResponse(): Option[ProducerCallbackResponse]
}

1. In Producer, add a new api registerCallBack(callback: ProducerCallback[V]). Change the send api to:
   def send(producerData: ProducerData[K,V]*) : Option[ProducerCallbackResponse]

2. For the sync Producer, define the following default callback. send() will either throw an exception or return a ProducerCallbackResponse.

class DefaultSyncProducerCallback[V] extends ProducerCallback[V] {
  var callbackResponse: Option[ProducerCallbackResponse] = None
  def onSerializationError(event: V, e: Throwable) {
    throw e
  }
  def onSendResponse(response: ProducerResponse) {
    // store the wrapped response so that send() can return it
    callbackResponse = Some(new ProducerCallbackResponse(response))
  }

  def getCallbackResponse(): Option[ProducerCallbackResponse] = {
    callbackResponse
  }
}

3. For the async Producer, define the following default callback that simply ignores both events.

class DefaultAsyncProducerCallback[V] extends ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable) {
    // let it go
  }
  def onSendResponse(response: ProducerResponse) {
    // let it go
  }

  def getCallbackResponse(): Option[ProducerCallbackResponse] = {
    None
  }
}

4. A user can also define and register its own customized ProducerCallback.
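To make point 4 concrete, here is a sketch of a user-defined callback that counts failures instead of throwing. It follows the no-argument getCallbackResponse() used by the default callbacks above. The ProducerResponse stand-in, its fields, and CountingProducerCallback are assumptions made for this example, and the class is not thread-safe as written.

```scala
// Stand-in for the real response class; its fields are assumptions of this sketch.
case class ProducerResponse(errorCode: Short, offset: Long)
class ProducerCallbackResponse(val response: ProducerResponse)

trait ProducerCallback[V] {
  def onSerializationError(event: V, e: Throwable): Unit
  def onSendResponse(response: ProducerResponse): Unit
  def getCallbackResponse(): Option[ProducerCallbackResponse]
}

// Hypothetical user-defined callback that counts failures instead of throwing.
class CountingProducerCallback[V] extends ProducerCallback[V] {
  private var last: Option[ProducerCallbackResponse] = None
  var serializationErrors = 0
  var sendErrors = 0

  def onSerializationError(event: V, e: Throwable): Unit = {
    serializationErrors += 1
  }

  def onSendResponse(response: ProducerResponse): Unit = {
    // Assumed convention: a non-zero error code marks a failed send.
    if (response.errorCode != 0) sendErrors += 1
    last = Some(new ProducerCallbackResponse(response))
  }

  // Returns the most recent response, if any has been seen.
  def getCallbackResponse(): Option[ProducerCallbackResponse] = last
}
```

Under the proposal, an application would pass an instance of such a class to registerCallBack before calling send.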

                


[jira] [Commented] (KAFKA-496) high level producer send should return a response

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13458067#comment-13458067 ] 

Jun Rao commented on KAFKA-496:
-------------------------------

Actually, Producer already allows a client to send a list of ProducerData.
                


[jira] [Assigned] (KAFKA-496) high level producer send should return a response

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps reassigned KAFKA-496:
-------------------------------

    Assignee: Jay Kreps
    
> high level producer send should return a response
> -------------------------------------------------
>
>                 Key: KAFKA-496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-496
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Assignee: Jay Kreps
>            Priority: Blocker
>              Labels: features
>             Fix For: 0.8
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Currently, Producer.send() doesn't return any value. In 0.8, since each produce request will be acked, we should pass the response back. What we can do is that if the producer is in sync mode, we can return a map of (topic,partitionId) -> (errorcode, offset). If the producer is in async mode, we can just return a null.


[jira] [Updated] (KAFKA-496) high level producer send should return a response

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-496:
-----------------------------

    Priority: Blocker  (was: Major)
    


[jira] [Commented] (KAFKA-496) high level producer send should return a response

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13449842#comment-13449842 ] 

Jay Kreps commented on KAFKA-496:
---------------------------------

A more concrete description would be that we return a ProduceResponse which has an offset and error. Calls to 
  response.offset() or response.error()
would block until the request completed in the case of an async request. We would probably also need 
  response.onComplete(fun)
to register a callback that would be run when the response was done.

One question, though, is whether an error should result in an exception or in an error code when you try to get either field.

The advantage of this is that the semantics of produce would remain the same for both sync and async. Code written to work with sync could be changed to async with only a config change.

It would be worth thinking through if there is a use case for this because it is likely a bit more complicated.
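A sketch of the response object described here, assuming a latch-based implementation. The fulfill method is a hypothetical hook for the sender thread, and returning an error code from error() rather than throwing is just one answer to the open question above.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical response handle; `fulfill` is an assumed hook for the sender thread.
class ProduceResponse {
  private val done = new CountDownLatch(1)
  @volatile private var _offset: Long = -1L
  @volatile private var _error: Short = 0
  private var callbacks: List[ProduceResponse => Unit] = Nil

  // Both accessors block until the request has completed.
  def offset(): Long = { done.await(); _offset }
  def error(): Short = { done.await(); _error }

  // Registers a callback; runs it immediately if the response is already done.
  def onComplete(fun: ProduceResponse => Unit): Unit = {
    val runNow = synchronized {
      if (done.getCount == 0) true
      else { callbacks = fun :: callbacks; false }
    }
    if (runNow) fun(this)
  }

  // Called once by the sender when the broker has answered.
  def fulfill(offset: Long, error: Short): Unit = {
    val toRun = synchronized {
      _offset = offset
      _error = error
      done.countDown()
      val registered = callbacks.reverse // preserve registration order
      callbacks = Nil
      registered
    }
    toRun.foreach(cb => cb(this))
  }
}
```

In sync mode the producer would call fulfill before returning, so offset() and error() never block; in async mode the same calling code simply waits.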
                
> high level producer send should return a response
> -------------------------------------------------
>
>                 Key: KAFKA-496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-496
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>              Labels: features
>             Fix For: 0.8
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Currently, Producer.send() doesn't return any value. In 0.8, since each produce request will be acked, we should pass the response back. What we can do is that if the producer is in sync mode, we can return a map of (topic,partitionId) -> (errorcode, offset). If the producer is in async mode, we can just return a null.


[jira] [Commented] (KAFKA-496) high level producer send should return a response

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13449820#comment-13449820 ] 

Jay Kreps commented on KAFKA-496:
---------------------------------

An alternative would be to give a Future back, which would be immediately satisfied in the case of a synchronous call and would eventually become satisfied in the case of an async call. This is probably slightly harder to implement.
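The difference between the two cases can be illustrated with scala.concurrent from the Scala standard library, used here only as a stand-in for whatever Future type the producer would expose. FutureModes, syncSend and asyncSend are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object FutureModes {
  // Sync path: the response is already known when send returns,
  // so the future is completed from the start.
  def syncSend(offset: Long): Future[Long] = Future.successful(offset)

  // Async path: hand back the future now; whoever holds the promise
  // completes it later, when the response arrives.
  def asyncSend(): (Promise[Long], Future[Long]) = {
    val p = Promise[Long]()
    (p, p.future)
  }
}
```

Caller code looks the same in both modes: it can check isCompleted, register a callback, or block with Await.result(future, 1.second).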
                
