Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/27 16:24:28 UTC

svn commit: r1189773 - in /incubator/kafka/trunk/clients/go: ./ src/ tools/consumer/ tools/publisher/

Author: nehanarkhede
Date: Thu Oct 27 14:24:27 2011
New Revision: 1189773

URL: http://svn.apache.org/viewvc?rev=1189773&view=rev
Log:
KAFKA-158 Support for compression in Go clients; patched by jeffreydamick; reviewed by nehanarkhede
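
In short, producers can now gzip-wrap one or more messages and the consumer
decodes them transparently through a per-consumer codec map. A minimal usage
sketch against this revision's API (pre-Go1 syntax to match the client; host,
topic and payloads are illustrative, error handling elided):

    // wrap a whole batch in a single gzip-compressed message set
    broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
    broker.Publish(kafka.NewCompressedMessages(
      kafka.NewMessage([]byte("one")),
      kafka.NewMessage([]byte("two"))))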

Added:
    incubator/kafka/trunk/clients/go/src/payload_codec.go
Modified:
    incubator/kafka/trunk/clients/go/Makefile
    incubator/kafka/trunk/clients/go/README.md
    incubator/kafka/trunk/clients/go/kafka_test.go
    incubator/kafka/trunk/clients/go/src/consumer.go
    incubator/kafka/trunk/clients/go/src/converts.go
    incubator/kafka/trunk/clients/go/src/kafka.go
    incubator/kafka/trunk/clients/go/src/message.go
    incubator/kafka/trunk/clients/go/src/publisher.go
    incubator/kafka/trunk/clients/go/src/request.go
    incubator/kafka/trunk/clients/go/src/timing.go
    incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
    incubator/kafka/trunk/clients/go/tools/publisher/publisher.go

Modified: incubator/kafka/trunk/clients/go/Makefile
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/Makefile?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/Makefile (original)
+++ incubator/kafka/trunk/clients/go/Makefile Thu Oct 27 14:24:27 2011
@@ -6,6 +6,7 @@ GOFILES=\
 	src/message.go\
 	src/converts.go\
 	src/consumer.go\
+	src/payload_codec.go\
 	src/publisher.go\
 	src/timing.go\
 	src/request.go\

Modified: incubator/kafka/trunk/clients/go/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/README.md?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/README.md (original)
+++ incubator/kafka/trunk/clients/go/README.md Thu Oct 27 14:24:27 2011
@@ -1,11 +1,16 @@
 # Kafka.go - Publisher & Consumer for Kafka in Go #
 
-Kafka is a distributed publish-subscribe messaging system: (http://sna-projects.com/kafka/)
+Kafka is a distributed publish-subscribe messaging system: (http://incubator.apache.org/kafka/)
 
 Go language: (http://golang.org/) <br/>
 
 ## Get up and running ##
 
+Install Go: <br/>
+For more info see: http://golang.org/doc/install.html#install 
+
+Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment).
+
 Install kafka.go package: <br/>
 <code>make install</code>
 <br/>
@@ -13,7 +18,7 @@ Make the tools (publisher & consumer) <b
 <code>make tools</code>
 <br/>
 Start zookeeper, Kafka server <br/>
-For more info on Kafka, see: http://sna-projects.com/kafka/quickstart.php
+For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html
 
 
 
@@ -48,6 +53,17 @@ broker.Publish(kafka.NewMessage([]byte("
 
 </code></pre>
 
+
+### Publishing Compressed Messages ###
+
+<pre><code>
+
+broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
+broker.Publish(kafka.NewCompressedMessage([]byte("testing 1 2 3")))
+
+</code></pre>
+
+
 ### Consumer ###
 
 <pre><code>

Modified: incubator/kafka/trunk/clients/go/kafka_test.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/kafka_test.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/kafka_test.go (original)
+++ incubator/kafka/trunk/clients/go/kafka_test.go Thu Oct 27 14:24:27 2011
@@ -20,20 +20,19 @@
  *  of their respective owners.
  */
 
-
 package kafka
 
 import (
   "testing"
   //"fmt"
   "bytes"
-  "container/list"
+  "compress/gzip"
 )
 
 func TestMessageCreation(t *testing.T) {
   payload := []byte("testing")
   msg := NewMessage(payload)
-  if msg.magic != 0 {
+  if msg.magic != 1 {
     t.Errorf("magic incorrect")
     t.Fail()
   }
@@ -45,31 +44,183 @@ func TestMessageCreation(t *testing.T) {
   }
 }
 
+func TestMagic0MessageEncoding(t *testing.T) {
+  // generated by kafka-rb:
+  // test the old message format
+  expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
+  length, msgsDecoded := Decode(expected, DefaultCodecsMap)
+
+  if length == 0 || msgsDecoded == nil {
+    t.Fail()
+  }
+  msgDecoded := msgsDecoded[0]
+
+  payload := []byte("testing")
+  if !bytes.Equal(payload, msgDecoded.payload) {
+    t.Fatal("bytes not equal")
+  }
+  chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
+  if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
+    t.Fatal("checksums do not match")
+  }
+  if msgDecoded.magic != 0 {
+    t.Fatal("magic incorrect")
+  }
+}
 
 func TestMessageEncoding(t *testing.T) {
+
   payload := []byte("testing")
   msg := NewMessage(payload)
 
   // generated by kafka-rb:
-  expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
+  expected := []byte{0x00, 0x00, 0x00, 0x0d, 0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
   if !bytes.Equal(expected, msg.Encode()) {
-    t.Fail()
+    t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode())
   }
 
   // verify round trip
-  msgDecoded := Decode(msg.Encode())
+  length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode())
+
+  if length == 0 || msgsDecoded == nil {
+    t.Fatal("message is nil")
+  }
+  msgDecoded := msgsDecoded[0]
+
   if !bytes.Equal(msgDecoded.payload, payload) {
-    t.Fail()
+    t.Fatal("bytes not equal")
+  }
+  chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
+  if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
+    t.Fatal("checksums do not match")
+  }
+  if msgDecoded.magic != 1 {
+    t.Fatal("magic incorrect")
+  }
+}
+
+func TestCompressedMessageEncodingCompare(t *testing.T) {
+  payload := []byte("testing")
+  uncompressedMsgBytes := NewMessage(payload).Encode()
+  
+  msgGzipBytes := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID]).Encode()
+  msgDefaultBytes := NewCompressedMessage(payload).Encode()
+  if !bytes.Equal(msgDefaultBytes, msgGzipBytes) {
+    t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", msgDefaultBytes, msgGzipBytes)
+  }
+}
+
+func TestCompressedMessageEncoding(t *testing.T) {
+  payload := []byte("testing")
+  uncompressedMsgBytes := NewMessage(payload).Encode()
+  
+  msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID])
+
+  expectedPayload := []byte{0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04,
+    0xFF, 0x62, 0x60, 0x60, 0xE0, 0x65, 0x64, 0x78, 0xF1, 0x39, 0x8A,
+    0xAD, 0x24, 0xB5, 0xB8, 0x24, 0x33, 0x2F, 0x1D, 0x10, 0x00, 0x00,
+    0xFF, 0xFF, 0x0C, 0x6A, 0x82, 0x91, 0x11, 0x00, 0x00, 0x00}
+
+  expectedHeader := []byte{0x00, 0x00, 0x00, 0x2F, 0x01, 0x01, 0x07, 0xFD, 0xC3, 0x76}
+
+  expected := make([]byte, len(expectedHeader)+len(expectedPayload))
+  n := copy(expected, expectedHeader)
+  copy(expected[n:], expectedPayload)
+
+  if msg.compression != 1 {
+    t.Fatalf("expected compression: 1 but got: %b", msg.compression)
+  }
+
+  zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload))
+  uncompressed := make([]byte, 100)
+  n, _ = zipper.Read(uncompressed)
+  uncompressed = uncompressed[:n]
+  zipper.Close()
+
+  if !bytes.Equal(uncompressed, uncompressedMsgBytes) {
+    t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", uncompressed, uncompressedMsgBytes)
+  }
+
+  if !bytes.Equal(expected, msg.Encode()) {
+    t.Fatalf("expected: % X\n but got: % X", expected, msg.Encode())
+  }
+
+  // verify round trip
+  length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap)
+
+  if length == 0 || msgsDecoded == nil {
+    t.Fatal("message is nil")
   }
+  msgDecoded := msgsDecoded[0]
+
   if !bytes.Equal(msgDecoded.payload, payload) {
-    t.Fail()
+    t.Fatal("bytes not equal")
   }
   chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
-  if !bytes.Equal(msgDecoded.checksum[:], chksum) {
-    t.Fail()
+  if !bytes.Equal(chksum, msgDecoded.checksum[:]) {
+    t.Fatalf("checksums do not match, expected: % X but was: % X", 
+      chksum, msgDecoded.checksum[:])
   }
-  if msgDecoded.magic != 0 {
-    t.Fail()
+  if msgDecoded.magic != 1 {
+    t.Fatal("magic incorrect")
+  }
+}
+
+func TestLongCompressedMessageRoundTrip(t *testing.T) {
+  payloadBuf := bytes.NewBuffer([]byte{})
+  // make the test payload bigger than the buffer allocated in Decode
+  for i := 0; i < 15; i++ {
+    payloadBuf.Write([]byte("testing123 "))
+  }
+
+  uncompressedMsgBytes := NewMessage(payloadBuf.Bytes()).Encode()
+  msg := NewMessageWithCodec(uncompressedMsgBytes, DefaultCodecsMap[GZIP_COMPRESSION_ID])
+  
+  zipper, _ := gzip.NewReader(bytes.NewBuffer(msg.payload))
+  uncompressed := make([]byte, 200)
+  n, _ := zipper.Read(uncompressed)
+  uncompressed = uncompressed[:n]
+  zipper.Close()
+
+  if !bytes.Equal(uncompressed, uncompressedMsgBytes) {
+    t.Fatalf("uncompressed: % X \npayload: % X bytes not equal", 
+      uncompressed, uncompressedMsgBytes)
+  }
+
+  // verify round trip
+  length, msgsDecoded := Decode(msg.Encode(), DefaultCodecsMap)
+
+  if length == 0 || msgsDecoded == nil {
+    t.Fatal("message is nil")
+  }
+  msgDecoded := msgsDecoded[0]
+
+  if !bytes.Equal(msgDecoded.payload, payloadBuf.Bytes()) {
+    t.Fatal("bytes not equal")
+  }
+  if msgDecoded.magic != 1 {
+    t.Fatal("magic incorrect")
+  }
+}
+
+func TestMultipleCompressedMessages(t *testing.T) {
+  msgs := []*Message{NewMessage([]byte("testing")), 
+    NewMessage([]byte("multiple")), 
+    NewMessage([]byte("messages")),
+  }
+  msg := NewCompressedMessages(msgs...)
+  
+  length, msgsDecoded := DecodeWithDefaultCodecs(msg.Encode())
+  if length == 0 || msgsDecoded == nil {
+    t.Fatal("msgsDecoded is nil")
+  }
+  
+  // make sure the decompressed messages match what was put in
+  for index, decodedMsg := range msgsDecoded {
+    if !bytes.Equal(msgs[index].payload, decodedMsg.payload) {
+      t.Fatalf("Payload doesn't match, expected: % X but was: % X\n",
+        msgs[index].payload, decodedMsg.payload)
+    }
   }
 }
 
@@ -88,24 +239,22 @@ func TestRequestHeaderEncoding(t *testin
   }
 }
 
-
 func TestPublishRequestEncoding(t *testing.T) {
   payload := []byte("testing")
   msg := NewMessage(payload)
 
-  messages := list.New()
-  messages.PushBack(msg)
   pubBroker := NewBrokerPublisher("localhost:9092", "test", 0)
-  request := pubBroker.broker.EncodePublishRequest(messages)
+  request := pubBroker.broker.EncodePublishRequest(msg)
 
   // generated by kafka-rb:
-  expected := []byte{0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
-    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x0c,
-    0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
+  expected := []byte{0x00, 0x00, 0x00, 0x21, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x11, 0x00, 0x00, 0x00, 0x0d,
+    /* magic  comp  ......  chksum ....     ..  payload .. */
+    0x01, 0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
 
   if !bytes.Equal(expected, request) {
     t.Errorf("expected length: %d but got: %d", len(expected), len(request))
-    t.Errorf("expected: %X\n but got: %X", expected, request)
+    t.Errorf("expected: % X\n but got: % X", expected, request)
     t.Fail()
   }
 }
@@ -122,7 +271,7 @@ func TestConsumeRequestEncoding(t *testi
 
   if !bytes.Equal(expected, request) {
     t.Errorf("expected length: %d but got: %d", len(expected), len(request))
-    t.Errorf("expected: %X\n but got: %X", expected, request)
+    t.Errorf("expected: % X\n but got: % X", expected, request)
     t.Fail()
   }
 }

Modified: incubator/kafka/trunk/clients/go/src/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/consumer.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/src/consumer.go Thu Oct 27 14:24:27 2011
@@ -34,6 +34,7 @@ type BrokerConsumer struct {
   broker  *Broker
   offset  uint64
   maxSize uint32
+  codecs  map[byte]PayloadCodec
 }
 
 // Create a new broker consumer
@@ -45,7 +46,8 @@ type BrokerConsumer struct {
 func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer {
   return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
     offset:  offset,
-    maxSize: maxSize}
+    maxSize: maxSize,
+    codecs:  DefaultCodecsMap}
 }
 
 // Simplified consumer that defaults the offset and maxSize to 0.
@@ -55,9 +57,18 @@ func NewBrokerConsumer(hostname string, 
 func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer {
   return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
     offset:  0,
-    maxSize: 0}
+    maxSize: 0,
+    codecs:  DefaultCodecsMap}
 }
 
+// Add Custom Payload Codecs for Consumer Decoding
+// payloadCodecs - a slice of PayloadCodec implementations
+func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) {
+  // merge into the default map, so a caller can override the default codecs
+  for k, v := range codecsMap(payloadCodecs) {
+    consumer.codecs[k] = v, true
+  }
+}
 
 func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
   conn, err := consumer.broker.connect()
@@ -77,14 +88,15 @@ func (consumer *BrokerConsumer) ConsumeO
       if err != nil {
         if err != os.EOF {
           log.Println("Fatal Error: ", err)
+          panic(err)
         }
+        quit <- true // force quit
         break
       }
       time.Sleep(pollTimeoutMs * 1000000)
     }
     done <- true
   }()
-
   // wait to be told to stop..
   <-quit
   conn.Close()
@@ -111,7 +123,6 @@ func (consumer *BrokerConsumer) Consume(
   return num, err
 }
 
-
 func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
   _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
   if err != nil {
@@ -129,14 +140,19 @@ func (consumer *BrokerConsumer) consumeW
     // parse out the messages
     var currentOffset uint64 = 0
     for currentOffset <= uint64(length-4) {
-      msg := Decode(payload[currentOffset:])
-      if msg == nil {
+      totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs)
+      if msgs == nil {
         return num, os.NewError("Error Decoding Message")
       }
-      msg.offset = consumer.offset + currentOffset
-      currentOffset += uint64(4 + msg.totalLength)
-      handlerFunc(msg)
-      num += 1
+      msgOffset := consumer.offset + currentOffset
+      for _, msg := range msgs {
+        // update all of the messages' offsets
+        // multiple messages can be at the same offset (compressed for example)
+        msg.offset = msgOffset
+        handlerFunc(&msg)
+        num += 1
+      }
+      currentOffset += uint64(4 + totalLength)
     }
     // update the broker's offset for next consumption
     consumer.offset += currentOffset
@@ -145,7 +161,6 @@ func (consumer *BrokerConsumer) consumeW
   return num, err
 }
 
-
 // Get a list of valid offsets (up to maxNumOffsets) before the given time, where 
 // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available)
 // The result is a list of offsets, in descending order.
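
On the consuming side the decoding is transparent: a compressed message set is
unpacked into several Messages that share a single offset (see consumeWithConn
above). A minimal sketch against this revision's API (pre-Go1; host, topic,
offset and maxSize values are illustrative):

    consumer := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
    msgChan := make(chan *kafka.Message)
    quit := make(chan bool)
    go consumer.ConsumeOnChannel(msgChan, 10, quit)
    for msg := range msgChan {
      msg.Print() // compressed sets arrive here as multiple messages
    }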

Modified: incubator/kafka/trunk/clients/go/src/converts.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/converts.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/converts.go (original)
+++ incubator/kafka/trunk/clients/go/src/converts.go Thu Oct 27 14:24:27 2011
@@ -22,12 +22,10 @@
 
 package kafka
 
-
 import (
   "encoding/binary"
 )
 
-
 func uint16bytes(value int) []byte {
   result := make([]byte, 2)
   binary.BigEndian.PutUint16(result, uint16(value))

Modified: incubator/kafka/trunk/clients/go/src/kafka.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/kafka.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/kafka.go (original)
+++ incubator/kafka/trunk/clients/go/src/kafka.go Thu Oct 27 14:24:27 2011
@@ -28,17 +28,14 @@ import (
   "os"
   "fmt"
   "encoding/binary"
-  "strconv"
   "io"
   "bufio"
 )
 
 const (
-  MAGIC_DEFAULT = 0
-  NETWORK       = "tcp"
+  NETWORK = "tcp"
 )
 
-
 type Broker struct {
   topic     string
   partition int
@@ -51,7 +48,6 @@ func newBroker(hostname string, topic st
     hostname:  hostname}
 }
 
-
 func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
   raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
   if err != nil {
@@ -91,7 +87,9 @@ func (b *Broker) readResponse(conn *net.
 
   errorCode := binary.BigEndian.Uint16(messages[0:2])
   if errorCode != 0 {
-    return 0, []byte{}, os.NewError(strconv.Uitoa(uint(errorCode)))
+    log.Println("errorCode: ", errorCode)
+    return 0, []byte{}, os.NewError(
+      fmt.Sprintf("Broker Response Error: %d", errorCode))
   }
   return expectedLength, messages[2:], nil
 }

Modified: incubator/kafka/trunk/clients/go/src/message.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/message.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/message.go (original)
+++ incubator/kafka/trunk/clients/go/src/message.go Thu Oct 27 14:24:27 2011
@@ -20,7 +20,6 @@
  *  of their respective owners.
  */
 
-
 package kafka
 
 import (
@@ -30,13 +29,21 @@ import (
   "log"
 )
 
+const (
+  // Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression
+  MAGIC_DEFAULT = 1
+  // magic + compression + chksum
+  NO_LEN_HEADER_SIZE = 1 + 1 + 4
+)
 
 type Message struct {
   magic       byte
+  compression byte
   checksum    [4]byte
   payload     []byte
   offset      uint64 // only used after decoding
-  totalLength uint32 // total length of the message (decoding)
+  totalLength uint32 // total length of the raw message (from decoding)
+
 }
 
 func (m *Message) Offset() uint64 {
@@ -51,57 +58,125 @@ func (m *Message) PayloadString() string
   return string(m.payload)
 }
 
-func NewMessage(payload []byte) *Message {
+func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message {
   message := &Message{}
   message.magic = byte(MAGIC_DEFAULT)
-  binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(payload))
-  message.payload = payload
+  message.compression = codec.Id()
+  message.payload = codec.Encode(payload)
+  binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(message.payload))
   return message
 }
 
-// MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>
+// Default is to create a message with no compression
+func NewMessage(payload []byte) *Message {
+  return NewMessageWithCodec(payload, DefaultCodecsMap[NO_COMPRESSION_ID])
+}
+
+// Create a Message using the default compression method (gzip)
+func NewCompressedMessage(payload []byte) *Message {
+  return NewCompressedMessages(NewMessage(payload))
+}
+
+func NewCompressedMessages(messages ...*Message) *Message {
+  buf := bytes.NewBuffer([]byte{})
+  for _, message := range messages {
+    buf.Write(message.Encode())
+  }
+  return NewMessageWithCodec(buf.Bytes(), DefaultCodecsMap[GZIP_COMPRESSION_ID])
+}
+
+// MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><COMPRESSION: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>
 func (m *Message) Encode() []byte {
-  msgLen := 1 + 4 + len(m.payload)
+  msgLen := NO_LEN_HEADER_SIZE + len(m.payload)
   msg := make([]byte, 4+msgLen)
   binary.BigEndian.PutUint32(msg[0:], uint32(msgLen))
   msg[4] = m.magic
-  copy(msg[5:], m.checksum[0:])
-  copy(msg[9:], m.payload)
+  msg[5] = m.compression
+
+  copy(msg[6:], m.checksum[0:])
+  copy(msg[10:], m.payload)
+
   return msg
 }
 
-func Decode(packet []byte) *Message {
+func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message) {
+  return Decode(packet, DefaultCodecsMap)
+}
+
+func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message) {
+  messages := []Message{}
+
+  length, message := decodeMessage(packet, payloadCodecsMap)
+
+  if length > 0 && message != nil {
+    if message.compression != NO_COMPRESSION_ID {
+      // wonky special case for compressed messages having embedded messages
+      payloadLen := uint32(len(message.payload))
+      messageLenLeft := payloadLen
+      for messageLenLeft > 0 {
+        start := payloadLen - messageLenLeft
+        innerLen, innerMsg := decodeMessage(message.payload[start:], payloadCodecsMap)
+        messageLenLeft = messageLenLeft - innerLen - 4 // message length uint32
+        messages = append(messages, *innerMsg)
+      }
+    } else {
+      messages = append(messages, *message)
+    }
+  }
+
+  return length, messages
+}
+
+func decodeMessage(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, *Message) {
   length := binary.BigEndian.Uint32(packet[0:])
   if length > uint32(len(packet[4:])) {
     log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:]))
-    return nil
+    return 0, nil
   }
   msg := Message{}
   msg.totalLength = length
   msg.magic = packet[4]
-  copy(msg.checksum[:], packet[5:9])
-  payloadLength := length - 1 - 4
-  msg.payload = packet[9 : 9+payloadLength]
+
+  rawPayload := []byte{}
+  if msg.magic == 0 {
+    msg.compression = byte(0)
+    copy(msg.checksum[:], packet[5:9])
+    payloadLength := length - 1 - 4
+    rawPayload = packet[9 : 9+payloadLength]
+  } else if msg.magic == MAGIC_DEFAULT {
+    msg.compression = packet[5]
+    copy(msg.checksum[:], packet[6:10])
+    payloadLength := length - NO_LEN_HEADER_SIZE
+    rawPayload = packet[10 : 10+payloadLength]
+  } else {
+    log.Printf("incorrect magic, expected: %X was: %X\n", MAGIC_DEFAULT, msg.magic)
+    return 0, nil
+  }
 
   payloadChecksum := make([]byte, 4)
-  binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(msg.payload))
+  binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(rawPayload))
   if !bytes.Equal(payloadChecksum, msg.checksum[:]) {
-    log.Printf("checksum mismatch, expected: %X was: %X\n", payloadChecksum, msg.checksum[:])
-    return nil
+    msg.Print()
+    log.Printf("checksum mismatch, expected: % X was: % X\n", payloadChecksum, msg.checksum[:])
+    return 0, nil
   }
-  return &msg
+  msg.payload = payloadCodecsMap[msg.compression].Decode(rawPayload)
+
+  return length, &msg
 }
 
 func (msg *Message) Print() {
   log.Println("----- Begin Message ------")
   log.Printf("magic: %X\n", msg.magic)
+  log.Printf("compression: %X\n", msg.compression)
   log.Printf("checksum: %X\n", msg.checksum)
   if len(msg.payload) < 1048576 { // 1 MB 
-    log.Printf("payload: %X\n", msg.payload)
+    log.Printf("payload: % X\n", msg.payload)
     log.Printf("payload(string): %s\n", msg.PayloadString())
   } else {
     log.Printf("long payload, length: %d\n", len(msg.payload))
   }
+  log.Printf("length: %d\n", msg.totalLength)
   log.Printf("offset: %d\n", msg.offset)
   log.Println("----- End Message ------")
 }
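
For reference, the magic-1 wire format that Encode() now produces breaks down
as follows for the uncompressed "testing" message from TestMessageEncoding
above:

    00 00 00 0d             length = 13 (magic + compression + checksum + payload)
    01                      magic = 1
    00                      compression = 0 (NO_COMPRESSION_ID)
    e8 f3 5a 06             CRC32 checksum of the payload
    74 65 73 74 69 6e 67    payload ("testing")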

Added: incubator/kafka/trunk/clients/go/src/payload_codec.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/payload_codec.go?rev=1189773&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/payload_codec.go (added)
+++ incubator/kafka/trunk/clients/go/src/payload_codec.go Thu Oct 27 14:24:27 2011
@@ -0,0 +1,116 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+package kafka
+
+import (
+  "bytes"
+  "compress/gzip"
+  //  "log"
+)
+
+const (
+  NO_COMPRESSION_ID   = 0
+  GZIP_COMPRESSION_ID = 1
+)
+
+type PayloadCodec interface {
+
+  // the 1 byte id of the codec
+  Id() byte
+
+  // encoder interface for compression implementation
+  Encode(data []byte) []byte
+
+  // decoder interface for decompression implementation
+  Decode(data []byte) []byte
+}
+
+// Default Codecs
+
+var DefaultCodecs = []PayloadCodec{
+  new(NoCompressionPayloadCodec),
+  new(GzipPayloadCodec),
+}
+
+var DefaultCodecsMap = codecsMap(DefaultCodecs)
+
+func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
+  payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
+  for _, c := range payloadCodecs {
+    payloadCodecsMap[c.Id()] = c, true
+  }
+  return payloadCodecsMap
+}
+
+// No compression codec, noop
+
+type NoCompressionPayloadCodec struct {
+
+}
+
+func (codec *NoCompressionPayloadCodec) Id() byte {
+  return NO_COMPRESSION_ID
+}
+
+func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte {
+  return data
+}
+
+func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte {
+  return data
+}
+
+// Gzip Codec
+
+type GzipPayloadCodec struct {
+
+}
+
+func (codec *GzipPayloadCodec) Id() byte {
+  return GZIP_COMPRESSION_ID
+}
+
+func (codec *GzipPayloadCodec) Encode(data []byte) []byte {
+  buf := bytes.NewBuffer([]byte{})
+  zipper, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed)
+  zipper.Write(data)
+  zipper.Close()
+  return buf.Bytes()
+}
+
+func (codec *GzipPayloadCodec) Decode(data []byte) []byte {
+  buf := bytes.NewBuffer([]byte{})
+  zipper, _ := gzip.NewReader(bytes.NewBuffer(data))
+  unzipped := make([]byte, 100)
+  for {
+    n, err := zipper.Read(unzipped)
+    if n > 0 && err == nil {
+      buf.Write(unzipped[0:n])
+    } else {
+      break
+    }
+  }
+
+  zipper.Close()
+  return buf.Bytes()
+}
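
A consumer that needs another compression scheme can implement PayloadCodec
and register it with AddCodecs (see consumer.go above). A hypothetical sketch
in the same pre-Go1 style; the codec id 2 and the no-op transform are purely
illustrative and not part of this commit:

    type CustomPayloadCodec struct{}

    func (codec *CustomPayloadCodec) Id() byte { return 2 }

    func (codec *CustomPayloadCodec) Encode(data []byte) []byte {
      return data // plug real compression in here
    }

    func (codec *CustomPayloadCodec) Decode(data []byte) []byte {
      return data // and the matching decompression here
    }

    consumer.AddCodecs([]kafka.PayloadCodec{new(CustomPayloadCodec)})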

Modified: incubator/kafka/trunk/clients/go/src/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/publisher.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/src/publisher.go Thu Oct 27 14:24:27 2011
@@ -23,11 +23,9 @@
 package kafka
 
 import (
-  "container/list"
   "os"
 )
 
-
 type BrokerPublisher struct {
   broker *Broker
 }
@@ -36,21 +34,19 @@ func NewBrokerPublisher(hostname string,
   return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
 }
 
-
 func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
-  messages := list.New()
-  messages.PushBack(message)
-  return b.BatchPublish(messages)
+  return b.BatchPublish(message)
 }
 
-func (b *BrokerPublisher) BatchPublish(messages *list.List) (int, os.Error) {
+func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) {
   conn, err := b.broker.connect()
   if err != nil {
     return -1, err
   }
   defer conn.Close()
   // TODO: MULTIPRODUCE
-  num, err := conn.Write(b.broker.EncodePublishRequest(messages))
+  request := b.broker.EncodePublishRequest(messages...)
+  num, err := conn.Write(request)
   if err != nil {
     return -1, err
   }

Modified: incubator/kafka/trunk/clients/go/src/request.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/request.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/request.go (original)
+++ incubator/kafka/trunk/clients/go/src/request.go Thu Oct 27 14:24:27 2011
@@ -25,22 +25,19 @@ package kafka
 import (
   "encoding/binary"
   "bytes"
-  "container/list"
 )
 
-
 type RequestType uint16
 
 // Request Types
 const (
   REQUEST_PRODUCE      RequestType = 0
-  REQUEST_FETCH        = 1
-  REQUEST_MULTIFETCH   = 2
-  REQUEST_MULTIPRODUCE = 3
-  REQUEST_OFFSETS      = 4
+  REQUEST_FETCH                    = 1
+  REQUEST_MULTIFETCH               = 2
+  REQUEST_MULTIPRODUCE             = 3
+  REQUEST_OFFSETS                  = 4
 )
 
-
 // Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>
 func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer {
   request := bytes.NewBuffer([]byte{})
@@ -70,7 +67,6 @@ func (b *Broker) EncodeOffsetRequest(tim
   return request.Bytes()
 }
 
-
 // <Request Header><OFFSET: uint64><MAX SIZE: uint32>
 func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte {
   request := b.EncodeRequestHeader(REQUEST_FETCH)
@@ -83,9 +79,8 @@ func (b *Broker) EncodeConsumeRequest(of
   return request.Bytes()
 }
 
-
 // <Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>
-func (b *Broker) EncodePublishRequest(messages *list.List) []byte {
+func (b *Broker) EncodePublishRequest(messages ...*Message) []byte {
   // 4 + 2 + 2 + topicLength + 4 + 4
   request := b.EncodeRequestHeader(REQUEST_PRODUCE)
 
@@ -93,8 +88,7 @@ func (b *Broker) EncodePublishRequest(me
   request.Write(uint32bytes(0)) // placeholder message len
 
   written := 0
-  for element := messages.Front(); element != nil; element = element.Next() {
-    message := element.Value.(*Message)
+  for _, message := range messages {
     wrote, _ := request.Write(message.Encode())
     written += wrote
   }
@@ -103,6 +97,5 @@ func (b *Broker) EncodePublishRequest(me
   binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written))
   // now add the size of the whole to the first uint32
   encodeRequestSize(request)
-
   return request.Bytes()
 }
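
Tying this to the expected bytes in TestPublishRequestEncoding above, the
encoded produce request for a single "testing" message to topic "test",
partition 0 breaks down as:

    00 00 00 21             request size = 33 (everything after this field)
    00 00                   request type = 0 (REQUEST_PRODUCE)
    00 04 74 65 73 74       topic length = 4, "test"
    00 00 00 00             partition = 0
    00 00 00 11             message set size = 17
    00 00 00 0d 01 00 ...   the magic-1 message encoding described in message.go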

Modified: incubator/kafka/trunk/clients/go/src/timing.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/timing.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/timing.go (original)
+++ incubator/kafka/trunk/clients/go/src/timing.go Thu Oct 27 14:24:27 2011
@@ -20,7 +20,6 @@
  *  of their respective owners.
  */
 
-
 package kafka
 
 import (

Modified: incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/consumer/consumer.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/consumer/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/tools/consumer/consumer.go Thu Oct 27 14:24:27 2011
@@ -20,7 +20,6 @@
  *  of their respective owners.
  */
 
-
 package main
 
 import (
@@ -53,7 +52,6 @@ func init() {
   flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
 }
 
-
 func main() {
   flag.Parse()
   fmt.Println("Consuming Messages :")
@@ -87,7 +85,7 @@ func main() {
     go func() {
       for {
         sig := <-signal.Incoming
-        if sig.(signal.UnixSignal) == syscall.SIGINT {
+        if sig.(os.UnixSignal) == syscall.SIGINT {
           quit <- true
         }
       }

Modified: incubator/kafka/trunk/clients/go/tools/publisher/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/publisher/publisher.go?rev=1189773&r1=1189772&r2=1189773&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/publisher/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/tools/publisher/publisher.go Thu Oct 27 14:24:27 2011
@@ -34,6 +34,7 @@ var topic string
 var partition int
 var message string
 var messageFile string
+var compress bool
 
 func init() {
   flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server")
@@ -41,6 +42,7 @@ func init() {
   flag.IntVar(&partition, "partition", 0, "partition to publish to")
   flag.StringVar(&message, "message", "", "message to publish")
   flag.StringVar(&messageFile, "messagefile", "", "read message from this file")
+  flag.BoolVar(&compress, "compress", false, "compress the messages published")
 }
 
 func main() {
@@ -64,12 +66,24 @@ func main() {
     payload := make([]byte, stat.Size)
     file.Read(payload)
     timing := kafka.StartTiming("Sending")
-    broker.Publish(kafka.NewMessage(payload))
+
+    if compress {
+      broker.Publish(kafka.NewCompressedMessage(payload))
+    } else {
+      broker.Publish(kafka.NewMessage(payload))
+    }
+
     timing.Print()
     file.Close()
   } else {
     timing := kafka.StartTiming("Sending")
-    broker.Publish(kafka.NewMessage([]byte(message)))
+
+    if compress {
+      broker.Publish(kafka.NewCompressedMessage([]byte(message)))
+    } else {
+      broker.Publish(kafka.NewMessage([]byte(message)))
+    }
+
     timing.Print()
   }
 }