You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2012/09/27 21:20:09 UTC
svn commit: r1391168 - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/javaapi/
core/src/main/scala/kafka/javaapi/consumer/ ...
Author: jjkoshy
Date: Thu Sep 27 19:20:08 2012
New Revision: 1391168
URL: http://svn.apache.org/viewvc?rev=1391168&view=rev
Log:
The getOffset API should return different latest offsets to non-follower and follower consumers. Also, implement a batched version of the getOffset API. Patched by Joel Koshy; reviewed by Jun Rao; KAFKA-501
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
Modified:
incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Thu Sep 27 19:20:08 2012
@@ -16,15 +16,20 @@
*/
package kafka.etl;
+
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
@@ -144,7 +149,7 @@ public class KafkaETLContext {
public boolean fetchMore () throws IOException {
if (!hasMore()) return false;
-
+
FetchRequest fetchRequest = new FetchRequestBuilder()
.correlationId(requestId)
.clientId(_request.clientId())
@@ -216,15 +221,23 @@ public class KafkaETLContext {
/* get smallest and largest offsets*/
long[] range = new long[2];
- long[] startOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
- OffsetRequest.EarliestTime(), 1);
+ TopicAndPartition topicAndPartition = new TopicAndPartition(_request.getTopic(), _request.getPartition());
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+ new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
+ OffsetRequest request = new OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
+ long[] startOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
if (startOffsets.length != 1)
throw new IOException("input:" + _input + " Expect one smallest offset but get "
+ startOffsets.length);
range[0] = startOffsets[0];
- long[] endOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
- OffsetRequest.LatestTime(), 1);
+ requestInfo.clear();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+ request = new OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
+ long[] endOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
if (endOffsets.length != 1)
throw new IOException("input:" + _input + " Expect one latest offset but get "
+ endOffsets.length);
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Thu Sep 27 19:20:08 2012
@@ -30,8 +30,6 @@ object FetchRequest {
val CurrentVersion = 1.shortValue()
val DefaultCorrelationId = -1
val DefaultClientId = ""
- val DefaultReplicaId = -1
- val NonFollowerId = DefaultReplicaId
val DefaultMaxWait = 0
val DefaultMinBytes = 0
@@ -60,7 +58,7 @@ object FetchRequest {
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = FetchRequest.DefaultClientId,
- replicaId: Int = FetchRequest.DefaultReplicaId,
+ replicaId: Int = Request.DefaultReplicaId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
@@ -113,7 +111,7 @@ case class FetchRequest(versionId: Short
})
}
- def isFromFollower = replicaId != FetchRequest.NonFollowerId
+ def isFromFollower = replicaId != Request.NonFollowerId
def numPartitions = requestInfo.size
}
@@ -124,7 +122,7 @@ class FetchRequestBuilder() {
private var correlationId = FetchRequest.DefaultCorrelationId
private val versionId = FetchRequest.CurrentVersion
private var clientId = FetchRequest.DefaultClientId
- private var replicaId = FetchRequest.DefaultReplicaId
+ private var replicaId = Request.DefaultReplicaId
private var maxWait = FetchRequest.DefaultMaxWait
private var minBytes = FetchRequest.DefaultMinBytes
private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Thu Sep 27 19:20:08 2012
@@ -19,6 +19,8 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Utils
+import kafka.common.TopicAndPartition
+
object OffsetRequest {
val CurrentVersion = 1.shortValue()
@@ -32,36 +34,67 @@ object OffsetRequest {
def readFrom(buffer: ByteBuffer): OffsetRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
- val topic = Utils.readShortString(buffer, "UTF-8")
- val partition = buffer.getInt()
- val offset = buffer.getLong
- val maxNumOffsets = buffer.getInt
- new OffsetRequest(versionId, clientId, topic, partition, offset, maxNumOffsets)
+ val replicaId = buffer.getInt
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = Utils.readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ val time = buffer.getLong
+ val maxNumOffsets = buffer.getInt
+ (TopicAndPartition(topic, partitionId), PartitionOffsetRequestInfo(time, maxNumOffsets))
+ })
+ })
+ OffsetRequest(Map(pairs:_*), versionId = versionId, clientId = clientId, replicaId = replicaId)
}
}
-case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion,
- clientId: String = OffsetRequest.DefaultClientId,
- topic: String,
- partition: Int,
- time: Long,
- maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
- def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
- this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
+case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
+case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
+ versionId: Short = OffsetRequest.CurrentVersion,
+ clientId: String = OffsetRequest.DefaultClientId,
+ replicaId: Int = Request.DefaultReplicaId)
+ extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
- Utils.writeShortString(buffer, topic)
- buffer.putInt(partition)
- buffer.putLong(time)
- buffer.putInt(maxNumOffsets)
+ buffer.putInt(replicaId)
+
+ buffer.putInt(requestInfoGroupedByTopic.size) // topic count
+ requestInfoGroupedByTopic.foreach {
+ case((topic, partitionInfos)) =>
+ Utils.writeShortString(buffer, topic)
+ buffer.putInt(partitionInfos.size) // partition count
+ partitionInfos.foreach {
+ case (TopicAndPartition(_, partition), partitionInfo) =>
+ buffer.putInt(partition)
+ buffer.putLong(partitionInfo.time)
+ buffer.putInt(partitionInfo.maxNumOffsets)
+ }
+ }
}
- def sizeInBytes(): Int = 2 + (2 + clientId.length()) + (2 + topic.length) + 4 + 8 + 4
+ def sizeInBytes =
+ 2 + /* versionId */
+ Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+ 4 + /* replicaId */
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+ val (topic, partitionInfos) = currTopic
+ foldedTopics +
+ Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+ 4 + /* partition count */
+ partitionInfos.size * (
+ 4 + /* partition */
+ 8 + /* time */
+ 4 /* maxNumOffsets */
+ )
+ })
- override def toString(): String= "OffsetRequest(version:" + versionId + ", client id:" + clientId +
- ", topic:" + topic + ", part:" + partition + ", time:" + time + ", maxNumOffsets:" + maxNumOffsets + ")"
+ def isFromFollower = replicaId != Request.NonFollowerId
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala Thu Sep 27 19:20:08 2012
@@ -18,43 +18,77 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.utils.Utils
object OffsetResponse {
+
def readFrom(buffer: ByteBuffer): OffsetResponse = {
val versionId = buffer.getShort
- val errorCode = buffer.getShort
- val offsetsSize = buffer.getInt
- val offsets = new Array[Long](offsetsSize)
- for( i <- 0 until offsetsSize) {
- offsets(i) = buffer.getLong
- }
- new OffsetResponse(versionId, offsets, errorCode)
+ val numTopics = buffer.getInt
+ val pairs = (1 to numTopics).flatMap(_ => {
+ val topic = Utils.readShortString(buffer)
+ val numPartitions = buffer.getInt
+ (1 to numPartitions).map(_ => {
+ val partition = buffer.getInt
+ val error = buffer.getShort
+ val numOffsets = buffer.getInt
+ val offsets = (1 to numOffsets).map(_ => buffer.getLong)
+ (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
+ })
+ })
+ OffsetResponse(versionId, Map(pairs:_*))
}
+
}
+
+case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
+
+
case class OffsetResponse(versionId: Short,
- offsets: Array[Long],
- errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
- val sizeInBytes = 2 + 2 + offsets.foldLeft(4)((sum, _) => sum + 8)
+ partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
+ extends RequestOrResponse {
- def writeTo(buffer: ByteBuffer) {
- buffer.putShort(versionId)
- /* error code */
- buffer.putShort(errorCode)
- buffer.putInt(offsets.length)
- offsets.foreach(buffer.putLong(_))
+ lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
+
+ def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)
+
+ val sizeInBytes = {
+ 2 + /* versionId */
+ 4 + /* topic count */
+ offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+ val (topic, errorAndOffsetsMap) = currTopic
+ foldedTopics +
+ Utils.shortStringLength(topic) +
+ 4 + /* partition count */
+ errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => {
+ foldedPartitions +
+ 4 + /* partition id */
+ 2 + /* partition error */
+ 4 + /* offset array length */
+ currPartition._2.offsets.size * 8 /* offset */
+ })
+ })
}
- // need to override case-class equals due to broken java-array equals()
- override def equals(other: Any): Boolean = {
- other match {
- case that: OffsetResponse =>
- ( versionId == that.versionId &&
- errorCode == that.errorCode &&
- offsets.toSeq == that.offsets.toSeq)
- case _ => false
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putShort(versionId)
+ buffer.putInt(offsetsGroupedByTopic.size) // topic count
+ offsetsGroupedByTopic.foreach {
+ case((topic, errorAndOffsetsMap)) =>
+ Utils.writeShortString(buffer, topic)
+ buffer.putInt(errorAndOffsetsMap.size) // partition count
+ errorAndOffsetsMap.foreach {
+ case((TopicAndPartition(_, partition), errorAndOffsets)) =>
+ buffer.putInt(partition)
+ buffer.putShort(errorAndOffsets.error)
+ buffer.putInt(errorAndOffsets.offsets.size) // offset array length
+ errorAndOffsets.offsets.foreach(buffer.putLong(_))
+ }
}
}
+
}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Thu Sep 27 19:20:08 2012
@@ -66,12 +66,11 @@ case class ProducerResponse(versionId: S
foldedTopics +
Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
4 + /* partition count for this topic */
- currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
- foldedPartitions +
+ currTopic._2.size * {
4 + /* partition id */
2 + /* error code */
8 /* offset */
- })
+ }
})
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Thu Sep 27 19:20:08 2012
@@ -25,6 +25,12 @@ object RequestOrResponse {
}
+object Request {
+ val DefaultReplicaId = -1
+ val NonFollowerId = DefaultReplicaId
+}
+
+
private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
def sizeInBytes: Int
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Thu Sep 27 19:20:08 2012
@@ -20,7 +20,7 @@ package kafka.consumer
import kafka.cluster.Broker
import kafka.server.AbstractFetcherThread
import kafka.message.ByteBufferMessageSet
-import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, PartitionData}
import kafka.common.TopicAndPartition
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String
val consumerFetcherManager: ConsumerFetcherManager)
extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
- fetcherBrokerId = FetchRequest.NonFollowerId, maxWait = config.maxFetchWaitMs,
+ fetcherBrokerId = Request.NonFollowerId, maxWait = config.maxFetchWaitMs,
minBytes = config.minFetchBytes) {
// process fetched data
@@ -50,12 +50,13 @@ class ConsumerFetcherThread(name: String
case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
case _ => startTimestamp = OffsetRequest.LatestTime
}
- val newOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, startTimestamp, 1)(0)
-
+ val topicAndPartition = TopicAndPartition(topic, partitionId)
+ val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
+ val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionId))
pti.resetFetchOffset(newOffset)
pti.resetConsumeOffset(newOffset)
- return newOffset
+ newOffset
}
// any logic for partitions whose leader has changed
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Sep 27 19:20:08 2012
@@ -20,10 +20,10 @@ package kafka.consumer
import kafka.api._
import kafka.network._
import kafka.utils._
-import kafka.common.ErrorMapping
import java.util.concurrent.TimeUnit
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+
/**
* A consumer of kafka messages
*/
@@ -110,18 +110,10 @@ class SimpleConsumer( val host: String,
/**
* Get a list of valid offsets (up to maxSize) before the given time.
- * The result is a list of offsets, in descending order.
- *
- * @param time: time in millisecs (-1, from the latest offset available, -2 from the smallest offset available)
- * @return an array of offsets
+ * @param request a [[kafka.api.OffsetRequest]] object.
+ * @return a [[kafka.api.OffsetResponse]] object.
*/
- def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
- val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
- val offsetResponse = OffsetResponse.readFrom(sendRequest(request).buffer)
- // try to throw exception based on global error codes
- ErrorMapping.maybeThrowException(offsetResponse.errorCode)
- offsetResponse.offsets
- }
+ def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
private def getOrMakeConnection() {
if(!blockingChannel.isConnected) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Sep 27 19:20:08 2012
@@ -27,10 +27,11 @@ import org.I0Itec.zkclient.exception.ZkN
import java.net.InetAddress
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.api.PartitionOffsetRequestInfo
import java.util.UUID
import kafka.serializer.Decoder
import kafka.utils.ZkUtils._
-import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+import kafka.common._
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Utils._
@@ -263,8 +264,9 @@ private[kafka] class ZookeeperConsumerCo
}
simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
ConsumerConfig.SocketBufferSize)
- val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
- producedOffset = offsets(0)
+ val topicAndPartition = TopicAndPartition(topic, partitionId)
+ val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
+ producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
} catch {
case e =>
error("error in earliestOrLatestOffset() ", e)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Thu Sep 27 19:20:08 2012
@@ -30,6 +30,9 @@ private[javaapi] object Implicits extend
implicit def toJavaTopicMetadataResponse(response: kafka.api.TopicMetadataResponse): kafka.javaapi.TopicMetadataResponse =
new kafka.javaapi.TopicMetadataResponse(response)
+ implicit def toJavaOffsetResponse(response: kafka.api.OffsetResponse): kafka.javaapi.OffsetResponse =
+ new kafka.javaapi.OffsetResponse(response)
+
implicit def optionToJavaRef[T](opt: Option[T]): T = {
opt match {
case Some(obj) => obj
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala?rev=1391168&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala Thu Sep 27 19:20:08 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+import kafka.api.{Request, PartitionOffsetRequestInfo}
+import collection.JavaConversions
+import java.nio.ByteBuffer
+
+
+class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
+ versionId: Short,
+ clientId: String) {
+
+ val underlying = {
+ val scalaMap = JavaConversions.asMap(requestInfo).toMap
+ kafka.api.OffsetRequest(
+ requestInfo = scalaMap,
+ versionId = versionId,
+ clientId = clientId,
+ replicaId = Request.NonFollowerId
+ )
+ }
+
+
+ def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+
+ def sizeInBytes = underlying.sizeInBytes
+
+
+ override def toString = underlying.toString
+
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetRequest]
+ this.underlying.equals(otherOffsetRequest.underlying)
+ }
+
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetRequest]
+
+
+ override def hashCode = underlying.hashCode
+
+}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala?rev=1391168&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala Thu Sep 27 19:20:08 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+
+class OffsetResponse(private val underlying: kafka.api.OffsetResponse) {
+
+ def hasError = underlying.hasError
+
+
+ def errorCode(topic: String, partition: Int) =
+ underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).error
+
+
+ def offsets(topic: String, partition: Int) =
+ underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).offsets.toArray
+
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherOffsetResponse = other.asInstanceOf[kafka.javaapi.OffsetResponse]
+ this.underlying.equals(otherOffsetResponse.underlying)
+ }
+
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetResponse]
+
+
+ override def hashCode = underlying.hashCode
+
+
+ override def toString = underlying.toString
+
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Thu Sep 27 19:20:08 2012
@@ -19,6 +19,8 @@ package kafka.javaapi.consumer
import kafka.utils.threadsafe
import kafka.javaapi.FetchResponse
+import kafka.javaapi.OffsetRequest
+
/**
* A consumer of kafka messages
@@ -67,13 +69,14 @@ class SimpleConsumer(val host: String,
/**
* Get a list of valid offsets (up to maxSize) before the given time.
- * The result is a list of offsets, in descending order.
*
- * @param time: time in millisecs (-1, from the latest offset available, -2 from the smallest offset available)
- * @return an array of offsets
+ * @param request a [[kafka.javaapi.OffsetRequest]] object.
+ * @return a [[kafka.javaapi.OffsetResponse]] object.
*/
- def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] =
- underlying.getOffsetsBefore(topic, partition, time, maxNumOffsets)
+ def getOffsetsBefore(request: OffsetRequest): kafka.javaapi.OffsetResponse = {
+ import kafka.javaapi.Implicits._
+ underlying.getOffsetsBefore(request.underlying)
+ }
def close() {
underlying.close
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Sep 27 19:20:08 2012
@@ -81,12 +81,10 @@ object Log {
nf.format(offset) + FileSuffix
}
- def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
- if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
- return Array(0L)
- else
- return Array()
- }
+ def getEmptyOffsets(timestamp: Long): Seq[Long] =
+ if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
+ Seq(0L)
+ else Nil
}
@@ -389,7 +387,7 @@ private[kafka] class Log( val dir: File,
}
}
- def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
+ def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
val segsArray = segments.view
var offsetTimeArray: Array[(Long, Long)] = null
if (segsArray.last.size > 0)
@@ -403,7 +401,7 @@ private[kafka] class Log( val dir: File,
offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), time.milliseconds)
var startIndex = -1
- request.time match {
+ timestamp match {
case OffsetRequest.LatestTime =>
startIndex = offsetTimeArray.length - 1
case OffsetRequest.EarliestTime =>
@@ -413,20 +411,21 @@ private[kafka] class Log( val dir: File,
debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
- if (offsetTimeArray(startIndex)._2 <= request.time)
+ if (offsetTimeArray(startIndex)._2 <= timestamp)
isFound = true
else
startIndex -=1
}
}
- val retSize = request.maxNumOffsets.min(startIndex + 1)
+ val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for (j <- 0 until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -= 1
}
- ret
+ // ensure that the returned seq is in descending order of offsets
+ ret.toSeq.sortBy(- _)
}
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Thu Sep 27 19:20:08 2012
@@ -23,7 +23,7 @@ import scala.collection._
import kafka.server.KafkaConfig
import kafka.api.OffsetRequest
import kafka.log.Log._
-import kafka.common.KafkaException
+import kafka.common.{TopicAndPartition, KafkaException}
/**
* The guy who creates and hands out logs
@@ -104,11 +104,11 @@ private[kafka] class LogManager(val conf
}
}
- def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
- val log = getLog(offsetRequest.topic, offsetRequest.partition)
+ def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+ val log = getLog(topicAndPartition.topic, topicAndPartition.partition)
log match {
- case Some(l) => l.getOffsetsBefore(offsetRequest)
- case None => getEmptyOffsets(offsetRequest)
+ case Some(l) => l.getOffsetsBefore(timestamp, maxNumOffsets)
+ case None => getEmptyOffsets(timestamp)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Sep 27 19:20:08 2012
@@ -349,21 +349,36 @@ class KafkaApis(val requestChannel: Requ
requestLogger.trace("Handling offset request " + offsetRequest.toString)
trace("Handling offset request " + offsetRequest.toString)
- var response: OffsetResponse = null
- try {
- // ensure leader exists
- replicaManager.getLeaderReplicaIfLocal(offsetRequest.topic, offsetRequest.partition)
- val offsets = replicaManager.logManager.getOffsets(offsetRequest)
- response = new OffsetResponse(offsetRequest.versionId, offsets)
- } catch {
- case ioe: IOException =>
- fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe)
- System.exit(1)
- case e =>
- warn("Error while responding to offset request", e)
- response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
- ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
- }
+ val responseMap = offsetRequest.requestInfo.map(elem => {
+ val (topicAndPartition, partitionOffsetRequestInfo) = elem
+ try {
+ // ensure leader exists
+ val leader = replicaManager.getLeaderReplicaIfLocal(
+ topicAndPartition.topic, topicAndPartition.partition)
+ val offsets = {
+ val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition,
+ partitionOffsetRequestInfo.time,
+ partitionOffsetRequestInfo.maxNumOffsets)
+ if (offsetRequest.isFromFollower) allOffsets
+ else {
+ val hw = leader.highWatermark
+ if (allOffsets.exists(_ > hw))
+ hw +: allOffsets.dropWhile(_ > hw)
+ else allOffsets
+ }
+ }
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
+ } catch {
+ case ioe: IOException =>
+ fatal("Halting due to unrecoverable I/O error while handling offset request: " + ioe.getMessage, ioe)
+ // compiler requires scala.sys.exit (not System.exit).
+ exit(1)
+ case e =>
+ warn("Error while responding to offset request", e)
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
+ }
+ })
+ val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Thu Sep 27 19:20:08 2012
@@ -17,7 +17,7 @@
package kafka.server
-import kafka.api.{OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, PartitionData}
import kafka.cluster.Broker
import kafka.message.ByteBufferMessageSet
import kafka.common.TopicAndPartition
@@ -51,10 +51,15 @@ class ReplicaFetcherThread(name:String,
// handle a partition whose offset is out of range and return a new fetch offset
def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
// This means the local replica is out of date. Truncate the log and catch up from beginning.
- val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, OffsetRequest.EarliestTime, 1)
+ val topicAndPartition = TopicAndPartition(topic, partitionId)
+ val request = OffsetRequest(
+ replicaId = brokerConfig.brokerId,
+ requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
+ )
+ val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
val replica = replicaMgr.getReplica(topic, partitionId).get
- replica.log.get.truncateAndStartWithNewOffset(offsets(0))
- return offsets(0)
+ replica.log.get.truncateAndStartWithNewOffset(offset)
+ offset
}
// any logic for partitions whose leader has changed
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Thu Sep 27 19:20:08 2012
@@ -23,9 +23,14 @@ import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import kafka.consumer.SimpleConsumer
import collection.mutable.Map
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.TopicAndPartition
+import scala.collection._
+
+
object ConsumerOffsetChecker extends Logging {
- private val consumerMap: Map[String, Option[SimpleConsumer]] = Map()
+ private val consumerMap: mutable.Map[String, Option[SimpleConsumer]] = mutable.Map()
private val BidPidPattern = """(\d+)-(\d+)""".r
@@ -61,8 +66,10 @@ object ConsumerOffsetChecker extends Log
bid, getConsumer(zkClient, bid))
consumerOpt match {
case Some(consumer) =>
- val logSize =
- consumer.getOffsetsBefore(topic, pid.toInt, -1, 1).last.toLong
+ val topicAndPartition = TopicAndPartition(topic, pid.toInt)
+ val request =
+ OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+ val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
println("%20s%d".format("Log size = ", logSize))
println("%20s%,d (%,.2fG)".format("= ", logSize, logSize / math.pow(1024, 3)))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala Thu Sep 27 19:20:08 2012
@@ -21,6 +21,9 @@ package kafka.tools
import kafka.consumer._
import joptsimple._
import java.net.URI
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.TopicAndPartition
+
object GetOffsetShell {
@@ -65,7 +68,9 @@ object GetOffsetShell {
var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
- val offsets = consumer.getOffsetsBefore(topic, partition, time, nOffsets)
+ val topicAndPartition = TopicAndPartition(topic, partition)
+ val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
+ val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
println("get " + offsets.length + " results")
for (offset <- offsets)
println(offset)
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala?rev=1391168&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala Thu Sep 27 19:20:08 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.consumer.{SimpleConsumer, ConsumerConfig}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.{TopicAndPartition, KafkaException}
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, Utils}
+
+
+/**
+ * A utility that resets, in ZK, the stored offset of every partition of a topic for a consumer group
+ * to the earliest or latest offset reported by the broker currently leading that partition.
+ *
+ * Usage: UpdateOffsetsInZK [earliest | latest] consumer.properties topic
+ */
+object UpdateOffsetsInZK {
+  val Earliest = "earliest"
+  val Latest = "latest"
+
+  /**
+   * Entry point.
+   * @param args args(0) is "earliest" or "latest"; args(1) is the path of the consumer properties file
+   *             (read for the ZK connection settings and the group id); args(2) is the topic.
+   */
+  def main(args: Array[String]) {
+    if(args.length < 3)
+      usage
+    val config = new ConsumerConfig(Utils.loadProps(args(1)))
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs, ZKStringSerializer)
+    try {
+      args(0) match {
+        case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
+        case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
+        case _ => usage
+      }
+    } finally {
+      // release the ZK session whether the update succeeded or threw
+      zkClient.close()
+    }
+  }
+
+  /**
+   * For each partition of `topic`, asks the partition's leader for a single offset selected by
+   * `offsetOption` and writes it to the consumer group's offset path for that partition in ZK.
+   *
+   * @param zkClient     client used to look up partitions, leaders and broker info, and to write offsets
+   * @param offsetOption OffsetRequest.EarliestTime or OffsetRequest.LatestTime
+   * @param config       consumer config supplying the group id
+   * @param topic        topic whose partition offsets are rewritten
+   * @throws RuntimeException if the topic has no partition entry in ZK
+   * @throws KafkaException   if a partition has no leader, the leader's broker info is missing,
+   *                          or the leader returns no offset
+   */
+  private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic))
+    val partitions: Seq[Int] = partitionsPerTopicMap.get(topic) match {
+      case Some(l) => l.sortWith((s,t) => s < t)
+      case _ => throw new RuntimeException("Can't find topic " + topic)
+    }
+
+    // the offset directory depends only on group and topic, so build it once
+    val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+    var numParts = 0
+    for (partition <- partitions) {
+      val broker = ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
+        case Some(b) => b
+        // the lookup result is None on this branch, so report the partition rather than the Option
+        case None => throw new KafkaException("Leader for topic " + topic + " partition " + partition +
+          " is unavailable. Cannot issue getOffsetsBefore request")
+      }
+
+      ZkUtils.getBrokerInfo(zkClient, broker) match {
+        case Some(brokerInfo) =>
+          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+          try {
+            val topicAndPartition = TopicAndPartition(topic, partition)
+            val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
+            val partitionResponse = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+            // an error response carries no offsets; fail with the error code instead of a bare .head failure
+            val offset = partitionResponse.offsets.headOption.getOrElse(
+              throw new KafkaException("No offset returned for topic %s partition %d (error code %d)"
+                .format(topic, partition, partitionResponse.error)))
+
+            println("updating partition " + partition + " with new offset: " + offset)
+            ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
+            numParts += 1
+          } finally {
+            // one connection is opened per partition; close it even when the request or ZK write fails
+            consumer.close()
+          }
+        case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
+      }
+    }
+    println("updated the offset for " + numParts + " partitions")
+  }
+
+  /** Prints the command-line synopsis and terminates the JVM with exit status 1. */
+  private def usage() = {
+    println("USAGE: " + UpdateOffsetsInZK.getClass.getName + " [earliest | latest] consumer.properties topic")
+    System.exit(1)
+  }
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Sep 27 19:20:08 2012
@@ -31,6 +31,7 @@ import joptsimple.{OptionSpec, OptionSet
import kafka.common.KafkaException
import kafka.cluster.Broker
import util.parsing.json.JSON
+import kafka.api.RequestOrResponse
import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
import kafka.producer.{ProducerPool, SyncProducer}
@@ -155,7 +156,7 @@ object Utils extends Logging {
* @param buffer The buffer to read from
* @param encoding The encoding in which to read the string
*/
- def readShortString(buffer: ByteBuffer, encoding: String = "UTF-8"): String = {
+ def readShortString(buffer: ByteBuffer, encoding: String = RequestOrResponse.DefaultCharset): String = {
val size: Int = buffer.getShort()
if(size < 0)
return null
@@ -170,7 +171,7 @@ object Utils extends Logging {
* @param string The string to write
* @param encoding The encoding in which to write the string
*/
- def writeShortString(buffer: ByteBuffer, string: String, encoding: String = "UTF-8"): Unit = {
+ def writeShortString(buffer: ByteBuffer, string: String, encoding: String = RequestOrResponse.DefaultCharset) {
if(string == null) {
buffer.putShort(-1)
} else if(string.length > Short.MaxValue) {
@@ -186,7 +187,7 @@ object Utils extends Logging {
* @param string The string to write
* @param encoding The encoding in which to write the string
*/
- def shortStringLength(string: String, encoding: String = "UTF-8"): Int = {
+ def shortStringLength(string: String, encoding: String = RequestOrResponse.DefaultCharset): Int = {
if(string == null) {
2
} else {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Thu Sep 27 19:20:08 2012
@@ -22,16 +22,15 @@ import kafka.utils._
import kafka.server.{KafkaConfig, KafkaServer}
import junit.framework.Assert._
import java.util.{Random, Properties}
-import collection.mutable.WrappedArray
import kafka.consumer.SimpleConsumer
import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import kafka.admin.CreateTopicCommand
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
import kafka.utils.TestUtils._
-import kafka.common.UnknownTopicOrPartitionException
+import kafka.common.{ErrorMapping, TopicAndPartition, UnknownTopicOrPartitionException}
object LogOffsetTest {
val random = new Random()
@@ -67,12 +66,12 @@ class LogOffsetTest extends JUnit3Suite
@Test
def testGetOffsetsForUnknownTopic() {
- try {
- simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10)
- fail("Should fail with UnknownTopicException since topic foo was never created")
- }catch {
- case e: UnknownTopicOrPartitionException => // this is ok
- }
+ val topicAndPartition = TopicAndPartition("foo", 0)
+ val request = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
+ val offsetResponse = simpleConsumer.getOffsetsBefore(request)
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+ offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
}
@Test
@@ -92,16 +91,17 @@ class LogOffsetTest extends JUnit3Suite
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
-
- val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
-
- val offsets = log.getOffsetsBefore(offsetRequest)
- assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+ val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10)
+ assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
- val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
- OffsetRequest.LatestTime, 10)
- assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+ replicaId = 0)
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
// try to fetch using latest offset
val fetchResponse = simpleConsumer.fetch(
@@ -124,8 +124,11 @@ class LogOffsetTest extends JUnit3Suite
var offsetChanged = false
for(i <- 1 to 14) {
- val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, 0,
- OffsetRequest.EarliestTime, 1)
+ val topicAndPartition = TopicAndPartition(topic, 0)
+ val offsetRequest =
+ OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
if(consumerOffsets(0) == 1) {
offsetChanged = true
@@ -153,14 +156,15 @@ class LogOffsetTest extends JUnit3Suite
time.sleep(20)
val now = time.milliseconds
- val offsetRequest = new OffsetRequest(topic, part, now, 10)
- val offsets = log.getOffsetsBefore(offsetRequest)
- println("Offsets = " + offsets.mkString(","))
- assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+ val offsets = log.getOffsetsBefore(now, 10)
+ assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
- val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
- assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
}
@Test
@@ -179,16 +183,17 @@ class LogOffsetTest extends JUnit3Suite
log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
log.flush()
- val offsetRequest = new OffsetRequest(topic, part,
- OffsetRequest.EarliestTime, 10)
- val offsets = log.getOffsetsBefore(offsetRequest)
+ val offsets = log.getOffsetsBefore(OffsetRequest.EarliestTime, 10)
- assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+ assertEquals(Seq(0L), offsets)
waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
- val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
- OffsetRequest.EarliestTime, 10)
- assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest =
+ OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(0L), consumerOffsets)
}
private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala Thu Sep 27 19:20:08 2012
@@ -109,12 +109,15 @@ object RpcDataSerializationTestUtils{
FetchResponse(1, 1, topicData)
}
- def createTestOffsetRequest: OffsetRequest = {
- new OffsetRequest(topic1, 1, 1000, 200)
- }
+ def createTestOffsetRequest = new OffsetRequest(
+ collection.immutable.Map(TopicAndPartition(topic1, 1) -> PartitionOffsetRequestInfo(1000, 200)),
+ replicaId = 0
+ )
def createTestOffsetResponse: OffsetResponse = {
- new OffsetResponse(1, Array(1000l, 2000l, 3000l, 4000l))
+ new OffsetResponse(OffsetRequest.CurrentVersion, collection.immutable.Map(
+ TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
+ )
}
def createTestTopicMetadataRequest: TopicMetadataRequest = {
@@ -191,7 +194,7 @@ class RpcDataSerializationTest extends J
assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest,
deserializedFetchRequest)
- buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes())
+ buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes)
offsetRequest.writeTo(buffer)
buffer.rewind()
val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Thu Sep 27 19:20:08 2012
@@ -19,12 +19,16 @@ package kafka.server
import kafka.cluster.{Partition, Replica}
import kafka.log.Log
import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.network.RequestChannel
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.utils.{Time, TestUtils, MockTime}
import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
-import kafka.api.{FetchRequest, FetchRequestBuilder}
+import kafka.api._
+import scala.Some
+import org.junit.Assert._
+import kafka.common.TopicAndPartition
+
class SimpleFetchTest extends JUnit3Suite {
@@ -88,7 +92,7 @@ class SimpleFetchTest extends JUnit3Suit
// This request (from a non-follower consumer) wants to read up to 2*HW but should only get back up to HW bytes into the log
val goodFetch = new FetchRequestBuilder()
- .replicaId(FetchRequest.NonFollowerId)
+ .replicaId(Request.NonFollowerId)
.addFetch(topic, partitionId, 0, hw*2)
.build()
val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
@@ -98,6 +102,33 @@ class SimpleFetchTest extends JUnit3Suit
// make sure the log only reads bytes between 0->HW (5)
EasyMock.verify(log)
+
+ // Test offset request from non-replica
+ val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
+ val offsetRequest = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+ val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
+
+ EasyMock.reset(logManager)
+ EasyMock.reset(replicaManager)
+
+ EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
+ EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+ EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
+
+ EasyMock.replay(replicaManager)
+ EasyMock.replay(logManager)
+
+ apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
+ requestKey = 5,
+ buffer = offsetRequestBB,
+ startTimeNs = 1))
+ val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
+ val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
+ EasyMock.verify(replicaManager)
+ EasyMock.verify(logManager)
+ assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
+ assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
}
/**
@@ -173,6 +204,34 @@ class SimpleFetchTest extends JUnit3Suit
* an offset of 15
*/
EasyMock.verify(log)
+
+ // Test offset request from replica
+ val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
+ val offsetRequest = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)),
+ replicaId = followerReplicaId)
+ val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
+
+ EasyMock.reset(logManager)
+ EasyMock.reset(replicaManager)
+
+ EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
+ EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+ EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
+
+ EasyMock.replay(replicaManager)
+ EasyMock.replay(logManager)
+
+ apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
+ requestKey = 5,
+ buffer = offsetRequestBB,
+ startTimeNs = 1))
+ val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
+ val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
+ EasyMock.verify(replicaManager)
+ EasyMock.verify(logManager)
+ assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
+ assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
}
private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala Thu Sep 27 19:20:08 2012
@@ -19,11 +19,12 @@ package kafka.perf
import java.net.URI
import java.text.SimpleDateFormat
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.apache.log4j.Logger
-import kafka.message.ByteBufferMessageSet
+import kafka.common.TopicAndPartition
+
/**
* Performance test for the simple consumer
@@ -44,8 +45,11 @@ object SimpleConsumerPerformance {
val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
// reset to latest or smallest offset
- var offset: Long = if(config.fromLatest) consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.LatestTime, 1).head
- else consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.EarliestTime, 1).head
+ val topicAndPartition = TopicAndPartition(config.topic, config.partition)
+ val request = OffsetRequest(Map(
+ topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1)
+ ))
+ var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
val startMs = System.currentTimeMillis
var done = false