Posted to commits@kafka.apache.org by ju...@apache.org on 2012/03/15 01:39:42 UTC

svn commit: r1300801 - in /incubator/kafka/trunk/clients/go: src/consumer.go src/kafka.go src/message.go src/payload_codec.go src/publisher.go src/request.go src/timing.go tools/consumer/consumer.go tools/offsets/offsets.go tools/publisher/publisher.go

Author: junrao
Date: Thu Mar 15 00:39:42 2012
New Revision: 1300801

URL: http://svn.apache.org/viewvc?rev=1300801&view=rev
Log:
reverting previous commit for KAFKA-296 because the patch didn't apply cleanly

Modified:
    incubator/kafka/trunk/clients/go/src/consumer.go
    incubator/kafka/trunk/clients/go/src/kafka.go
    incubator/kafka/trunk/clients/go/src/message.go
    incubator/kafka/trunk/clients/go/src/payload_codec.go
    incubator/kafka/trunk/clients/go/src/publisher.go
    incubator/kafka/trunk/clients/go/src/request.go
    incubator/kafka/trunk/clients/go/src/timing.go
    incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
    incubator/kafka/trunk/clients/go/tools/offsets/offsets.go
    incubator/kafka/trunk/clients/go/tools/publisher/publisher.go

Modified: incubator/kafka/trunk/clients/go/src/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/consumer.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/src/consumer.go Thu Mar 15 00:39:42 2012
@@ -23,12 +23,11 @@
 package kafka
 
 import (
-  "encoding/binary"
-  "errors"
-  "io"
   "log"
+  "os"
   "net"
   "time"
+  "encoding/binary"
 )
 
 type BrokerConsumer struct {
@@ -67,11 +66,11 @@ func NewBrokerOffsetConsumer(hostname st
 func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) {
   // merge to the default map, so one 'could' override the default codecs..
   for k, v := range codecsMap(payloadCodecs) {
-    consumer.codecs[k] = v
+    consumer.codecs[k] = v, true
   }
 }
 
-func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) {
+func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
   conn, err := consumer.broker.connect()
   if err != nil {
     return -1, err
@@ -87,14 +86,14 @@ func (consumer *BrokerConsumer) ConsumeO
       })
 
       if err != nil {
-        if err != io.EOF {
+        if err != os.EOF {
           log.Println("Fatal Error: ", err)
           panic(err)
         }
         quit <- true // force quit
         break
       }
-      time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs))
+      time.Sleep(pollTimeoutMs * 1000000)
     }
     done <- true
   }()
@@ -108,7 +107,7 @@ func (consumer *BrokerConsumer) ConsumeO
 
 type MessageHandlerFunc func(msg *Message)
 
-func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) {
+func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) {
   conn, err := consumer.broker.connect()
   if err != nil {
     return -1, err
@@ -124,7 +123,7 @@ func (consumer *BrokerConsumer) Consume(
   return num, err
 }
 
-func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) {
+func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
   _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
   if err != nil {
     return -1, err
@@ -143,7 +142,7 @@ func (consumer *BrokerConsumer) consumeW
     for currentOffset <= uint64(length-4) {
       totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs)
       if msgs == nil {
-        return num, errors.New("Error Decoding Message")
+        return num, os.NewError("Error Decoding Message")
       }
       msgOffset := consumer.offset + currentOffset
       for _, msg := range msgs {
@@ -165,7 +164,7 @@ func (consumer *BrokerConsumer) consumeW
 // Get a list of valid offsets (up to maxNumOffsets) before the given time, where 
 // time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available)
 // The result is a list of offsets, in descending order.
-func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) {
+func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) {
   offsets := make([]uint64, 0)
 
   conn, err := consumer.broker.connect()
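
For orientation, and not part of the patch: the consumer.go hunks above back out the
Go 1 idioms (the built-in error type, errors.New, io.EOF, a time.Sleep expressed in
milliseconds, single-value map assignment) and restore the pre-Go 1 forms (os.Error,
os.NewError, os.EOF, a raw nanosecond sleep, and the old m[k] = v, true assignment).
A minimal, hypothetical sketch of the Go 1 style being removed, with the restored
forms noted in comments:

  // Hypothetical standalone sketch (not from the commit) of the Go 1 idioms
  // that this revert removes from consumer.go.
  package main

  import (
    "errors"
    "fmt"
    "time"
  )

  // sleepPoll mirrors the pollTimeoutMs parameter used by ConsumeOnChannel.
  func sleepPoll(pollTimeoutMs int64) {
    // Go 1: state the unit explicitly.
    time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs))
    // Restored pre-Go 1 form: time.Sleep(pollTimeoutMs * 1000000)
  }

  // decodeStub returns the built-in error type; the revert restores os.Error.
  func decodeStub() (int, error) {
    // Restored pre-Go 1 form: return 0, os.NewError("Error Decoding Message")
    return 0, errors.New("Error Decoding Message")
  }

  func main() {
    codecs := map[byte]string{}
    codecs[0] = "none" // Go 1; the restored form is codecs[0] = "none", true
    sleepPoll(1)
    if _, err := decodeStub(); err != nil {
      fmt.Println(err, codecs)
    }
  }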

Modified: incubator/kafka/trunk/clients/go/src/kafka.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/kafka.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/kafka.go (original)
+++ incubator/kafka/trunk/clients/go/src/kafka.go Thu Mar 15 00:39:42 2012
@@ -23,13 +23,13 @@
 package kafka
 
 import (
-  "bufio"
-  "encoding/binary"
-  "errors"
-  "fmt"
-  "io"
   "log"
   "net"
+  "os"
+  "fmt"
+  "encoding/binary"
+  "io"
+  "bufio"
 )
 
 const (
@@ -48,7 +48,7 @@ func newBroker(hostname string, topic st
     hostname:  hostname}
 }
 
-func (b *Broker) connect() (conn *net.TCPConn, error error) {
+func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
   raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
   if err != nil {
     log.Println("Fatal Error: ", err)
@@ -63,7 +63,7 @@ func (b *Broker) connect() (conn *net.TC
 }
 
 // returns length of response & payload & err
-func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
+func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) {
   reader := bufio.NewReader(conn)
   length := make([]byte, 4)
   lenRead, err := io.ReadFull(reader, length)
@@ -71,7 +71,7 @@ func (b *Broker) readResponse(conn *net.
     return 0, []byte{}, err
   }
   if lenRead != 4 || lenRead < 0 {
-    return 0, []byte{}, errors.New("invalid length of the packet length field")
+    return 0, []byte{}, os.NewError("invalid length of the packet length field")
   }
 
   expectedLength := binary.BigEndian.Uint32(length)
@@ -82,13 +82,13 @@ func (b *Broker) readResponse(conn *net.
   }
 
   if uint32(lenRead) != expectedLength {
-    return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d  expected:  %d", lenRead, expectedLength))
+    return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d  expected:  %d", lenRead, expectedLength))
   }
 
   errorCode := binary.BigEndian.Uint16(messages[0:2])
   if errorCode != 0 {
     log.Println("errorCode: ", errorCode)
-    return 0, []byte{}, errors.New(
+    return 0, []byte{}, os.NewError(
       fmt.Sprintf("Broker Response Error: %d", errorCode))
   }
   return expectedLength, messages[2:], nil
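
As an aside (not part of the patch), the readResponse body above documents the wire
framing either way: a 4-byte big-endian length, then a 2-byte error code, then the
payload. A small, self-contained Go 1 sketch of that framing, using a hypothetical
readFrame helper and an in-memory reader:

  package main

  import (
    "bufio"
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
  )

  // readFrame parses the framing used by readResponse: a 4-byte big-endian
  // length prefix, a 2-byte error code, then the payload bytes.
  func readFrame(r io.Reader) ([]byte, error) {
    reader := bufio.NewReader(r)

    length := make([]byte, 4)
    if _, err := io.ReadFull(reader, length); err != nil {
      return nil, err
    }
    expected := binary.BigEndian.Uint32(length)

    body := make([]byte, expected)
    if _, err := io.ReadFull(reader, body); err != nil {
      return nil, err
    }

    // The first two bytes carry the broker error code; zero means success.
    if code := binary.BigEndian.Uint16(body[0:2]); code != 0 {
      return nil, fmt.Errorf("Broker Response Error: %d", code)
    }
    return body[2:], nil
  }

  func main() {
    // length = 4 (2-byte error code + 2-byte payload), error code = 0, "ok".
    frame := []byte{0, 0, 0, 4, 0, 0, 'o', 'k'}
    payload, err := readFrame(bytes.NewReader(frame))
    fmt.Println(string(payload), err)
  }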

Modified: incubator/kafka/trunk/clients/go/src/message.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/message.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/message.go (original)
+++ incubator/kafka/trunk/clients/go/src/message.go Thu Mar 15 00:39:42 2012
@@ -23,9 +23,9 @@
 package kafka
 
 import (
-  "bytes"
-  "encoding/binary"
   "hash/crc32"
+  "encoding/binary"
+  "bytes"
   "log"
 )
 

Modified: incubator/kafka/trunk/clients/go/src/payload_codec.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/payload_codec.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/payload_codec.go (original)
+++ incubator/kafka/trunk/clients/go/src/payload_codec.go Thu Mar 15 00:39:42 2012
@@ -57,7 +57,7 @@ var DefaultCodecsMap = codecsMap(Default
 func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
   payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
   for _, c := range payloadCodecs {
-    payloadCodecsMap[c.Id()] = c
+    payloadCodecsMap[c.Id()] = c, true
   }
   return payloadCodecsMap
 }
@@ -65,6 +65,7 @@ func codecsMap(payloadCodecs []PayloadCo
 // No compression codec, noop
 
 type NoCompressionPayloadCodec struct {
+
 }
 
 func (codec *NoCompressionPayloadCodec) Id() byte {
@@ -82,6 +83,7 @@ func (codec *NoCompressionPayloadCodec) 
 // Gzip Codec
 
 type GzipPayloadCodec struct {
+
 }
 
 func (codec *GzipPayloadCodec) Id() byte {

Modified: incubator/kafka/trunk/clients/go/src/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/publisher.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/src/publisher.go Thu Mar 15 00:39:42 2012
@@ -22,6 +22,10 @@
 
 package kafka
 
+import (
+  "os"
+)
+
 type BrokerPublisher struct {
   broker *Broker
 }
@@ -30,11 +34,11 @@ func NewBrokerPublisher(hostname string,
   return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
 }
 
-func (b *BrokerPublisher) Publish(message *Message) (int, error) {
+func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
   return b.BatchPublish(message)
 }
 
-func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) {
+func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) {
   conn, err := b.broker.connect()
   if err != nil {
     return -1, err

Modified: incubator/kafka/trunk/clients/go/src/request.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/request.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/request.go (original)
+++ incubator/kafka/trunk/clients/go/src/request.go Thu Mar 15 00:39:42 2012
@@ -23,8 +23,8 @@
 package kafka
 
 import (
-  "bytes"
   "encoding/binary"
+  "bytes"
 )
 
 type RequestType uint16

Modified: incubator/kafka/trunk/clients/go/src/timing.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/timing.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/timing.go (original)
+++ incubator/kafka/trunk/clients/go/src/timing.go Thu Mar 15 00:39:42 2012
@@ -34,16 +34,16 @@ type Timing struct {
 }
 
 func StartTiming(label string) *Timing {
-  return &Timing{label: label, start: time.Now().UnixNano()}
+  return &Timing{label: label, start: time.Nanoseconds(), stop: 0}
 }
 
 func (t *Timing) Stop() {
-  t.stop = time.Now().UnixNano()
+  t.stop = time.Nanoseconds()
 }
 
 func (t *Timing) Print() {
   if t.stop == 0 {
     t.Stop()
   }
-  log.Printf("%s took: %f ms\n", t.label, float64(t.stop-t.start)/1000000)
+  log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000)
 }
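
Not part of the patch: the timing.go hunks trade time.Now().UnixNano() (Go 1) back
for the old time.Nanoseconds(), and the restored Print recomputes the elapsed time
from the current clock rather than the stored stop value. A minimal sketch of the
Go 1 timer being backed out, assuming the same field names:

  package main

  import (
    "log"
    "time"
  )

  // Timing mirrors the struct in timing.go: start/stop are nanoseconds
  // since the Unix epoch.
  type Timing struct {
    label string
    start int64
    stop  int64
  }

  func StartTiming(label string) *Timing {
    return &Timing{label: label, start: time.Now().UnixNano()}
  }

  func (t *Timing) Stop() {
    t.stop = time.Now().UnixNano()
  }

  func (t *Timing) Print() {
    if t.stop == 0 {
      t.Stop()
    }
    // Uses the recorded stop time; the restored version calls
    // time.Nanoseconds() again here, so it measures up to "now" instead.
    log.Printf("%s took: %f ms\n", t.label, float64(t.stop-t.start)/1000000)
  }

  func main() {
    t := StartTiming("example")
    time.Sleep(10 * time.Millisecond)
    t.Print()
  }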

Modified: incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/consumer/consumer.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/consumer/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/tools/consumer/consumer.go Thu Mar 15 00:39:42 2012
@@ -23,12 +23,12 @@
 package main
 
 import (
+  "kafka"
   "flag"
   "fmt"
   "os"
-  "os/signal"
   "strconv"
-  kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
+  "os/signal"
   "syscall"
 )
 
@@ -46,7 +46,7 @@ func init() {
   flag.StringVar(&topic, "topic", "test", "topic to publish to")
   flag.IntVar(&partition, "partition", 0, "partition to publish to")
   flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from")
-  flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request")
+  flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from")
   flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file")
   flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming")
   flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
@@ -61,7 +61,7 @@ func main() {
 
   var payloadFile *os.File = nil
   if len(writePayloadsTo) > 0 {
-    var err error
+    var err os.Error
     payloadFile, err = os.Create(writePayloadsTo)
     if err != nil {
       fmt.Println("Error opening file: ", err)
@@ -74,7 +74,7 @@ func main() {
       msg.Print()
     }
     if payloadFile != nil {
-      payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n"))
+      payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n"))
       payloadFile.Write(msg.Payload())
       payloadFile.Write([]byte("\n-------------------------------\n"))
     }
@@ -83,17 +83,10 @@ func main() {
   if consumerForever {
     quit := make(chan bool, 1)
     go func() {
-      sigIn := make(chan os.Signal)
-      signal.Notify(sigIn)
       for {
-
-        select {
-        case sig := <-sigIn:
-          if sig.(os.Signal) == syscall.SIGINT {
-            quit <- true
-          } else {
-            fmt.Println(sig)
-          }
+        sig := <-signal.Incoming
+        if sig.(os.UnixSignal) == syscall.SIGINT {
+          quit <- true
         }
       }
     }()
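
Not part of the patch: the consumer tool goes back to the pre-Go 1 signal.Incoming
channel (and the bare "kafka" import path), while the removed lines show the Go 1
replacement built on signal.Notify. A self-contained sketch of that Go 1 pattern,
with a hypothetical quit channel:

  package main

  import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
  )

  func main() {
    quit := make(chan bool, 1)

    go func() {
      sigIn := make(chan os.Signal, 1)
      signal.Notify(sigIn) // relay all incoming signals, like signal.Incoming did
      for sig := range sigIn {
        if sig == syscall.SIGINT {
          quit <- true
          return
        }
        fmt.Println(sig)
      }
    }()

    <-quit
    fmt.Println("interrupted, shutting down")
  }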

Modified: incubator/kafka/trunk/clients/go/tools/offsets/offsets.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/offsets/offsets.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/offsets/offsets.go (original)
+++ incubator/kafka/trunk/clients/go/tools/offsets/offsets.go Thu Mar 15 00:39:42 2012
@@ -20,12 +20,13 @@
  *  of their respective owners.
  */
 
+
 package main
 
 import (
+  "kafka"
   "flag"
   "fmt"
-  kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
 )
 
 var hostname string
@@ -42,6 +43,7 @@ func init() {
   flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that:  time(ms)/-1(latest)/-2(earliest)")
 }
 
+
 func main() {
   flag.Parse()
   fmt.Println("Offsets :")
@@ -54,7 +56,7 @@ func main() {
     fmt.Println("Error: ", err)
   }
   fmt.Printf("Offsets found: %d\n", len(offsets))
-  for i := 0; i < len(offsets); i++ {
+  for i := 0 ; i < len(offsets); i++ {
     fmt.Printf("Offset[%d] = %d\n", i, offsets[i])
   }
 }

Modified: incubator/kafka/trunk/clients/go/tools/publisher/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/publisher/publisher.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/publisher/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/tools/publisher/publisher.go Thu Mar 15 00:39:42 2012
@@ -23,9 +23,9 @@
 package main
 
 import (
+  "kafka"
   "flag"
   "fmt"
-  kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
   "os"
 )
 
@@ -63,7 +63,7 @@ func main() {
       fmt.Println("Error: ", err)
       return
     }
-    payload := make([]byte, stat.Size())
+    payload := make([]byte, stat.Size)
     file.Read(payload)
     timing := kafka.StartTiming("Sending")
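
Not part of the patch: the final hunk restores the old *os.FileInfo struct field
(stat.Size) in place of the Go 1 os.FileInfo interface method (stat.Size()). A small
Go 1 sketch of reading a file payload that way, with a hypothetical file name:

  package main

  import (
    "fmt"
    "os"
  )

  func main() {
    file, err := os.Open("payload.txt") // hypothetical input file
    if err != nil {
      fmt.Println("Error: ", err)
      return
    }
    defer file.Close()

    stat, err := file.Stat()
    if err != nil {
      fmt.Println("Error: ", err)
      return
    }

    // Go 1: Size() is a method on the os.FileInfo interface; the restored
    // pre-Go 1 code reads the stat.Size field instead.
    payload := make([]byte, stat.Size())
    file.Read(payload)
    fmt.Printf("read %d bytes\n", len(payload))
  }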