You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/06 17:23:13 UTC

svn commit: r1417977 - in /kafka/branches/0.8/core/src/main/scala/kafka: api/FetchResponse.scala api/TopicMetadataResponse.scala server/KafkaApis.scala

Author: jkreps
Date: Thu Dec  6 16:23:12 2012
New Revision: 1417977

URL: http://svn.apache.org/viewvc?rev=1417977&view=rev
Log:
KAFKA-642 Addressing Jun's follow-up comments: (1) add parens to make the statement clearer, (2) remove the initial offset from the fetch response, since the message set itself now contains all offsets.


Modified:
    kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
    kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala

Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Thu Dec  6 16:23:12 2012
@@ -27,29 +27,25 @@ import kafka.api.ApiUtils._
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
     val error = buffer.getShort
-    val initialOffset = buffer.getLong
     val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new FetchResponsePartitionData(error, initialOffset,
-                                   hw, new ByteBufferMessageSet(messageSetBuffer))
+    new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
   }
 
   val headerSize =
     2 + /* error code */
-    8 + /* initialOffset */
     8 + /* high watermark */
     4 /* messageSetSize */
 }
 
-case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
-                                      initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L, messages: MessageSet) {
 
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 
-  def this(messages: MessageSet) = this(ErrorMapping.NoError, 0L, -1L, messages)
+  def this(messages: MessageSet) = this(ErrorMapping.NoError, -1L, messages)
   
 }
 
@@ -63,7 +59,6 @@ class PartitionDataSend(val partitionId:
   private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
   buffer.putInt(partitionId)
   buffer.putShort(partitionData.error)
-  buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes)
   buffer.rewind()

Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala Thu Dec  6 16:23:12 2012
@@ -56,7 +56,7 @@ case class TopicMetadataResponse(version
     
   def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
     val parts = topicsMetadata.flatMap(_.partitionsMetadata)
-    val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) => l}
+    val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l})
     brokers.map(b => (b.id, b)).toMap
   }
 }

Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Dec  6 16:23:12 2012
@@ -77,7 +77,7 @@ class KafkaApis(val requestChannel: Requ
             val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
             val fetchResponsePartitionData = apiRequest.requestInfo.map {
               case (topicAndPartition, data) =>
-                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), 0, -1, null))
+                (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null))
             }
             val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId, fetchResponsePartitionData)
             requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
@@ -326,19 +326,18 @@ class KafkaApis(val requestChannel: Requ
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
             if (!isFetchFromFollower) {
-              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
+              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             } else {
               debug("Leader %d for topic %s partition %d received fetch request from follower %d"
                             .format(brokerId, topic, partition, fetchRequest.replicaId))
-              new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
+              new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             }
           } catch {
             case t: Throwable =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize), t)
-              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
-                                             offset, -1L, MessageSet.Empty)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
           }
         (TopicAndPartition(topic, partition), partitionData)
     }