You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/29 02:16:58 UTC
kafka git commit: KAFKA-1729;
Add constructor to javaapi to allow constructing explicitly versioned
offset commit requests; patched by Joel Koshy; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/0.8.2 2a1e3d451 -> 96ce96dc9
KAFKA-1729; Add constructor to javaapi to allow constructing explicitly versioned offset commit requests; patched by Joel Koshy; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96ce96dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96ce96dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96ce96dc
Branch: refs/heads/0.8.2
Commit: 96ce96dc959dd827ae1f70a17e89e05b91b6ba58
Parents: 2a1e3d4
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed Jan 28 19:16:43 2015 -0600
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 28 19:16:43 2015 -0600
----------------------------------------------------------------------
.../main/scala/kafka/api/OffsetCommitResponse.scala | 4 +++-
.../kafka/javaapi/ConsumerMetadataResponse.scala | 6 ++++++
.../scala/kafka/javaapi/OffsetCommitRequest.scala | 14 ++++++++++++--
.../scala/kafka/javaapi/OffsetCommitResponse.scala | 9 +++++++++
.../scala/kafka/javaapi/OffsetFetchResponse.scala | 5 +++++
5 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 03dd736..abe67a5 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -20,7 +20,7 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Logging
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
object OffsetCommitResponse extends Logging {
val CurrentVersion: Short = 1
@@ -50,6 +50,8 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
+ def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != ErrorMapping.NoError }
+
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putInt(commitStatusGroupedByTopic.size)
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index 1b28861..d281bb3 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -17,6 +17,8 @@
package kafka.javaapi
+import java.nio.ByteBuffer
+
import kafka.cluster.Broker
class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
@@ -40,3 +42,7 @@ class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadat
override def toString = underlying.toString
}
+
+object ConsumerMetadataResponse {
+ def readFrom(buffer: ByteBuffer) = new ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(buffer))
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 873f575..456c3c4 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -22,7 +22,8 @@ import kafka.common.{OffsetAndMetadata, TopicAndPartition}
class OffsetCommitRequest(groupId: String,
requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
correlationId: Int,
- clientId: String) {
+ clientId: String,
+ versionId: Short) {
val underlying = {
val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = {
import collection.JavaConversions._
@@ -32,12 +33,21 @@ class OffsetCommitRequest(groupId: String,
kafka.api.OffsetCommitRequest(
groupId = groupId,
requestInfo = scalaMap,
- versionId = 0, // binds to version 0 so that it commits to Zookeeper
+ versionId = versionId,
correlationId = correlationId,
clientId = clientId
)
}
+ def this(groupId: String,
+ requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
+ correlationId: Int,
+ clientId: String) {
+
+ // by default bind to version 0 so that it commits to Zookeeper
+ this(groupId, requestInfo, correlationId, clientId, 0)
+ }
+
override def toString = underlying.toString
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index c2d3d11..b222329 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -17,6 +17,8 @@
package kafka.javaapi
+import java.nio.ByteBuffer
+
import kafka.common.TopicAndPartition
import collection.JavaConversions
@@ -27,5 +29,12 @@ class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitRespons
underlying.commitStatus
}
+ def hasError = underlying.hasError
+
+ def errorCode(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition)
+
}
+object OffsetCommitResponse {
+ def readFrom(buffer: ByteBuffer) = new OffsetCommitResponse(kafka.api.OffsetCommitResponse.readFrom(buffer))
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/96ce96dc/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
index 60924d2..c4bdb12 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -17,6 +17,8 @@
package kafka.javaapi
+import java.nio.ByteBuffer
+
import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
import collection.JavaConversions
@@ -29,3 +31,6 @@ class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse)
}
+object OffsetFetchResponse {
+ def readFrom(buffer: ByteBuffer) = new OffsetFetchResponse(kafka.api.OffsetFetchResponse.readFrom(buffer))
+}