You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/23 16:36:51 UTC
[49/50] [abbrv] qpid-proton git commit: Merge branch 'master' into
go1 - 0.11 alpa
Merge branch 'master' into go1 - 0.11 alpa
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8dc93cd0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8dc93cd0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8dc93cd0
Branch: refs/heads/go1
Commit: 8dc93cd08c786b30fc1e76312e6761739a791d5a
Parents: 581841f f70afb6
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 23 10:22:12 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Oct 23 10:22:12 2015 -0400
----------------------------------------------------------------------
README.md | 96 +++++
amqp/doc.go | 34 ++
amqp/error.go | 66 +++
amqp/interop | 1 +
amqp/interop_test.go | 381 +++++++++++++++++
amqp/marshal.go | 250 +++++++++++
amqp/message.go | 347 +++++++++++++++
amqp/message_test.go | 166 ++++++++
amqp/types.go | 199 +++++++++
amqp/unmarshal.go | 558 ++++++++++++++++++++++++
amqp/url.go | 96 +++++
amqp/url_test.go | 51 +++
electron/connection.go | 218 ++++++++++
electron/container.go | 71 ++++
electron/doc.go | 57 +++
electron/endpoint.go | 68 +++
electron/handler.go | 158 +++++++
electron/link.go | 247 +++++++++++
electron/messaging_test.go | 416 ++++++++++++++++++
electron/receiver.go | 238 +++++++++++
electron/sender.go | 315 ++++++++++++++
electron/session.go | 125 ++++++
electron/time.go | 82 ++++
internal/error.go | 118 +++++
internal/flexchannel.go | 82 ++++
internal/flexchannel_test.go | 89 ++++
internal/safemap.go | 57 +++
internal/uuid.go | 70 +++
proton/doc.go | 51 ++-
proton/engine.go | 402 ++++++++++++++++++
proton/handlers.go | 391 +++++++++++++++++
proton/message.go | 86 ++++
proton/proton_test.go | 27 ++
proton/wrappers.go | 384 +++++++++++++++++
proton/wrappers_gen.go | 874 ++++++++++++++++++++++++++++++++++++++
readme-branch.md | 7 +
36 files changed, 6871 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/README.md
----------------------------------------------------------------------
diff --cc README.md
index 0000000,9f95939..4b2da12
mode 000000,100644..100644
--- a/README.md
+++ b/README.md
@@@ -1,0 -1,44 +1,96 @@@
-Qpid Proton - AMQP messaging toolkit
-====================================
-
-Linux Build | Windows Build
-------------|--------------
-[![Linux Build Status](https://travis-ci.org/apache/qpid-proton.svg?branch=master)](https://travis-ci.org/apache/qpid-proton) | [![Windows Build Status](https://ci.appveyor.com/api/projects/status/github/apache/qpid-proton?branch=master&svg=true)](https://ci.appveyor.com/project/ke4qqq/qpid-proton/branch/master)
-
-Qpid Proton is a high-performance, lightweight messaging library. It can be
-used in the widest range of messaging applications, including brokers, client
-libraries, routers, bridges, proxies, and more. Proton makes it trivial to
-integrate with the AMQP 1.0 ecosystem from any platform, environment, or
-language
-
-Features
---------
-
- + A flexible and capable reactive messaging API
- + Full control of AMQP 1.0 protocol semantics
- + Portable C implementation with bindings to popular languages
- + Pure-Java and pure-JavaScript implementations
- + Peer-to-peer and brokered messaging
- + Secure communication via SSL and SASL
-
-Universal - Proton is designed to scale both up and down. Equally suitable for
-simple clients or high-powered servers, it can be deployed in simple
-peer-to-peer configurations or as part of a global federated messaging network.
-
-Embeddable - Proton is carefully written to be portable and cross platform. It
-has minimal dependencies, and it is architected to be usable with any threading
-model, as well as with non-threaded applications. These features make it
-uniquely suited for embedding messaging capabilities into existing software.
-
-Standard - Built around the AMQP 1.0 messaging standard, Proton is not only
-ideal for building out your own messaging applications but also for connecting
-them to the broader ecosystem of AMQP 1.0-based messaging applications.
-
-Getting Started
----------------
-
-See the included INSTALL file for build and install instructions and the
-DEVELOPERS file for information on how to modify and test the library code
-itself.
-
-Please see http://qpid.apache.org/proton for a more info.
++# Qpid Go packages for AMQP
++
++These packages provide [Go](http://golang.org) support for sending and receiving
++AMQP messages in client or server applications. Reference documentation is
++available at: <http://godoc.org/?q=qpid.apache.org>
++
++There are 3 packages:
++
++[qpid.apache.org/amqp](http://godoc.org/qpid.apache.org/amqp) provides functions
++to convert AMQP messages and data types to and from Go data types. Used by both
++the proton and electron packages to manage AMQP data.
++
++[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a
++simple, concurrent-safe API for sending and receiving messages. It can be used
++with goroutines and channels to build concurrent AMQP clients and servers.
++
++[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an
++event-driven, concurrent-unsafe package that closely follows the proton C
++API. Most Go programmers will find the electron package easier to use.
++
++There are [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
++to help you get started.
++
++Feedback is encouraged at:
++
++- Email <pr...@qpid.apache.org>
++- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
++
++### Why two APIs?
++
++The `proton` API is a direct mapping of the proton C library into Go. It is
++usable but not very natural for a Go programmer because it takes an
++*event-driven* approach and has no built-in support for concurrent
++use. `electron` uses `proton` internally but provides a more Go-like API that is
++safe to use from multiple concurrent goroutines.
++
++Go encourages programs to be structured as concurrent *goroutines* that
++communicate via *channels*. Go literature distinguishes between:
++
++- *concurrency*: "keeping track of things that could be done in parallel"
++- *parallelism*: "actually doing things in parallel on multiple CPUs or cores"
++
++A Go program expresses concurrency by starting goroutines for potentially
++concurrent tasks. The Go runtime schedules the activity of goroutines onto a
++small number (possibly one) of actual parallel executions.
++
++Even with no hardware parallelism, goroutine concurrency lets the Go runtime
++order unpredictable events like file descriptors being readable/writable,
++channels having data, timers firing etc. Go automatically takes care of
++switching out goroutines that block or sleep so it is normal to write code in
++terms of blocking calls.
++
++By contrast, event-driven programming is based on polling mechanisms like
++`select`, `poll` or `epoll`. These also dispatch unpredictably ordered events to
++a single thread or a small thread pool. However this requires a different style
++of programming: "event-driven" or "reactive" programming. Go developers call it
++"inside-out" programming. In an event-driven program blocking is a big problem
++as it consumes a scarce thread of execution, so actions that take time to
++complete have to be re-structured in terms of multiple events.
++
++The promise of Go is that you can express your program in concurrent, sequential
++terms and the Go runtime will turn it inside-out for you. You can start
++goroutines for all concurrent activities. They can loop forever or block for as
++long as they need waiting for timers, IO or any unpredictable event. Go will
++interleave and schedule them efficiently onto the available parallel hardware.
++
++For example: in the `electron` API, you can send a message and wait for it to be
++acknowledged in a single function. All the information about the message, why
++you sent it, and what to do when it is acknowledged can be held in local
++variables, all the code is in a simple sequence. Other goroutines in your
++program can be sending and receiving messages concurrently, they are not
++blocked.
++
++In the `proton` API, an event handler that sends a message must return
++*immediately*, it cannot block the event loop to wait for
++acknowledgement. Acknowledgement is a separate event, so the code for handling
++it is in a different event handler. Context information about the message has to
++be stored in some non-local variable that both functions can find. This makes
++the code harder to follow.
++
++The `proton` API is important because it is the foundation for the `electron`
++API, and may be useful for programs that need to be close to the original C
++library for some reason. However the `electron` API hides the event-driven
++details behind simple, sequential, concurrent-safe methods that can be called
++from arbitrary goroutines. Under the covers, data is passed through channels to
++dedicated `proton` goroutines so user goroutines can work concurrently with the
++proton event-loop.
++
++## New to Go?
++
++If you are new to Go then these are a good place to start:
++
++- [A Tour of Go](http://tour.golang.org)
++- [Effective Go](http://golang.org/doc/effective_go.html)
++
++Then look at the tools and docs at <http://golang.org> as you need them.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/doc.go
----------------------------------------------------------------------
diff --cc amqp/doc.go
index 0000000,0000000..323c344
new file mode 100644
--- /dev/null
+++ b/amqp/doc.go
@@@ -1,0 -1,0 +1,34 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++/*
++Package amqp encodes and decodes AMQP messages and data types as Go types.
++
++It follows the standard 'encoding' libraries pattern. The mapping between AMQP
++and Go types is described in the documentation of the Marshal and Unmarshal
++functions.
++
++AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
++*/
++package amqp
++
++// #cgo LDFLAGS: -lqpid-proton
++import "C"
++
++// This file is just for the package comment.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/error.go
----------------------------------------------------------------------
diff --cc amqp/error.go
index 0000000,0000000..868dbf3
new file mode 100644
--- /dev/null
+++ b/amqp/error.go
@@@ -1,0 -1,0 +1,66 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++import (
++ "fmt"
++ "reflect"
++)
++
++// Error is an AMQP error condition. It has a name and a description.
++// It implements the Go error interface so can be returned as an error value.
++//
++// You can pass amqp.Error to methods that pass an error to a remote endpoint,
++// this gives you full control over what the remote endpoint will see.
++//
++// You can also pass any Go error to such functions, the remote peer
++// will see the equivalent of MakeError(error)
++//
++type Error struct{ Name, Description string }
++
++// Error implements the Go error interface for AMQP error errors.
++func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
++
++// Errorf makes a Error with name and formatted description as per fmt.Sprintf
++func Errorf(name, format string, arg ...interface{}) Error {
++ return Error{name, fmt.Sprintf(format, arg...)}
++}
++
++// MakeError makes an AMQP error from a go error using the Go error type as the name
++// and the err.Error() string as the description.
++func MakeError(err error) Error {
++ return Error{reflect.TypeOf(err).Name(), err.Error()}
++}
++
++var (
++ InternalError = "amqp:internal-error"
++ NotFound = "amqp:not-found"
++ UnauthorizedAccess = "amqp:unauthorized-access"
++ DecodeError = "amqp:decode-error"
++ ResourceLimit = "amqp:resource-limit"
++ NotAllowed = "amqp:not-allowed"
++ InvalidField = "amqp:invalid-field"
++ NotImplemented = "amqp:not-implemented"
++ ResourceLocked = "amqp:resource-locked"
++ PreerrorFailed = "amqp:preerror-failed"
++ ResourceDeleted = "amqp:resource-deleted"
++ IllegalState = "amqp:illegal-state"
++ FrameSizeTooSmall = "amqp:frame-size-too-small"
++)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/interop
----------------------------------------------------------------------
diff --cc amqp/interop
index 0000000,0000000..ad6fcad
new file mode 120000
--- /dev/null
+++ b/amqp/interop
@@@ -1,0 -1,0 +1,1 @@@
++../../../../../../tests/interop
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/interop_test.go
----------------------------------------------------------------------
diff --cc amqp/interop_test.go
index 0000000,0000000..b36ef64
new file mode 100644
--- /dev/null
+++ b/amqp/interop_test.go
@@@ -1,0 -1,0 +1,381 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++// Test that conversion of Go type to/from AMQP is compatible with other
++// bindings.
++//
++package amqp
++
++import (
++ "bytes"
++ "fmt"
++ "io"
++ "io/ioutil"
++ "os"
++ "reflect"
++ "strings"
++ "testing"
++)
++
++func checkEqual(want interface{}, got interface{}) error {
++ if !reflect.DeepEqual(want, got) {
++ return fmt.Errorf("%#v != %#v", want, got)
++ }
++ return nil
++}
++
++func getReader(name string) (r io.Reader) {
++ r, err := os.Open("interop/" + name + ".amqp")
++ if err != nil {
++ panic(fmt.Errorf("Can't open %#v: %v", name, err))
++ }
++ return
++}
++
++func remaining(d *Decoder) string {
++ remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
++ return string(remainder)
++}
++
++// checkDecode: want is the expected value, gotPtr is a pointer to a
++// instance of the same type for Decode.
++func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
++
++ if err := d.Decode(gotPtr); err != nil {
++ t.Error("Decode failed", err)
++ return
++ }
++ got := reflect.ValueOf(gotPtr).Elem().Interface()
++ if err := checkEqual(want, got); err != nil {
++ t.Error("Decode bad value:", err)
++ return
++ }
++
++ // Try round trip encoding
++ bytes, err := Marshal(want, nil)
++ if err != nil {
++ t.Error("Marshal failed", err)
++ return
++ }
++ n, err := Unmarshal(bytes, gotPtr)
++ if err != nil {
++ t.Error("Unmarshal failed", err)
++ return
++ }
++ if err := checkEqual(n, len(bytes)); err != nil {
++ t.Error("Bad unmarshal length", err)
++ return
++ }
++ got = reflect.ValueOf(gotPtr).Elem().Interface()
++ if err = checkEqual(want, got); err != nil {
++ t.Error("Bad unmarshal value", err)
++ return
++ }
++}
++
++func TestUnmarshal(t *testing.T) {
++ bytes, err := ioutil.ReadAll(getReader("strings"))
++ if err != nil {
++ t.Error(err)
++ }
++ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
++ var got string
++ n, err := Unmarshal(bytes, &got)
++ if err != nil {
++ t.Error(err)
++ }
++ if want != got {
++ t.Errorf("%#v != %#v", want, got)
++ }
++ bytes = bytes[n:]
++ }
++}
++
++func TestPrimitivesExact(t *testing.T) {
++ d := NewDecoder(getReader("primitives"))
++ // Decoding into exact types
++ var b bool
++ checkDecode(d, true, &b, t)
++ checkDecode(d, false, &b, t)
++ var u8 uint8
++ checkDecode(d, uint8(42), &u8, t)
++ var u16 uint16
++ checkDecode(d, uint16(42), &u16, t)
++ var i16 int16
++ checkDecode(d, int16(-42), &i16, t)
++ var u32 uint32
++ checkDecode(d, uint32(12345), &u32, t)
++ var i32 int32
++ checkDecode(d, int32(-12345), &i32, t)
++ var u64 uint64
++ checkDecode(d, uint64(12345), &u64, t)
++ var i64 int64
++ checkDecode(d, int64(-12345), &i64, t)
++ var f32 float32
++ checkDecode(d, float32(0.125), &f32, t)
++ var f64 float64
++ checkDecode(d, float64(0.125), &f64, t)
++}
++
++func TestPrimitivesCompatible(t *testing.T) {
++ d := NewDecoder(getReader("primitives"))
++ // Decoding into compatible types
++ var b bool
++ var i int
++ var u uint
++ var f float64
++ checkDecode(d, true, &b, t)
++ checkDecode(d, false, &b, t)
++ checkDecode(d, uint(42), &u, t)
++ checkDecode(d, uint(42), &u, t)
++ checkDecode(d, -42, &i, t)
++ checkDecode(d, uint(12345), &u, t)
++ checkDecode(d, -12345, &i, t)
++ checkDecode(d, uint(12345), &u, t)
++ checkDecode(d, -12345, &i, t)
++ checkDecode(d, 0.125, &f, t)
++ checkDecode(d, 0.125, &f, t)
++}
++
++// checkDecodeValue: want is the expected value, decode into a reflect.Value
++func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
++
++ var got, got2 interface{}
++ if err := d.Decode(&got); err != nil {
++ t.Error("Decode failed", err)
++ return
++ }
++ if err := checkEqual(want, got); err != nil {
++ t.Error(err)
++ return
++ }
++ // Try round trip encoding
++ bytes, err := Marshal(got, nil)
++ if err != nil {
++ t.Error(err)
++ return
++ }
++ n, err := Unmarshal(bytes, &got2)
++ if err != nil {
++ t.Error(err)
++ return
++ }
++ if err := checkEqual(n, len(bytes)); err != nil {
++ t.Error(err)
++ return
++ }
++ if err := checkEqual(want, got2); err != nil {
++ t.Error(err)
++ return
++ }
++}
++
++func TestPrimitivesInterface(t *testing.T) {
++ d := NewDecoder(getReader("primitives"))
++ checkDecodeInterface(d, true, t)
++ checkDecodeInterface(d, false, t)
++ checkDecodeInterface(d, uint8(42), t)
++ checkDecodeInterface(d, uint16(42), t)
++ checkDecodeInterface(d, int16(-42), t)
++ checkDecodeInterface(d, uint32(12345), t)
++ checkDecodeInterface(d, int32(-12345), t)
++ checkDecodeInterface(d, uint64(12345), t)
++ checkDecodeInterface(d, int64(-12345), t)
++ checkDecodeInterface(d, float32(0.125), t)
++ checkDecodeInterface(d, float64(0.125), t)
++}
++
++func TestStrings(t *testing.T) {
++ d := NewDecoder(getReader("strings"))
++ // Test decoding as plain Go strings
++ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
++ var got string
++ checkDecode(d, want, &got, t)
++ }
++ remains := remaining(d)
++ if remains != "" {
++ t.Errorf("leftover: %s", remains)
++ }
++
++ // Test decoding as specific string types
++ d = NewDecoder(getReader("strings"))
++ var bytes []byte
++ var str, sym string
++ checkDecode(d, []byte("abc\000defg"), &bytes, t)
++ checkDecode(d, "abcdefg", &str, t)
++ checkDecode(d, "abcdefg", &sym, t)
++ checkDecode(d, make([]byte, 0), &bytes, t)
++ checkDecode(d, "", &str, t)
++ checkDecode(d, "", &sym, t)
++ remains = remaining(d)
++ if remains != "" {
++ t.Fatalf("leftover: %s", remains)
++ }
++
++ // Test some error handling
++ d = NewDecoder(getReader("strings"))
++ var s string
++ err := d.Decode(s)
++ if err == nil {
++ t.Fatal("Expected error")
++ }
++ if !strings.Contains(err.Error(), "not a pointer") {
++ t.Error(err)
++ }
++ var i int
++ err = d.Decode(&i)
++ if !strings.Contains(err.Error(), "cannot unmarshal") {
++ t.Error(err)
++ }
++ _, err = Unmarshal([]byte{}, nil)
++ if !strings.Contains(err.Error(), "not enough data") {
++ t.Error(err)
++ }
++ _, err = Unmarshal([]byte("foobar"), nil)
++ if !strings.Contains(err.Error(), "invalid-argument") {
++ t.Error(err)
++ }
++}
++
++func TestEncodeDecode(t *testing.T) {
++ type data struct {
++ s string
++ i int
++ u8 uint8
++ b bool
++ f float32
++ v interface{}
++ }
++
++ in := data{"foo", 42, 9, true, 1.234, "thing"}
++
++ buf := bytes.Buffer{}
++ e := NewEncoder(&buf)
++ if err := e.Encode(in.s); err != nil {
++ t.Error(err)
++ }
++ if err := e.Encode(in.i); err != nil {
++ t.Error(err)
++ }
++ if err := e.Encode(in.u8); err != nil {
++ t.Error(err)
++ }
++ if err := e.Encode(in.b); err != nil {
++ t.Error(err)
++ }
++ if err := e.Encode(in.f); err != nil {
++ t.Error(err)
++ }
++ if err := e.Encode(in.v); err != nil {
++ t.Error(err)
++ }
++
++ var out data
++ d := NewDecoder(&buf)
++ if err := d.Decode(&out.s); err != nil {
++ t.Error(err)
++ }
++ if err := d.Decode(&out.i); err != nil {
++ t.Error(err)
++ }
++ if err := d.Decode(&out.u8); err != nil {
++ t.Error(err)
++ }
++ if err := d.Decode(&out.b); err != nil {
++ t.Error(err)
++ }
++ if err := d.Decode(&out.f); err != nil {
++ t.Error(err)
++ }
++ if err := d.Decode(&out.v); err != nil {
++ t.Error(err)
++ }
++
++ if err := checkEqual(in, out); err != nil {
++ t.Error(err)
++ }
++}
++
++func TestMap(t *testing.T) {
++ d := NewDecoder(getReader("maps"))
++
++ // Generic map
++ var m Map
++ checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t)
++
++ // Interface as map
++ var i interface{}
++ checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t)
++
++ d = NewDecoder(getReader("maps"))
++ // Specific typed map
++ var m2 map[string]int
++ checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t)
++
++ // Nested map
++ m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
++ bytes, err := Marshal(m, nil)
++ if err != nil {
++ t.Fatal(err)
++ }
++ _, err = Unmarshal(bytes, &i)
++ if err != nil {
++ t.Fatal(err)
++ }
++ if err = checkEqual(m, i); err != nil {
++ t.Fatal(err)
++ }
++}
++
++func TestList(t *testing.T) {
++ d := NewDecoder(getReader("lists"))
++ var l List
++ checkDecode(d, List{int32(32), "foo", true}, &l, t)
++ checkDecode(d, List{}, &l, t)
++}
++
++// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as
++// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
++func TODO_TestMessage(t *testing.T) {
++ bytes, err := ioutil.ReadAll(getReader("message"))
++ if err != nil {
++ t.Fatal(err)
++ }
++
++ m, err := DecodeMessage(bytes)
++ if err != nil {
++ t.Fatal(err)
++ } else {
++ if err := checkEqual(m.Body(), "hello"); err != nil {
++ t.Error(err)
++ }
++ }
++
++ m2 := NewMessageWith("hello")
++ bytes2, err := m2.Encode(nil)
++ if err != nil {
++ t.Error(err)
++ } else {
++ if err = checkEqual(bytes, bytes2); err != nil {
++ t.Error(err)
++ }
++ }
++}
++
++// TODO aconway 2015-03-13: finish the full interop test
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index 0000000,0000000..666b4f6
new file mode 100644
--- /dev/null
+++ b/amqp/marshal.go
@@@ -1,0 -1,0 +1,250 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++// #include <proton/codec.h>
++import "C"
++
++import (
++ "io"
++ "qpid.apache.org/internal"
++ "reflect"
++ "unsafe"
++)
++
++func dataError(prefix string, data *C.pn_data_t) error {
++ err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
++ if err != nil {
++ err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
++ }
++ return err
++}
++
++/*
++Marshal encodes a Go value as AMQP data in buffer.
++If buffer is nil, or is not large enough, a new buffer is created.
++
++Returns the buffer used for encoding with len() adjusted to the actual size of data.
++
++Go types are encoded as follows
++
++ +-------------------------------------+--------------------------------------------+
++ |Go type |AMQP type |
++ +-------------------------------------+--------------------------------------------+
++ |bool |bool |
++ +-------------------------------------+--------------------------------------------+
++ |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
++ +-------------------------------------+--------------------------------------------+
++ |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
++ +-------------------------------------+--------------------------------------------+
++ |float32, float64 |float, double. |
++ +-------------------------------------+--------------------------------------------+
++ |string |string |
++ +-------------------------------------+--------------------------------------------+
++ |[]byte, Binary |binary |
++ +-------------------------------------+--------------------------------------------+
++ |Symbol |symbol |
++ +-------------------------------------+--------------------------------------------+
++ |interface{} |the contained type |
++ +-------------------------------------+--------------------------------------------+
++ |nil |null |
++ +-------------------------------------+--------------------------------------------+
++ |map[K]T |map with K and T converted as above |
++ +-------------------------------------+--------------------------------------------+
++ |Map |map, may have mixed types for keys, values |
++ +-------------------------------------+--------------------------------------------+
++ |[]T |list with T converted as above |
++ +-------------------------------------+--------------------------------------------+
++ |List |list, may have mixed types values |
++ +-------------------------------------+--------------------------------------------+
++
++The following Go types cannot be marshaled: uintptr, function, interface, channel
++
++TODO
++
++Go types: array, slice, struct, complex64/128.
++
++AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
++
++Described types.
++
++*/
++func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
++ defer doRecover(&err)
++ data := C.pn_data(0)
++ defer C.pn_data_free(data)
++ marshal(v, data)
++ encode := func(buf []byte) ([]byte, error) {
++ n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
++ switch {
++ case n == int(C.PN_OVERFLOW):
++ return buf, overflow
++ case n < 0:
++ return buf, dataError("marshal error", data)
++ default:
++ return buf[:n], nil
++ }
++ }
++ return encodeGrow(buffer, encode)
++}
++
++const minEncode = 256
++
++// overflow is returned when an encoding function can't fit data in the buffer.
++var overflow = internal.Errorf("buffer too small")
++
++// encodeFn encodes into buffer[0:len(buffer)].
++// Returns buffer with length adjusted for data encoded.
++// If buffer too small, returns overflow as error.
++type encodeFn func(buffer []byte) ([]byte, error)
++
++// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
++// Returns the final buffer.
++func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
++ if buffer == nil || len(buffer) == 0 {
++ buffer = make([]byte, minEncode)
++ }
++ var err error
++ for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
++ buffer = make([]byte, 2*len(buffer))
++ }
++ return buffer, err
++}
++
++func marshal(v interface{}, data *C.pn_data_t) {
++ switch v := v.(type) {
++ case nil:
++ C.pn_data_put_null(data)
++ case bool:
++ C.pn_data_put_bool(data, C.bool(v))
++ case int8:
++ C.pn_data_put_byte(data, C.int8_t(v))
++ case int16:
++ C.pn_data_put_short(data, C.int16_t(v))
++ case int32:
++ C.pn_data_put_int(data, C.int32_t(v))
++ case int64:
++ C.pn_data_put_long(data, C.int64_t(v))
++ case int:
++ if unsafe.Sizeof(0) == 8 {
++ C.pn_data_put_long(data, C.int64_t(v))
++ } else {
++ C.pn_data_put_int(data, C.int32_t(v))
++ }
++ case uint8:
++ C.pn_data_put_ubyte(data, C.uint8_t(v))
++ case uint16:
++ C.pn_data_put_ushort(data, C.uint16_t(v))
++ case uint32:
++ C.pn_data_put_uint(data, C.uint32_t(v))
++ case uint64:
++ C.pn_data_put_ulong(data, C.uint64_t(v))
++ case uint:
++ if unsafe.Sizeof(0) == 8 {
++ C.pn_data_put_ulong(data, C.uint64_t(v))
++ } else {
++ C.pn_data_put_uint(data, C.uint32_t(v))
++ }
++ case float32:
++ C.pn_data_put_float(data, C.float(v))
++ case float64:
++ C.pn_data_put_double(data, C.double(v))
++ case string:
++ C.pn_data_put_string(data, pnBytes([]byte(v)))
++ case []byte:
++ C.pn_data_put_binary(data, pnBytes(v))
++ case Binary:
++ C.pn_data_put_binary(data, pnBytes([]byte(v)))
++ case Symbol:
++ C.pn_data_put_symbol(data, pnBytes([]byte(v)))
++ case Map: // Special map type
++ C.pn_data_put_map(data)
++ C.pn_data_enter(data)
++ for key, val := range v {
++ marshal(key, data)
++ marshal(val, data)
++ }
++ C.pn_data_exit(data)
++ default:
++ switch reflect.TypeOf(v).Kind() {
++ case reflect.Map:
++ putMap(data, v)
++ case reflect.Slice:
++ putList(data, v)
++ default:
++ panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
++ }
++ }
++ err := dataError("marshal", data)
++ if err != nil {
++ panic(err)
++ }
++ return
++}
++
++func clearMarshal(v interface{}, data *C.pn_data_t) {
++ C.pn_data_clear(data)
++ marshal(v, data)
++}
++
++func putMap(data *C.pn_data_t, v interface{}) {
++ mapValue := reflect.ValueOf(v)
++ C.pn_data_put_map(data)
++ C.pn_data_enter(data)
++ for _, key := range mapValue.MapKeys() {
++ marshal(key.Interface(), data)
++ marshal(mapValue.MapIndex(key).Interface(), data)
++ }
++ C.pn_data_exit(data)
++}
++
++func putList(data *C.pn_data_t, v interface{}) {
++ listValue := reflect.ValueOf(v)
++ C.pn_data_put_list(data)
++ C.pn_data_enter(data)
++ for i := 0; i < listValue.Len(); i++ {
++ marshal(listValue.Index(i).Interface(), data)
++ }
++ C.pn_data_exit(data)
++}
++
++// Encoder encodes AMQP values to an io.Writer
++type Encoder struct {
++ writer io.Writer
++ buffer []byte
++}
++
++// New encoder returns a new encoder that writes to w.
++func NewEncoder(w io.Writer) *Encoder {
++ return &Encoder{w, make([]byte, minEncode)}
++}
++
++func (e *Encoder) Encode(v interface{}) (err error) {
++ e.buffer, err = Marshal(v, e.buffer)
++ if err == nil {
++ e.writer.Write(e.buffer)
++ }
++ return err
++}
++
++func replace(data *C.pn_data_t, v interface{}) {
++ C.pn_data_clear(data)
++ marshal(v, data)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/message.go
----------------------------------------------------------------------
diff --cc amqp/message.go
index 0000000,0000000..5ba4f4f
new file mode 100644
--- /dev/null
+++ b/amqp/message.go
@@@ -1,0 -1,0 +1,347 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++// #include <proton/types.h>
++// #include <proton/message.h>
++// #include <proton/codec.h>
++// #include <stdlib.h>
++//
++// /* Helper for setting message string fields */
++// typedef int (*set_fn)(pn_message_t*, const char*);
++// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
++// int result = set(m, s);
++// free(s);
++// return result;
++// }
++//
++import "C"
++
++import (
++ "qpid.apache.org/internal"
++ "runtime"
++ "time"
++ "unsafe"
++)
++
++// Message is the interface to an AMQP message.
++type Message interface {
++ // Durable indicates that any parties taking responsibility
++ // for the message must durably store the content.
++ Durable() bool
++ SetDurable(bool)
++
++ // Priority impacts ordering guarantees. Within a
++ // given ordered context, higher priority messages may jump ahead of
++ // lower priority messages.
++ Priority() uint8
++ SetPriority(uint8)
++
++ // TTL or Time To Live, a message it may be dropped after this duration
++ TTL() time.Duration
++ SetTTL(time.Duration)
++
++ // FirstAcquirer indicates
++ // that the recipient of the message is the first recipient to acquire
++ // the message, i.e. there have been no failed delivery attempts to
++ // other acquirers. Note that this does not mean the message has not
++ // been delivered to, but not acquired, by other recipients.
++ FirstAcquirer() bool
++ SetFirstAcquirer(bool)
++
++ // DeliveryCount tracks how many attempts have been made to
++ // delivery a message.
++ DeliveryCount() uint32
++ SetDeliveryCount(uint32)
++
++ // MessageId provides a unique identifier for a message.
++ // it can be an a string, an unsigned long, a uuid or a
++ // binary value.
++ MessageId() interface{}
++ SetMessageId(interface{})
++
++ UserId() string
++ SetUserId(string)
++
++ Address() string
++ SetAddress(string)
++
++ Subject() string
++ SetSubject(string)
++
++ ReplyTo() string
++ SetReplyTo(string)
++
++ // CorrelationId is set on correlated request and response messages. It can be
++ // an a string, an unsigned long, a uuid or a binary value.
++ CorrelationId() interface{}
++ SetCorrelationId(interface{})
++
++ ContentType() string
++ SetContentType(string)
++
++ ContentEncoding() string
++ SetContentEncoding(string)
++
++ // ExpiryTime indicates an absoulte time when the message may be dropped.
++ // A Zero time (i.e. t.isZero() == true) indicates a message never expires.
++ ExpiryTime() time.Time
++ SetExpiryTime(time.Time)
++
++ CreationTime() time.Time
++ SetCreationTime(time.Time)
++
++ GroupId() string
++ SetGroupId(string)
++
++ GroupSequence() int32
++ SetGroupSequence(int32)
++
++ ReplyToGroupId() string
++ SetReplyToGroupId(string)
++
++ // Instructions - AMQP delivery instructions.
++ Instructions() map[string]interface{}
++ SetInstructions(v map[string]interface{})
++
++ // Annotations - AMQP annotations.
++ Annotations() map[string]interface{}
++ SetAnnotations(v map[string]interface{})
++
++ // Properties - Application properties.
++ Properties() map[string]interface{}
++ SetProperties(v map[string]interface{})
++
++ // Inferred indicates how the message content
++ // is encoded into AMQP sections. If inferred is true then binary and
++ // list values in the body of the message will be encoded as AMQP DATA
++ // and AMQP SEQUENCE sections, respectively. If inferred is false,
++ // then all values in the body of the message will be encoded as AMQP
++ // VALUE sections regardless of their type.
++ Inferred() bool
++ SetInferred(bool)
++
++ // Marshal a Go value into the message body. See amqp.Marshal() for details.
++ Marshal(interface{})
++
++ // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
++ Unmarshal(interface{})
++
++ // Body value resulting from the default unmarshalling of message body as interface{}
++ Body() interface{}
++
++ // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
++ // the message is encoded into it, otherwise a new buffer is created.
++ // Returns the buffer containing the message.
++ Encode(buffer []byte) ([]byte, error)
++
++ // Decode data into this message. Overwrites an existing message content.
++ Decode(buffer []byte) error
++
++ // Clear the message contents.
++ Clear()
++
++ // Copy the contents of another message to this one.
++ Copy(m Message) error
++}
++
++type message struct{ pn *C.pn_message_t }
++
++func freeMessage(m *message) {
++ C.pn_message_free(m.pn)
++ m.pn = nil
++}
++
++// NewMessage creates a new message instance.
++func NewMessage() Message {
++ m := &message{C.pn_message()}
++ runtime.SetFinalizer(m, freeMessage)
++ return m
++}
++
++// NewMessageWith creates a message with value as the body. Equivalent to
++// m := NewMessage(); m.Marshal(body)
++func NewMessageWith(value interface{}) Message {
++ m := NewMessage()
++ m.Marshal(value)
++ return m
++}
++
++func (m *message) Clear() { C.pn_message_clear(m.pn) }
++
++func (m *message) Copy(x Message) error {
++ if data, err := x.Encode(nil); err == nil {
++ return m.Decode(data)
++ } else {
++ return err
++ }
++}
++
++// ==== message get functions
++
++func rewindGet(data *C.pn_data_t) (v interface{}) {
++ C.pn_data_rewind(data)
++ C.pn_data_next(data)
++ unmarshal(&v, data)
++ return v
++}
++
++func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
++ C.pn_data_rewind(data)
++ C.pn_data_next(data)
++ unmarshal(&v, data)
++ return v
++}
++
++func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) }
++func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) }
++func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
++func (m *message) TTL() time.Duration {
++ return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
++}
++func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) }
++func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) }
++func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) }
++func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) }
++func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) }
++func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) }
++func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
++func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
++func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) }
++func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
++
++func (m *message) ExpiryTime() time.Time {
++ return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
++}
++func (m *message) CreationTime() time.Time {
++ return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
++}
++func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) }
++func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) }
++func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
++
++func (m *message) Instructions() map[string]interface{} {
++ return rewindMap(C.pn_message_instructions(m.pn))
++}
++func (m *message) Annotations() map[string]interface{} {
++ return rewindMap(C.pn_message_annotations(m.pn))
++}
++func (m *message) Properties() map[string]interface{} {
++ return rewindMap(C.pn_message_properties(m.pn))
++}
++
++// ==== message set methods
++
++func setData(v interface{}, data *C.pn_data_t) {
++ C.pn_data_clear(data)
++ marshal(v, data)
++}
++
++func dataString(data *C.pn_data_t) string {
++ str := C.pn_string(C.CString(""))
++ defer C.pn_free(unsafe.Pointer(str))
++ C.pn_inspect(unsafe.Pointer(data), str)
++ return C.GoString(C.pn_string_get(str))
++}
++
++func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) }
++func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
++func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
++func (m *message) SetTTL(d time.Duration) {
++ C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
++}
++func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
++func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
++func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
++func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
++func (m *message) SetAddress(s string) {
++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
++}
++func (m *message) SetSubject(s string) {
++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
++}
++func (m *message) SetReplyTo(s string) {
++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
++}
++func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
++func (m *message) SetContentType(s string) {
++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
++}
++func (m *message) SetContentEncoding(s string) {
++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
++}
++func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
++func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
++func (m *message) SetGroupId(s string) {
++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
++}
++func (m *message) SetGroupSequence(s int32) {
++ C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
++}
++func (m *message) SetReplyToGroupId(s string) {
++ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
++}
++
++func (m *message) SetInstructions(v map[string]interface{}) {
++ setData(v, C.pn_message_instructions(m.pn))
++}
++func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
++func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) }
++
++// Marshal/Unmarshal body
++func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
++func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
++func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
++
++func (m *message) Decode(data []byte) error {
++ m.Clear()
++ if len(data) == 0 {
++ return internal.Errorf("empty buffer for decode")
++ }
++ if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
++ return internal.Errorf("decoding message: %s",
++ internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn))))
++ }
++ return nil
++}
++
++func DecodeMessage(data []byte) (m Message, err error) {
++ m = NewMessage()
++ err = m.Decode(data)
++ return
++}
++
++func (m *message) Encode(buffer []byte) ([]byte, error) {
++ encode := func(buf []byte) ([]byte, error) {
++ len := cLen(buf)
++ result := C.pn_message_encode(m.pn, cPtr(buf), &len)
++ switch {
++ case result == C.PN_OVERFLOW:
++ return buf, overflow
++ case result < 0:
++ return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result))
++ default:
++ return buf[:len], nil
++ }
++ }
++ return encodeGrow(buffer, encode)
++}
++
++// TODO aconway 2015-09-14: Multi-section messages.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/message_test.go
----------------------------------------------------------------------
diff --cc amqp/message_test.go
index 0000000,0000000..7a6e5a8
new file mode 100644
--- /dev/null
+++ b/amqp/message_test.go
@@@ -1,0 -1,0 +1,166 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++import (
++ "testing"
++ "time"
++)
++
++func roundTrip(m Message) error {
++ buffer, err := m.Encode(nil)
++ if err != nil {
++ return err
++ }
++ m2, err := DecodeMessage(buffer)
++ if err != nil {
++ return err
++ }
++ return checkEqual(m, m2)
++}
++
++func TestDefaultMessage(t *testing.T) {
++ m := NewMessage()
++ // Check defaults
++ for _, data := range [][]interface{}{
++ {m.Inferred(), false},
++ {m.Durable(), false},
++ {m.Priority(), uint8(4)},
++ {m.TTL(), time.Duration(0)},
++ {m.UserId(), ""},
++ {m.Address(), ""},
++ {m.Subject(), ""},
++ {m.ReplyTo(), ""},
++ {m.ContentType(), ""},
++ {m.ContentEncoding(), ""},
++ {m.GroupId(), ""},
++ {m.GroupSequence(), int32(0)},
++ {m.ReplyToGroupId(), ""},
++ {m.MessageId(), nil},
++ {m.CorrelationId(), nil},
++ {m.Instructions(), map[string]interface{}{}},
++ {m.Annotations(), map[string]interface{}{}},
++ {m.Properties(), map[string]interface{}{}},
++ {m.Body(), nil},
++ } {
++ if err := checkEqual(data[0], data[1]); err != nil {
++ t.Error(err)
++ }
++ }
++ if err := roundTrip(m); err != nil {
++ t.Error(err)
++ }
++}
++
++func TestMessageRoundTrip(t *testing.T) {
++ m := NewMessage()
++ m.SetInferred(false)
++ m.SetDurable(true)
++ m.SetPriority(42)
++ m.SetTTL(0)
++ m.SetUserId("user")
++ m.SetAddress("address")
++ m.SetSubject("subject")
++ m.SetReplyTo("replyto")
++ m.SetContentType("content")
++ m.SetContentEncoding("encoding")
++ m.SetGroupId("group")
++ m.SetGroupSequence(42)
++ m.SetReplyToGroupId("replytogroup")
++ m.SetMessageId("id")
++ m.SetCorrelationId("correlation")
++ m.SetInstructions(map[string]interface{}{"instructions": "foo"})
++ m.SetAnnotations(map[string]interface{}{"annotations": "foo"})
++ m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"})
++ m.Marshal("hello")
++
++ for _, data := range [][]interface{}{
++ {m.Inferred(), false},
++ {m.Durable(), true},
++ {m.Priority(), uint8(42)},
++ {m.TTL(), time.Duration(0)},
++ {m.UserId(), "user"},
++ {m.Address(), "address"},
++ {m.Subject(), "subject"},
++ {m.ReplyTo(), "replyto"},
++ {m.ContentType(), "content"},
++ {m.ContentEncoding(), "encoding"},
++ {m.GroupId(), "group"},
++ {m.GroupSequence(), int32(42)},
++ {m.ReplyToGroupId(), "replytogroup"},
++ {m.MessageId(), "id"},
++ {m.CorrelationId(), "correlation"},
++ {m.Instructions(), map[string]interface{}{"instructions": "foo"}},
++ {m.Annotations(), map[string]interface{}{"annotations": "foo"}},
++ {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}},
++ {m.Body(), "hello"},
++ } {
++ if err := checkEqual(data[0], data[1]); err != nil {
++ t.Error(err)
++ }
++ }
++ if err := roundTrip(m); err != nil {
++ t.Error(err)
++ }
++}
++
++func TestMessageBodyTypes(t *testing.T) {
++ var s string
++ var body interface{}
++ var i int64
++
++ m := NewMessageWith(int64(42))
++ m.Unmarshal(&body)
++ m.Unmarshal(&i)
++ if err := checkEqual(body.(int64), int64(42)); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(i, int64(42)); err != nil {
++ t.Error(err)
++ }
++
++ m = NewMessageWith("hello")
++ m.Unmarshal(&s)
++ m.Unmarshal(&body)
++ if err := checkEqual(s, "hello"); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(body.(string), "hello"); err != nil {
++ t.Error(err)
++ }
++ if err := roundTrip(m); err != nil {
++ t.Error(err)
++ }
++
++ m = NewMessageWith(Binary("bin"))
++ m.Unmarshal(&s)
++ m.Unmarshal(&body)
++ if err := checkEqual(body.(Binary), Binary("bin")); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(s, "bin"); err != nil {
++ t.Error(err)
++ }
++ if err := roundTrip(m); err != nil {
++ t.Error(err)
++ }
++
++ // TODO aconway 2015-09-08: array etc.
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/types.go
----------------------------------------------------------------------
diff --cc amqp/types.go
index 0000000,0000000..796da66
new file mode 100644
--- /dev/null
+++ b/amqp/types.go
@@@ -1,0 -1,0 +1,199 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++// #include <proton/codec.h>
++import "C"
++
++import (
++ "bytes"
++ "fmt"
++ "reflect"
++ "time"
++ "unsafe"
++)
++
++type Type C.pn_type_t
++
++func (t Type) String() string {
++ switch C.pn_type_t(t) {
++ case C.PN_NULL:
++ return "null"
++ case C.PN_BOOL:
++ return "bool"
++ case C.PN_UBYTE:
++ return "ubyte"
++ case C.PN_BYTE:
++ return "byte"
++ case C.PN_USHORT:
++ return "ushort"
++ case C.PN_SHORT:
++ return "short"
++ case C.PN_CHAR:
++ return "char"
++ case C.PN_UINT:
++ return "uint"
++ case C.PN_INT:
++ return "int"
++ case C.PN_ULONG:
++ return "ulong"
++ case C.PN_LONG:
++ return "long"
++ case C.PN_TIMESTAMP:
++ return "timestamp"
++ case C.PN_FLOAT:
++ return "float"
++ case C.PN_DOUBLE:
++ return "double"
++ case C.PN_DECIMAL32:
++ return "decimal32"
++ case C.PN_DECIMAL64:
++ return "decimal64"
++ case C.PN_DECIMAL128:
++ return "decimal128"
++ case C.PN_UUID:
++ return "uuid"
++ case C.PN_BINARY:
++ return "binary"
++ case C.PN_STRING:
++ return "string"
++ case C.PN_SYMBOL:
++ return "symbol"
++ case C.PN_DESCRIBED:
++ return "described"
++ case C.PN_ARRAY:
++ return "array"
++ case C.PN_LIST:
++ return "list"
++ case C.PN_MAP:
++ return "map"
++ default:
++ if uint32(t) == uint32(C.PN_INVALID) {
++ return "no-data"
++ }
++ return fmt.Sprintf("unknown-type(%d)", t)
++ }
++}
++
++// Go types
++var (
++ bytesType = reflect.TypeOf([]byte{})
++ valueType = reflect.TypeOf(reflect.Value{})
++)
++
++// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
++
++// Map is a generic map that can have mixed key and value types and so can represent any AMQP map
++type Map map[interface{}]interface{}
++
++// List is a generic list that can hold mixed values and can represent any AMQP list.
++//
++type List []interface{}
++
++// Symbol is a string that is encoded as an AMQP symbol
++type Symbol string
++
++func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
++
++// Binary is a string that is encoded as an AMQP binary.
++// It is a string rather than a byte[] because byte[] is not hashable and can't be used as
++// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte
++type Binary string
++
++func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
++
++// GoString for Map prints values with their types, useful for debugging.
++func (m Map) GoString() string {
++ out := &bytes.Buffer{}
++ fmt.Fprintf(out, "%T{", m)
++ i := len(m)
++ for k, v := range m {
++ fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
++ i--
++ if i > 0 {
++ fmt.Fprint(out, ", ")
++ }
++ }
++ fmt.Fprint(out, "}")
++ return out.String()
++}
++
++// GoString for List prints values with their types, useful for debugging.
++func (l List) GoString() string {
++ out := &bytes.Buffer{}
++ fmt.Fprintf(out, "%T{", l)
++ for i := 0; i < len(l); i++ {
++ fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
++ if i == len(l)-1 {
++ fmt.Fprint(out, ", ")
++ }
++ }
++ fmt.Fprint(out, "}")
++ return out.String()
++}
++
++// pnTime converts Go time.Time to Proton millisecond Unix time.
++func pnTime(t time.Time) C.pn_timestamp_t {
++ secs := t.Unix()
++ // Note: sub-second accuracy is not guaraunteed if the Unix time in
++ // nanoseconds cannot be represented by an int64 (sometime around year 2260)
++ msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
++ return C.pn_timestamp_t(secs*1000 + msecs)
++}
++
++// goTime converts a pn_timestamp_t to a Go time.Time.
++func goTime(t C.pn_timestamp_t) time.Time {
++ secs := int64(t) / 1000
++ nsecs := (int64(t) % 1000) * int64(time.Millisecond)
++ return time.Unix(secs, nsecs)
++}
++
++func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
++ if cBytes.start != nil {
++ bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
++ }
++ return
++}
++
++func goString(cBytes C.pn_bytes_t) (str string) {
++ if cBytes.start != nil {
++ str = C.GoStringN(cBytes.start, C.int(cBytes.size))
++ }
++ return
++}
++
++func pnBytes(b []byte) C.pn_bytes_t {
++ if len(b) == 0 {
++ return C.pn_bytes_t{0, nil}
++ } else {
++ return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
++ }
++}
++
++func cPtr(b []byte) *C.char {
++ if len(b) == 0 {
++ return nil
++ }
++ return (*C.char)(unsafe.Pointer(&b[0]))
++}
++
++func cLen(b []byte) C.size_t {
++ return C.size_t(len(b))
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/unmarshal.go
----------------------------------------------------------------------
diff --cc amqp/unmarshal.go
index 0000000,0000000..751921d
new file mode 100644
--- /dev/null
+++ b/amqp/unmarshal.go
@@@ -1,0 -1,0 +1,558 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++oor more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++// #include <proton/codec.h>
++import "C"
++
++import (
++ "bytes"
++ "fmt"
++ "io"
++ "qpid.apache.org/internal"
++ "reflect"
++ "unsafe"
++)
++
++const minDecode = 1024
++
++// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
++type UnmarshalError struct {
++ // The name of the AMQP type.
++ AMQPType string
++ // The Go type.
++ GoType reflect.Type
++}
++
++func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
++ return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)}
++}
++
++func (e UnmarshalError) Error() string {
++ if e.GoType.Kind() != reflect.Ptr {
++ return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType)
++ } else {
++ return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
++ }
++}
++
++func doRecover(err *error) {
++ r := recover()
++ switch r := r.(type) {
++ case nil:
++ case *UnmarshalError, internal.Error:
++ *err = r.(error)
++ default:
++ panic(r)
++ }
++}
++
++//
++// Decoding from a pn_data_t
++//
++// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
++// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
++//
++
++// Decoder decodes AMQP values from an io.Reader.
++//
++type Decoder struct {
++ reader io.Reader
++ buffer bytes.Buffer
++}
++
++// NewDecoder returns a new decoder that reads from r.
++//
++// The decoder has it's own buffer and may read more data than required for the
++// AMQP values requested. Use Buffered to see if there is data left in the
++// buffer.
++//
++func NewDecoder(r io.Reader) *Decoder {
++ return &Decoder{r, bytes.Buffer{}}
++}
++
++// Buffered returns a reader of the data remaining in the Decoder's buffer. The
++// reader is valid until the next call to Decode.
++//
++func (d *Decoder) Buffered() io.Reader {
++ return bytes.NewReader(d.buffer.Bytes())
++}
++
++// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
++//
++// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
++//
++func (d *Decoder) Decode(v interface{}) (err error) {
++ defer doRecover(&err)
++ data := C.pn_data(0)
++ defer C.pn_data_free(data)
++ var n int
++ for n == 0 && err == nil {
++ n = decode(data, d.buffer.Bytes())
++ if n == 0 { // n == 0 means not enough data, read more
++ err = d.more()
++ } else {
++ unmarshal(v, data)
++ }
++ }
++ d.buffer.Next(n)
++ return
++}
++
++/*
++Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
++Types are converted as follows:
++
++ +---------------------------+----------------------------------------------------------------------+
++ |To Go types |From AMQP types |
++ +===========================+======================================================================+
++ |bool |bool |
++ +---------------------------+----------------------------------------------------------------------+
++ |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. |
++ |int32, int64 | |
++ +---------------------------+----------------------------------------------------------------------+
++ |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, |
++ |uint32, uint64 types |ulong |
++ +---------------------------+----------------------------------------------------------------------+
++ |float32, float64 |Equivalent or smaller float or double. |
++ +---------------------------+----------------------------------------------------------------------+
++ |string, []byte |string, symbol or binary. |
++ +---------------------------+----------------------------------------------------------------------+
++ |Symbol |symbol |
++ +---------------------------+----------------------------------------------------------------------+
++ |map[K]T |map, provided all keys and values can unmarshal to types K, T |
++ +---------------------------+----------------------------------------------------------------------+
++ |Map |map, any AMQP map |
++ +---------------------------+----------------------------------------------------------------------+
++ |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: |
++ | +------------------------+---------------------------------------------+
++ | |AMQP Type |Go Type in interface{} |
++ | +========================+=============================================+
++ | |bool |bool |
++ | +------------------------+---------------------------------------------+
++ | |byte,short,int,long |int8,int16,int32,int64 |
++ | +------------------------+---------------------------------------------+
++ | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 |
++ | +------------------------+---------------------------------------------+
++ | |float, double |float32, float64 |
++ | +------------------------+---------------------------------------------+
++ | |string |string |
++ | +------------------------+---------------------------------------------+
++ | |symbol |Symbol |
++ | +------------------------+---------------------------------------------+
++ | |binary |Binary |
++ | +------------------------+---------------------------------------------+
++ | |nulll |nil |
++ | +------------------------+---------------------------------------------+
++ | |map |Map |
++ | +------------------------+---------------------------------------------+
++ | |list |List |
++ +---------------------------+------------------------+---------------------------------------------+
++
++The following Go types cannot be unmarshaled: uintptr, function, interface, channel.
++
++TODO
++
++Go types: array, struct.
++
++AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies.
++
++AMQP maps with mixed/unhashable key types need an alternate representation.
++
++Described types.
++*/
++func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
++ defer doRecover(&err)
++
++ data := C.pn_data(0)
++ defer C.pn_data_free(data)
++ n = decode(data, bytes)
++ if n == 0 {
++ err = internal.Errorf("not enough data")
++ } else {
++ unmarshal(v, data)
++ }
++ return
++}
++
++// more reads more data when we can't parse a complete AMQP type
++func (d *Decoder) more() error {
++ var readSize int64 = minDecode
++ if int64(d.buffer.Len()) > readSize { // Grow by doubling
++ readSize = int64(d.buffer.Len())
++ }
++ var n int64
++ n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
++ if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
++ err = io.EOF
++ }
++ return err
++}
++
++// Unmarshal from data into value pointed at by v.
++func unmarshal(v interface{}, data *C.pn_data_t) {
++ pnType := C.pn_data_type(data)
++ switch v := v.(type) {
++ case *bool:
++ switch pnType {
++ case C.PN_BOOL:
++ *v = bool(C.pn_data_get_bool(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++ case *int8:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = int8(C.pn_data_get_char(data))
++ case C.PN_BYTE:
++ *v = int8(C.pn_data_get_byte(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++ case *uint8:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = uint8(C.pn_data_get_char(data))
++ case C.PN_UBYTE:
++ *v = uint8(C.pn_data_get_ubyte(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++ case *int16:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = int16(C.pn_data_get_char(data))
++ case C.PN_BYTE:
++ *v = int16(C.pn_data_get_byte(data))
++ case C.PN_SHORT:
++ *v = int16(C.pn_data_get_short(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++ case *uint16:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = uint16(C.pn_data_get_char(data))
++ case C.PN_UBYTE:
++ *v = uint16(C.pn_data_get_ubyte(data))
++ case C.PN_USHORT:
++ *v = uint16(C.pn_data_get_ushort(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++ case *int32:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = int32(C.pn_data_get_char(data))
++ case C.PN_BYTE:
++ *v = int32(C.pn_data_get_byte(data))
++ case C.PN_SHORT:
++ *v = int32(C.pn_data_get_short(data))
++ case C.PN_INT:
++ *v = int32(C.pn_data_get_int(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++ case *uint32:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = uint32(C.pn_data_get_char(data))
++ case C.PN_UBYTE:
++ *v = uint32(C.pn_data_get_ubyte(data))
++ case C.PN_USHORT:
++ *v = uint32(C.pn_data_get_ushort(data))
++ case C.PN_UINT:
++ *v = uint32(C.pn_data_get_uint(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *int64:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = int64(C.pn_data_get_char(data))
++ case C.PN_BYTE:
++ *v = int64(C.pn_data_get_byte(data))
++ case C.PN_SHORT:
++ *v = int64(C.pn_data_get_short(data))
++ case C.PN_INT:
++ *v = int64(C.pn_data_get_int(data))
++ case C.PN_LONG:
++ *v = int64(C.pn_data_get_long(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *uint64:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = uint64(C.pn_data_get_char(data))
++ case C.PN_UBYTE:
++ *v = uint64(C.pn_data_get_ubyte(data))
++ case C.PN_USHORT:
++ *v = uint64(C.pn_data_get_ushort(data))
++ case C.PN_ULONG:
++ *v = uint64(C.pn_data_get_ulong(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *int:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = int(C.pn_data_get_char(data))
++ case C.PN_BYTE:
++ *v = int(C.pn_data_get_byte(data))
++ case C.PN_SHORT:
++ *v = int(C.pn_data_get_short(data))
++ case C.PN_INT:
++ *v = int(C.pn_data_get_int(data))
++ case C.PN_LONG:
++ if unsafe.Sizeof(0) == 8 {
++ *v = int(C.pn_data_get_long(data))
++ } else {
++ panic(newUnmarshalError(pnType, v))
++ }
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *uint:
++ switch pnType {
++ case C.PN_CHAR:
++ *v = uint(C.pn_data_get_char(data))
++ case C.PN_UBYTE:
++ *v = uint(C.pn_data_get_ubyte(data))
++ case C.PN_USHORT:
++ *v = uint(C.pn_data_get_ushort(data))
++ case C.PN_UINT:
++ *v = uint(C.pn_data_get_uint(data))
++ case C.PN_ULONG:
++ if unsafe.Sizeof(0) == 8 {
++ *v = uint(C.pn_data_get_ulong(data))
++ } else {
++ panic(newUnmarshalError(pnType, v))
++ }
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *float32:
++ switch pnType {
++ case C.PN_FLOAT:
++ *v = float32(C.pn_data_get_float(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *float64:
++ switch pnType {
++ case C.PN_FLOAT:
++ *v = float64(C.pn_data_get_float(data))
++ case C.PN_DOUBLE:
++ *v = float64(C.pn_data_get_double(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *string:
++ switch pnType {
++ case C.PN_STRING:
++ *v = goString(C.pn_data_get_string(data))
++ case C.PN_SYMBOL:
++ *v = goString(C.pn_data_get_symbol(data))
++ case C.PN_BINARY:
++ *v = goString(C.pn_data_get_binary(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *[]byte:
++ switch pnType {
++ case C.PN_STRING:
++ *v = goBytes(C.pn_data_get_string(data))
++ case C.PN_SYMBOL:
++ *v = goBytes(C.pn_data_get_symbol(data))
++ case C.PN_BINARY:
++ *v = goBytes(C.pn_data_get_binary(data))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *Binary:
++ switch pnType {
++ case C.PN_BINARY:
++ *v = Binary(goBytes(C.pn_data_get_binary(data)))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *Symbol:
++ switch pnType {
++ case C.PN_SYMBOL:
++ *v = Symbol(goBytes(C.pn_data_get_symbol(data)))
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ case *interface{}:
++ getInterface(data, v)
++
++ default:
++ if reflect.TypeOf(v).Kind() != reflect.Ptr {
++ panic(newUnmarshalError(pnType, v))
++ }
++ switch reflect.TypeOf(v).Elem().Kind() {
++ case reflect.Map:
++ getMap(data, v)
++ case reflect.Slice:
++ getList(data, v)
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++ }
++ err := dataError("unmarshaling", data)
++ if err != nil {
++ panic(err)
++ }
++ return
++}
++
++func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
++ C.pn_data_rewind(data)
++ C.pn_data_next(data)
++ unmarshal(v, data)
++}
++
++// Getting into an interface is driven completely by the AMQP type, since the interface{}
++// target is type-neutral.
++func getInterface(data *C.pn_data_t, v *interface{}) {
++ pnType := C.pn_data_type(data)
++ switch pnType {
++ // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t
++ case C.PN_NULL, C.pn_type_t(C.PN_INVALID): // No data.
++ *v = nil
++ case C.PN_BOOL:
++ *v = bool(C.pn_data_get_bool(data))
++ case C.PN_UBYTE:
++ *v = uint8(C.pn_data_get_ubyte(data))
++ case C.PN_BYTE:
++ *v = int8(C.pn_data_get_byte(data))
++ case C.PN_USHORT:
++ *v = uint16(C.pn_data_get_ushort(data))
++ case C.PN_SHORT:
++ *v = int16(C.pn_data_get_short(data))
++ case C.PN_UINT:
++ *v = uint32(C.pn_data_get_uint(data))
++ case C.PN_INT:
++ *v = int32(C.pn_data_get_int(data))
++ case C.PN_CHAR:
++ *v = uint8(C.pn_data_get_char(data))
++ case C.PN_ULONG:
++ *v = uint64(C.pn_data_get_ulong(data))
++ case C.PN_LONG:
++ *v = int64(C.pn_data_get_long(data))
++ case C.PN_FLOAT:
++ *v = float32(C.pn_data_get_float(data))
++ case C.PN_DOUBLE:
++ *v = float64(C.pn_data_get_double(data))
++ case C.PN_BINARY:
++ *v = Binary(goBytes(C.pn_data_get_binary(data)))
++ case C.PN_STRING:
++ *v = goString(C.pn_data_get_string(data))
++ case C.PN_SYMBOL:
++ *v = Symbol(goString(C.pn_data_get_symbol(data)))
++ case C.PN_MAP:
++ m := make(Map)
++ unmarshal(&m, data)
++ *v = m
++ case C.PN_LIST:
++ l := make(List, 0)
++ unmarshal(&l, data)
++ *v = l
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++}
++
++// get into map pointed at by v
++func getMap(data *C.pn_data_t, v interface{}) {
++ mapValue := reflect.ValueOf(v).Elem()
++ mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
++ switch pnType := C.pn_data_type(data); pnType {
++ case C.PN_MAP:
++ count := int(C.pn_data_get_map(data))
++ if bool(C.pn_data_enter(data)) {
++ defer C.pn_data_exit(data)
++ for i := 0; i < count/2; i++ {
++ if bool(C.pn_data_next(data)) {
++ key := reflect.New(mapValue.Type().Key())
++ unmarshal(key.Interface(), data)
++ if bool(C.pn_data_next(data)) {
++ val := reflect.New(mapValue.Type().Elem())
++ unmarshal(val.Interface(), data)
++ mapValue.SetMapIndex(key.Elem(), val.Elem())
++ }
++ }
++ }
++ }
++ // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t
++ case C.pn_type_t(C.PN_INVALID): // Leave the map empty
++ default:
++ panic(newUnmarshalError(pnType, v))
++ }
++}
++
++func getList(data *C.pn_data_t, v interface{}) {
++ pnType := C.pn_data_type(data)
++ if pnType != C.PN_LIST {
++ panic(newUnmarshalError(pnType, v))
++ }
++ count := int(C.pn_data_get_list(data))
++ listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
++ if bool(C.pn_data_enter(data)) {
++ for i := 0; i < count; i++ {
++ if bool(C.pn_data_next(data)) {
++ val := reflect.New(listValue.Type().Elem())
++ unmarshal(val.Interface(), data)
++ listValue.Index(i).Set(val.Elem())
++ }
++ }
++ C.pn_data_exit(data)
++ }
++ reflect.ValueOf(v).Elem().Set(listValue)
++}
++
++// decode from bytes.
++// Return bytes decoded or 0 if we could not decode a complete object.
++//
++func decode(data *C.pn_data_t, bytes []byte) int {
++ if len(bytes) == 0 {
++ return 0
++ }
++ n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
++ if n == int(C.PN_UNDERFLOW) {
++ C.pn_error_clear(C.pn_data_error(data))
++ return 0
++ } else if n <= 0 {
++ panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n)))
++ }
++ return n
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/url.go
----------------------------------------------------------------------
diff --cc amqp/url.go
index 0000000,0000000..0d0c662
new file mode 100644
--- /dev/null
+++ b/amqp/url.go
@@@ -1,0 -1,0 +1,96 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++/*
++#include <stdlib.h>
++#include <string.h>
++#include <proton/url.h>
++
++// Helper function for setting URL fields.
++typedef void (*setter_fn)(pn_url_t* url, const char* value);
++inline void set(pn_url_t *url, setter_fn s, const char* value) {
++ s(url, value);
++}
++*/
++import "C"
++
++import (
++ "net"
++ "net/url"
++ "qpid.apache.org/internal"
++ "unsafe"
++)
++
++const (
++ amqp string = "amqp"
++ amqps = "amqps"
++)
++
++// ParseUrl parses an AMQP URL string and returns a net/url.Url.
++//
++// It is more forgiving than net/url.Parse and allows most of the parts of the
++// URL to be missing, assuming AMQP defaults.
++//
++func ParseURL(s string) (u *url.URL, err error) {
++ cstr := C.CString(s)
++ defer C.free(unsafe.Pointer(cstr))
++ pnUrl := C.pn_url_parse(cstr)
++ if pnUrl == nil {
++ return nil, internal.Errorf("bad URL %#v", s)
++ }
++ defer C.pn_url_free(pnUrl)
++
++ scheme := C.GoString(C.pn_url_get_scheme(pnUrl))
++ username := C.GoString(C.pn_url_get_username(pnUrl))
++ password := C.GoString(C.pn_url_get_password(pnUrl))
++ host := C.GoString(C.pn_url_get_host(pnUrl))
++ port := C.GoString(C.pn_url_get_port(pnUrl))
++ path := C.GoString(C.pn_url_get_path(pnUrl))
++
++ if err != nil {
++ return nil, internal.Errorf("bad URL %#v: %s", s, err)
++ }
++ if scheme == "" {
++ scheme = amqp
++ }
++ if port == "" {
++ if scheme == amqps {
++ port = amqps
++ } else {
++ port = amqp
++ }
++ }
++ var user *url.Userinfo
++ if password != "" {
++ user = url.UserPassword(username, password)
++ } else if username != "" {
++ user = url.User(username)
++ }
++
++ u = &url.URL{
++ Scheme: scheme,
++ User: user,
++ Host: net.JoinHostPort(host, port),
++ Path: path,
++ }
++
++ return u, nil
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/amqp/url_test.go
----------------------------------------------------------------------
diff --cc amqp/url_test.go
index 0000000,0000000..f80f1c4
new file mode 100644
--- /dev/null
+++ b/amqp/url_test.go
@@@ -1,0 -1,0 +1,51 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++import (
++ "fmt"
++)
++
++func ExampleParseURL() {
++ for _, s := range []string{
++ "amqp://username:password@host:1234/path",
++ "host:1234",
++ "host",
++ ":1234",
++ "host/path",
++ "amqps://host",
++ "",
++ } {
++ u, err := ParseURL(s)
++ if err != nil {
++ fmt.Println(err)
++ } else {
++ fmt.Println(u)
++ }
++ }
++ // Output:
++ // amqp://username:password@host:1234/path
++ // amqp://host:1234
++ // amqp://host:amqp
++ // amqp://:1234
++ // amqp://host:amqp/path
++ // amqps://host:amqps
++ // proton: bad URL ""
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index 0000000,0000000..d6761d6
new file mode 100644
--- /dev/null
+++ b/electron/connection.go
@@@ -1,0 -1,0 +1,218 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++// #include <proton/disposition.h>
++import "C"
++
++import (
++ "net"
++ "qpid.apache.org/amqp"
++ "qpid.apache.org/internal"
++ "qpid.apache.org/proton"
++ "sync"
++ "time"
++)
++
++// Connection is an AMQP connection, created by a Container.
++type Connection interface {
++ Endpoint
++
++ // Sender opens a new sender on the DefaultSession.
++ //
++ // v can be a string, which is used as the Target address, or a SenderSettings
++ // struct containing more details settings.
++ Sender(...LinkSetting) (Sender, error)
++
++ // Receiver opens a new Receiver on the DefaultSession().
++ //
++ // v can be a string, which is used as the
++ // Source address, or a ReceiverSettings struct containing more details
++ // settings.
++ Receiver(...LinkSetting) (Receiver, error)
++
++ // DefaultSession() returns a default session for the connection. It is opened
++ // on the first call to DefaultSession and returned on subsequent calls.
++ DefaultSession() (Session, error)
++
++ // Session opens a new session.
++ Session(...SessionSetting) (Session, error)
++
++ // Container for the connection.
++ Container() Container
++
++ // Disconnect the connection abruptly with an error.
++ Disconnect(error)
++
++ // Wait waits for the connection to be disconnected.
++ Wait() error
++
++ // WaitTimeout is like Wait but returns Timeout if the timeout expires.
++ WaitTimeout(time.Duration) error
++}
++
++// ConnectionSetting can be passed when creating a connection.
++// See functions that return ConnectionSetting for details
++type ConnectionSetting func(*connection)
++
++// Server setting puts the connection in server mode.
++//
++// A server connection will do protocol negotiation to accept a incoming AMQP
++// connection. Normally you would call this for a connection created by
++// net.Listener.Accept()
++//
++func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } }
++
++// Accepter provides a function to be called when a connection receives an incoming
++// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver.
++//
++// The accept() function must not block or use the accepted endpoint.
++// It can pass the endpoint to another goroutine for processing.
++//
++// By default all incoming endpoints are rejected.
++func Accepter(accept func(Incoming)) ConnectionSetting {
++ return func(c *connection) { c.accept = accept }
++}
++
++type connection struct {
++ endpoint
++ listenOnce, defaultSessionOnce, closeOnce sync.Once
++
++ container *container
++ conn net.Conn
++ accept func(Incoming)
++ handler *handler
++ engine *proton.Engine
++ err internal.ErrorHolder
++ eConnection proton.Connection
++
++ defaultSession Session
++ done chan struct{}
++}
++
++func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) {
++ c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})}
++ c.handler = newHandler(c)
++ var err error
++ c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
++ if err != nil {
++ return nil, err
++ }
++ for _, set := range setting {
++ set(c)
++ }
++ c.str = c.engine.String()
++ c.eConnection = c.engine.Connection()
++ go func() { c.engine.Run(); close(c.done) }()
++ return c, nil
++}
++
++func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
++
++func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
++
++func (c *connection) Session(setting ...SessionSetting) (Session, error) {
++ var s Session
++ err := c.engine.InjectWait(func() error {
++ eSession, err := c.engine.Connection().Session()
++ if err == nil {
++ eSession.Open()
++ if err == nil {
++ s = newSession(c, eSession, setting...)
++ }
++ }
++ return err
++ })
++ return s, err
++}
++
++func (c *connection) Container() Container { return c.container }
++
++func (c *connection) DefaultSession() (s Session, err error) {
++ c.defaultSessionOnce.Do(func() {
++ c.defaultSession, err = c.Session()
++ })
++ if err == nil {
++ err = c.Error()
++ }
++ return c.defaultSession, err
++}
++
++func (c *connection) Sender(setting ...LinkSetting) (Sender, error) {
++ if s, err := c.DefaultSession(); err == nil {
++ return s.Sender(setting...)
++ } else {
++ return nil, err
++ }
++}
++
++func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) {
++ if s, err := c.DefaultSession(); err == nil {
++ return s.Receiver(setting...)
++ } else {
++ return nil, err
++ }
++}
++
++func (c *connection) Connection() Connection { return c }
++
++func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
++func (c *connection) WaitTimeout(timeout time.Duration) error {
++ _, err := timedReceive(c.done, timeout)
++ if err == Timeout {
++ return Timeout
++ }
++ return c.Error()
++}
++
++// Incoming is the interface for incoming requests to open an endpoint.
++// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
++type Incoming interface {
++ // Accept the endpoint with default settings.
++ //
++ // You must not use the returned endpoint in the accept() function that
++ // receives the Incoming value, but you can pass it to other goroutines.
++ //
++ // Implementing types provide type-specific Accept functions that take additional settings.
++ Accept() Endpoint
++
++ // Reject the endpoint with an error
++ Reject(error)
++
++ error() error
++}
++
++type incoming struct {
++ err error
++ accepted bool
++}
++
++func (i *incoming) Reject(err error) { i.err = err }
++
++func (i *incoming) error() error {
++ switch {
++ case i.err != nil:
++ return i.err
++ case !i.accepted:
++ return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
++ default:
++ return nil
++ }
++}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org