You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/29 02:16:58 UTC

kafka git commit: KAFKA-1729; Add constructor to javaapi to allow constructing explicitly versioned offset commit requests; patched by Joel Koshy; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/0.8.2 2a1e3d451 -> 96ce96dc9


KAFKA-1729; Add constructor to javaapi to allow constructing explicitly versioned offset commit requests; patched by Joel Koshy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ce96dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ce96dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ce96dc

Branch: refs/heads/0.8.2
Commit: 96ce96dc959dd827ae1f70a17e89e05b91b6ba58
Parents: 2a1e3d4
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed Jan 28 19:16:43 2015 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 28 19:16:43 2015 -0600

----------------------------------------------------------------------
 .../main/scala/kafka/api/OffsetCommitResponse.scala   |  4 +++-
 .../kafka/javaapi/ConsumerMetadataResponse.scala      |  6 ++++++
 .../scala/kafka/javaapi/OffsetCommitRequest.scala     | 14 ++++++++++++--
 .../scala/kafka/javaapi/OffsetCommitResponse.scala    |  9 +++++++++
 .../scala/kafka/javaapi/OffsetFetchResponse.scala     |  5 +++++
 5 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 03dd736..abe67a5 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -20,7 +20,7 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.utils.Logging
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 
 object OffsetCommitResponse extends Logging {
   val CurrentVersion: Short = 1
@@ -50,6 +50,8 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 
+  def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != ErrorMapping.NoError }
+
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
     buffer.putInt(commitStatusGroupedByTopic.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index 1b28861..d281bb3 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -17,6 +17,8 @@
 
 package kafka.javaapi
 
+import java.nio.ByteBuffer
+
 import kafka.cluster.Broker
 
 class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
@@ -40,3 +42,7 @@ class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadat
   override def toString = underlying.toString
 
 }
+
+object ConsumerMetadataResponse {
+  def readFrom(buffer: ByteBuffer) = new ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(buffer))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 873f575..456c3c4 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -22,7 +22,8 @@ import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 class OffsetCommitRequest(groupId: String,
                           requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
                           correlationId: Int,
-                          clientId: String) {
+                          clientId: String,
+                          versionId: Short) {
   val underlying = {
     val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = {
       import collection.JavaConversions._
@@ -32,12 +33,21 @@ class OffsetCommitRequest(groupId: String,
     kafka.api.OffsetCommitRequest(
       groupId = groupId,
       requestInfo = scalaMap,
-      versionId = 0, // binds to version 0 so that it commits to Zookeeper
+      versionId = versionId,
       correlationId = correlationId,
       clientId = clientId
     )
   }
 
+  def this(groupId: String,
+           requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
+           correlationId: Int,
+           clientId: String) {
+
+    // by default bind to version 0 so that it commits to Zookeeper
+    this(groupId, requestInfo, correlationId, clientId, 0)
+  }
+
 
   override def toString = underlying.toString
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index c2d3d11..b222329 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -17,6 +17,8 @@
 
 package kafka.javaapi
 
+import java.nio.ByteBuffer
+
 import kafka.common.TopicAndPartition
 import collection.JavaConversions
 
@@ -27,5 +29,12 @@ class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitRespons
     underlying.commitStatus
   }
 
+  def hasError = underlying.hasError
+
+  def errorCode(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition)
+
 }
 
+object OffsetCommitResponse {
+  def readFrom(buffer: ByteBuffer) = new OffsetCommitResponse(kafka.api.OffsetCommitResponse.readFrom(buffer))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
index 60924d2..c4bdb12 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -17,6 +17,8 @@
 
 package kafka.javaapi
 
+import java.nio.ByteBuffer
+
 import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
 import collection.JavaConversions
 
@@ -29,3 +31,6 @@ class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse)
 
 }
 
+object OffsetFetchResponse {
+  def readFrom(buffer: ByteBuffer) = new OffsetFetchResponse(kafka.api.OffsetFetchResponse.readFrom(buffer))
+}