You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/03/15 01:39:42 UTC
svn commit: r1300801 - in /incubator/kafka/trunk/clients/go: src/consumer.go
src/kafka.go src/message.go src/payload_codec.go src/publisher.go
src/request.go src/timing.go tools/consumer/consumer.go
tools/offsets/offsets.go tools/publisher/publisher.go
Author: junrao
Date: Thu Mar 15 00:39:42 2012
New Revision: 1300801
URL: http://svn.apache.org/viewvc?rev=1300801&view=rev
Log:
Reverting the previous commit for KAFKA-296 because the patch didn't apply cleanly.
Modified:
incubator/kafka/trunk/clients/go/src/consumer.go
incubator/kafka/trunk/clients/go/src/kafka.go
incubator/kafka/trunk/clients/go/src/message.go
incubator/kafka/trunk/clients/go/src/payload_codec.go
incubator/kafka/trunk/clients/go/src/publisher.go
incubator/kafka/trunk/clients/go/src/request.go
incubator/kafka/trunk/clients/go/src/timing.go
incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
incubator/kafka/trunk/clients/go/tools/offsets/offsets.go
incubator/kafka/trunk/clients/go/tools/publisher/publisher.go
Modified: incubator/kafka/trunk/clients/go/src/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/consumer.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/src/consumer.go Thu Mar 15 00:39:42 2012
@@ -23,12 +23,11 @@
package kafka
import (
- "encoding/binary"
- "errors"
- "io"
"log"
+ "os"
"net"
"time"
+ "encoding/binary"
)
type BrokerConsumer struct {
@@ -67,11 +66,11 @@ func NewBrokerOffsetConsumer(hostname st
func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec) {
// merge to the default map, so one 'could' override the default codecs..
for k, v := range codecsMap(payloadCodecs) {
- consumer.codecs[k] = v
+ consumer.codecs[k] = v, true
}
}
-func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, error) {
+func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
conn, err := consumer.broker.connect()
if err != nil {
return -1, err
@@ -87,14 +86,14 @@ func (consumer *BrokerConsumer) ConsumeO
})
if err != nil {
- if err != io.EOF {
+ if err != os.EOF {
log.Println("Fatal Error: ", err)
panic(err)
}
quit <- true // force quit
break
}
- time.Sleep(time.Millisecond * time.Duration(pollTimeoutMs))
+ time.Sleep(pollTimeoutMs * 1000000)
}
done <- true
}()
@@ -108,7 +107,7 @@ func (consumer *BrokerConsumer) ConsumeO
type MessageHandlerFunc func(msg *Message)
-func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, error) {
+func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) {
conn, err := consumer.broker.connect()
if err != nil {
return -1, err
@@ -124,7 +123,7 @@ func (consumer *BrokerConsumer) Consume(
return num, err
}
-func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, error) {
+func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
_, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
if err != nil {
return -1, err
@@ -143,7 +142,7 @@ func (consumer *BrokerConsumer) consumeW
for currentOffset <= uint64(length-4) {
totalLength, msgs := Decode(payload[currentOffset:], consumer.codecs)
if msgs == nil {
- return num, errors.New("Error Decoding Message")
+ return num, os.NewError("Error Decoding Message")
}
msgOffset := consumer.offset + currentOffset
for _, msg := range msgs {
@@ -165,7 +164,7 @@ func (consumer *BrokerConsumer) consumeW
// Get a list of valid offsets (up to maxNumOffsets) before the given time, where
// time is in milliseconds (-1, from the latest offset available, -2 from the smallest offset available)
// The result is a list of offsets, in descending order.
-func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error) {
+func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) {
offsets := make([]uint64, 0)
conn, err := consumer.broker.connect()
Modified: incubator/kafka/trunk/clients/go/src/kafka.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/kafka.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/kafka.go (original)
+++ incubator/kafka/trunk/clients/go/src/kafka.go Thu Mar 15 00:39:42 2012
@@ -23,13 +23,13 @@
package kafka
import (
- "bufio"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
"log"
"net"
+ "os"
+ "fmt"
+ "encoding/binary"
+ "io"
+ "bufio"
)
const (
@@ -48,7 +48,7 @@ func newBroker(hostname string, topic st
hostname: hostname}
}
-func (b *Broker) connect() (conn *net.TCPConn, error error) {
+func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
if err != nil {
log.Println("Fatal Error: ", err)
@@ -63,7 +63,7 @@ func (b *Broker) connect() (conn *net.TC
}
// returns length of response & payload & err
-func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, error) {
+func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) {
reader := bufio.NewReader(conn)
length := make([]byte, 4)
lenRead, err := io.ReadFull(reader, length)
@@ -71,7 +71,7 @@ func (b *Broker) readResponse(conn *net.
return 0, []byte{}, err
}
if lenRead != 4 || lenRead < 0 {
- return 0, []byte{}, errors.New("invalid length of the packet length field")
+ return 0, []byte{}, os.NewError("invalid length of the packet length field")
}
expectedLength := binary.BigEndian.Uint32(length)
@@ -82,13 +82,13 @@ func (b *Broker) readResponse(conn *net.
}
if uint32(lenRead) != expectedLength {
- return 0, []byte{}, errors.New(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength))
+ return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d expected: %d", lenRead, expectedLength))
}
errorCode := binary.BigEndian.Uint16(messages[0:2])
if errorCode != 0 {
log.Println("errorCode: ", errorCode)
- return 0, []byte{}, errors.New(
+ return 0, []byte{}, os.NewError(
fmt.Sprintf("Broker Response Error: %d", errorCode))
}
return expectedLength, messages[2:], nil
Modified: incubator/kafka/trunk/clients/go/src/message.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/message.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/message.go (original)
+++ incubator/kafka/trunk/clients/go/src/message.go Thu Mar 15 00:39:42 2012
@@ -23,9 +23,9 @@
package kafka
import (
- "bytes"
- "encoding/binary"
"hash/crc32"
+ "encoding/binary"
+ "bytes"
"log"
)
Modified: incubator/kafka/trunk/clients/go/src/payload_codec.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/payload_codec.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/payload_codec.go (original)
+++ incubator/kafka/trunk/clients/go/src/payload_codec.go Thu Mar 15 00:39:42 2012
@@ -57,7 +57,7 @@ var DefaultCodecsMap = codecsMap(Default
func codecsMap(payloadCodecs []PayloadCodec) map[byte]PayloadCodec {
payloadCodecsMap := make(map[byte]PayloadCodec, len(payloadCodecs))
for _, c := range payloadCodecs {
- payloadCodecsMap[c.Id()] = c
+ payloadCodecsMap[c.Id()] = c, true
}
return payloadCodecsMap
}
@@ -65,6 +65,7 @@ func codecsMap(payloadCodecs []PayloadCo
// No compression codec, noop
type NoCompressionPayloadCodec struct {
+
}
func (codec *NoCompressionPayloadCodec) Id() byte {
@@ -82,6 +83,7 @@ func (codec *NoCompressionPayloadCodec)
// Gzip Codec
type GzipPayloadCodec struct {
+
}
func (codec *GzipPayloadCodec) Id() byte {
Modified: incubator/kafka/trunk/clients/go/src/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/publisher.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/src/publisher.go Thu Mar 15 00:39:42 2012
@@ -22,6 +22,10 @@
package kafka
+import (
+ "os"
+)
+
type BrokerPublisher struct {
broker *Broker
}
@@ -30,11 +34,11 @@ func NewBrokerPublisher(hostname string,
return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
}
-func (b *BrokerPublisher) Publish(message *Message) (int, error) {
+func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
return b.BatchPublish(message)
}
-func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error) {
+func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, os.Error) {
conn, err := b.broker.connect()
if err != nil {
return -1, err
Modified: incubator/kafka/trunk/clients/go/src/request.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/request.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/request.go (original)
+++ incubator/kafka/trunk/clients/go/src/request.go Thu Mar 15 00:39:42 2012
@@ -23,8 +23,8 @@
package kafka
import (
- "bytes"
"encoding/binary"
+ "bytes"
)
type RequestType uint16
Modified: incubator/kafka/trunk/clients/go/src/timing.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/timing.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/src/timing.go (original)
+++ incubator/kafka/trunk/clients/go/src/timing.go Thu Mar 15 00:39:42 2012
@@ -34,16 +34,16 @@ type Timing struct {
}
func StartTiming(label string) *Timing {
- return &Timing{label: label, start: time.Now().UnixNano()}
+ return &Timing{label: label, start: time.Nanoseconds(), stop: 0}
}
func (t *Timing) Stop() {
- t.stop = time.Now().UnixNano()
+ t.stop = time.Nanoseconds()
}
func (t *Timing) Print() {
if t.stop == 0 {
t.Stop()
}
- log.Printf("%s took: %f ms\n", t.label, float64(t.stop-t.start)/1000000)
+ log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000)
}
Modified: incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/consumer/consumer.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/consumer/consumer.go (original)
+++ incubator/kafka/trunk/clients/go/tools/consumer/consumer.go Thu Mar 15 00:39:42 2012
@@ -23,12 +23,12 @@
package main
import (
+ "kafka"
"flag"
"fmt"
"os"
- "os/signal"
"strconv"
- kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
+ "os/signal"
"syscall"
)
@@ -46,7 +46,7 @@ func init() {
flag.StringVar(&topic, "topic", "test", "topic to publish to")
flag.IntVar(&partition, "partition", 0, "partition to publish to")
flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from")
- flag.UintVar(&maxSize, "maxsize", 1048576, "max size in bytes of message set to request")
+ flag.UintVar(&maxSize, "maxsize", 1048576, "offset to start consuming from")
flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file")
flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming")
flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
@@ -61,7 +61,7 @@ func main() {
var payloadFile *os.File = nil
if len(writePayloadsTo) > 0 {
- var err error
+ var err os.Error
payloadFile, err = os.Create(writePayloadsTo)
if err != nil {
fmt.Println("Error opening file: ", err)
@@ -74,7 +74,7 @@ func main() {
msg.Print()
}
if payloadFile != nil {
- payloadFile.Write([]byte("Message at: " + strconv.FormatUint(msg.Offset(), 10) + "\n"))
+ payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n"))
payloadFile.Write(msg.Payload())
payloadFile.Write([]byte("\n-------------------------------\n"))
}
@@ -83,17 +83,10 @@ func main() {
if consumerForever {
quit := make(chan bool, 1)
go func() {
- sigIn := make(chan os.Signal)
- signal.Notify(sigIn)
for {
-
- select {
- case sig := <-sigIn:
- if sig.(os.Signal) == syscall.SIGINT {
- quit <- true
- } else {
- fmt.Println(sig)
- }
+ sig := <-signal.Incoming
+ if sig.(os.UnixSignal) == syscall.SIGINT {
+ quit <- true
}
}
}()
Modified: incubator/kafka/trunk/clients/go/tools/offsets/offsets.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/offsets/offsets.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/offsets/offsets.go (original)
+++ incubator/kafka/trunk/clients/go/tools/offsets/offsets.go Thu Mar 15 00:39:42 2012
@@ -20,12 +20,13 @@
* of their respective owners.
*/
+
package main
import (
+ "kafka"
"flag"
"fmt"
- kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
)
var hostname string
@@ -42,6 +43,7 @@ func init() {
flag.Int64Var(&time, "time", -1, "timestamp of the offsets before that: time(ms)/-1(latest)/-2(earliest)")
}
+
func main() {
flag.Parse()
fmt.Println("Offsets :")
@@ -54,7 +56,7 @@ func main() {
fmt.Println("Error: ", err)
}
fmt.Printf("Offsets found: %d\n", len(offsets))
- for i := 0; i < len(offsets); i++ {
+ for i := 0 ; i < len(offsets); i++ {
fmt.Printf("Offset[%d] = %d\n", i, offsets[i])
}
}
Modified: incubator/kafka/trunk/clients/go/tools/publisher/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/publisher/publisher.go?rev=1300801&r1=1300800&r2=1300801&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/publisher/publisher.go (original)
+++ incubator/kafka/trunk/clients/go/tools/publisher/publisher.go Thu Mar 15 00:39:42 2012
@@ -23,9 +23,9 @@
package main
import (
+ "kafka"
"flag"
"fmt"
- kafka "svn.apache.org/repos/asf/incubator/kafka.svn/trunk/clients/go/src"
"os"
)
@@ -63,7 +63,7 @@ func main() {
fmt.Println("Error: ", err)
return
}
- payload := make([]byte, stat.Size())
+ payload := make([]byte, stat.Size)
file.Read(payload)
timing := kafka.StartTiming("Sending")