Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/02 02:54:10 UTC
[50/50] [abbrv] qpid-proton git commit: PROTON-1338: Go: update `go get`
PROTON-1338: Go: update `go get`
Merge branch 'master' into go1 to update the published `go get` version of the
Go packages. The new packages are tested with C library versions 0.10 - 0.15.0
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/287eeaca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/287eeaca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/287eeaca
Branch: refs/heads/go1
Commit: 287eeacafbfc166529108b9f6d1bf8e839c9f9be
Parents: 5e6024b 7b6b8de
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Nov 1 22:48:55 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Nov 1 22:48:55 2016 -0400
----------------------------------------------------------------------
README.md | 5 +-
amqp/error.go | 28 +-
amqp/interop_test.go | 32 +-
amqp/marshal.go | 2 +-
amqp/message.go | 4 +-
amqp/url.go | 118 +++----
amqp/url_test.go | 14 +-
amqp/version.go | 29 ++
electron/auth_test.go | 124 ++++++++
electron/connection.go | 287 +++++++++++++----
electron/container.go | 49 ++-
electron/doc.go | 43 ++-
electron/electron_test.go | 546 +++++++++++++++++++++++++++++++++
electron/endpoint.go | 122 ++++++--
electron/ex_client_server_test.go | 81 +++++
electron/handler.go | 86 +++---
electron/link.go | 62 ++--
electron/messaging_test.go | 454 ---------------------------
electron/receiver.go | 75 +++--
electron/sender.go | 32 +-
electron/session.go | 47 +--
proton/engine.go | 377 ++++++++++++-----------
proton/handlers.go | 41 ++-
proton/proton_test.go | 73 ++++-
proton/wrappers.go | 69 ++++-
proton/wrappers_gen.go | 88 +++++-
26 files changed, 1873 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/README.md
----------------------------------------------------------------------
diff --cc README.md
index 4b2da12,9f95939..ffd67f8
--- a/README.md
+++ b/README.md
@@@ -1,96 -1,44 +1,97 @@@
-Qpid Proton - AMQP messaging toolkit
-====================================
-
-Linux Build | Windows Build
-------------|--------------
-[![Linux Build Status](https://travis-ci.org/apache/qpid-proton.svg?branch=master)](https://travis-ci.org/apache/qpid-proton) | [![Windows Build Status](https://ci.appveyor.com/api/projects/status/github/apache/qpid-proton?branch=master&svg=true)](https://ci.appveyor.com/project/ke4qqq/qpid-proton/branch/master)
-
-Qpid Proton is a high-performance, lightweight messaging library. It can be
-used in the widest range of messaging applications, including brokers, client
-libraries, routers, bridges, proxies, and more. Proton makes it trivial to
-integrate with the AMQP 1.0 ecosystem from any platform, environment, or
-language
-
-Features
---------
-
- + A flexible and capable reactive messaging API
- + Full control of AMQP 1.0 protocol semantics
- + Portable C implementation with bindings to popular languages
- + Pure-Java and pure-JavaScript implementations
- + Peer-to-peer and brokered messaging
- + Secure communication via SSL and SASL
-
-Universal - Proton is designed to scale both up and down. Equally suitable for
-simple clients or high-powered servers, it can be deployed in simple
-peer-to-peer configurations or as part of a global federated messaging network.
-
-Embeddable - Proton is carefully written to be portable and cross platform. It
-has minimal dependencies, and it is architected to be usable with any threading
-model, as well as with non-threaded applications. These features make it
-uniquely suited for embedding messaging capabilities into existing software.
-
-Standard - Built around the AMQP 1.0 messaging standard, Proton is not only
-ideal for building out your own messaging applications but also for connecting
-them to the broader ecosystem of AMQP 1.0-based messaging applications.
-
-Getting Started
----------------
-
-See the included INSTALL file for build and install instructions and the
-DEVELOPERS file for information on how to modify and test the library code
-itself.
-
-Please see http://qpid.apache.org/proton for a more info.
+# Qpid Go packages for AMQP
+
+These packages provide [Go](http://golang.org) support for sending and receiving
+AMQP messages in client or server applications. Reference documentation is
+available at: <http://godoc.org/?q=qpid.apache.org>
+
+There are 3 packages:
+
+[qpid.apache.org/amqp](http://godoc.org/qpid.apache.org/amqp) provides functions
+to convert AMQP messages and data types to and from Go data types. Used by both
+the proton and electron packages to manage AMQP data.
+
+[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a
+simple, concurrent-safe API for sending and receiving messages. It can be used
+with goroutines and channels to build concurrent AMQP clients and servers.
+
+[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an
+event-driven, concurrent-unsafe package that closely follows the proton C
- API. Most Go programmers will find the electron package easier to use.
++API. Most Go programmers will find the
++[electron](http://godoc.org/qpid.apache.org/electron) package easier to use.
+
- There are [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
++See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+to help you get started.
+
+Feedback is encouraged at:
+
+- Email <pr...@qpid.apache.org>
+- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
+
+### Why two APIs?
+
+The `proton` API is a direct mapping of the proton C library into Go. It is
+usable but not very natural for a Go programmer because it takes an
+*event-driven* approach and has no built-in support for concurrent
+use. `electron` uses `proton` internally but provides a more Go-like API that is
+safe to use from multiple concurrent goroutines.
+
+Go encourages programs to be structured as concurrent *goroutines* that
+communicate via *channels*. Go literature distinguishes between:
+
+- *concurrency*: "keeping track of things that could be done in parallel"
+- *parallelism*: "actually doing things in parallel on multiple CPUs or cores"
+
+A Go program expresses concurrency by starting goroutines for potentially
+concurrent tasks. The Go runtime schedules the activity of goroutines onto a
+small number (possibly one) of actual parallel executions.
+
+Even with no hardware parallelism, goroutine concurrency lets the Go runtime
+order unpredictable events like file descriptors being readable/writable,
+channels having data, timers firing etc. Go automatically takes care of
+switching out goroutines that block or sleep so it is normal to write code in
+terms of blocking calls.
+
+By contrast, event-driven programming is based on polling mechanisms like
+`select`, `poll` or `epoll`. These also dispatch unpredictably ordered events to
+a single thread or a small thread pool. However this requires a different style
+of programming: "event-driven" or "reactive" programming. Go developers call it
+"inside-out" programming. In an event-driven program blocking is a big problem
+as it consumes a scarce thread of execution, so actions that take time to
+complete have to be re-structured in terms of multiple events.
+
+The promise of Go is that you can express your program in concurrent, sequential
+terms and the Go runtime will turn it inside-out for you. You can start
+goroutines for all concurrent activities. They can loop forever or block for as
+long as they need waiting for timers, IO or any unpredictable event. Go will
+interleave and schedule them efficiently onto the available parallel hardware.
+
+For example: in the `electron` API, you can send a message and wait for it to be
+acknowledged in a single function. All the information about the message, why
+you sent it, and what to do when it is acknowledged can be held in local
+variables, all the code is in a simple sequence. Other goroutines in your
+program can be sending and receiving messages concurrently, they are not
+blocked.
+
+In the `proton` API, an event handler that sends a message must return
+*immediately*, it cannot block the event loop to wait for
+acknowledgement. Acknowledgement is a separate event, so the code for handling
+it is in a different event handler. Context information about the message has to
+be stored in some non-local variable that both functions can find. This makes
+the code harder to follow.
+
+The `proton` API is important because it is the foundation for the `electron`
+API, and may be useful for programs that need to be close to the original C
+library for some reason. However the `electron` API hides the event-driven
+details behind simple, sequential, concurrent-safe methods that can be called
+from arbitrary goroutines. Under the covers, data is passed through channels to
+dedicated `proton` goroutines so user goroutines can work concurrently with the
+proton event-loop.
+
+## New to Go?
+
+If you are new to Go then these are a good place to start:
+
+- [A Tour of Go](http://tour.golang.org)
+- [Effective Go](http://golang.org/doc/effective_go.html)
+
+Then look at the tools and docs at <http://golang.org> as you need them.
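
The sequential style described under "Why two APIs?" in the README above can be sketched with the standard library alone. This is an illustration only and does not use the `electron` or `proton` packages: `delivery`, `peer` and `sendSync` are hypothetical names, and the "remote peer" is just a goroutine that acknowledges whatever it receives. It shows a sender blocking for an acknowledgement while keeping all of its context in local variables.

```go
package main

import (
	"fmt"
	"sync"
)

// delivery is a hypothetical stand-in for a message in flight. The ack
// channel is signalled by the "peer" once the message has been accepted.
type delivery struct {
	body string
	ack  chan string
}

// peer simulates a remote endpoint: it acknowledges each delivery it receives.
func peer(in <-chan delivery) {
	for d := range in {
		d.ack <- "accepted: " + d.body
	}
}

// sendSync sends one message and blocks until it is acknowledged.
// All context (the body, the ack channel) lives in local variables.
func sendSync(out chan<- delivery, body string) string {
	d := delivery{body: body, ack: make(chan string, 1)}
	out <- d
	return <-d.ack
}

func main() {
	out := make(chan delivery)
	go peer(out)

	var wg sync.WaitGroup
	results := make([]string, 3)
	for i := range results {
		wg.Add(1)
		go func(i int) { // each sender blocks only its own goroutine
			defer wg.Done()
			results[i] = sendSync(out, fmt.Sprintf("m%d", i))
		}(i)
	}
	wg.Wait()
	close(out)
	for _, r := range results {
		fmt.Println(r)
	}
}
```

In an event-driven design the body of `sendSync` would be split across two handlers, one to send and one to react to the acknowledgement, with the shared context kept in some non-local structure.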
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/amqp/error.go
----------------------------------------------------------------------
diff --cc amqp/error.go
index 349fc41,0000000..3a178b2
mode 100644,000000..100644
--- a/amqp/error.go
+++ b/amqp/error.go
@@@ -1,103 -1,0 +1,103 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/error.h>
+import "C"
+
+import (
+ "fmt"
+ "reflect"
+)
+
+// Error is an AMQP error condition. It has a name and a description.
+// It implements the Go error interface so can be returned as an error value.
+//
- // You can pass amqp.Error to methods that pass an error to a remote endpoint,
++// You can pass amqp.Error to methods that send an error to a remote endpoint,
+// this gives you full control over what the remote endpoint will see.
+//
+// You can also pass any Go error to such functions, the remote peer
+// will see the equivalent of MakeError(error)
+//
+type Error struct{ Name, Description string }
+
+// Error implements the Go error interface for AMQP errors.
+func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, c.Description) }
+
+// Errorf makes a Error with name and formatted description as per fmt.Sprintf
+func Errorf(name, format string, arg ...interface{}) Error {
+ return Error{name, fmt.Sprintf(format, arg...)}
+}
+
+// MakeError makes an AMQP error from a go error using the Go error type as the name
+// and the err.Error() string as the description.
+func MakeError(err error) Error {
+ return Error{reflect.TypeOf(err).Name(), err.Error()}
+}
+
+var (
- InternalError = "amqp:internal-error"
- NotFound = "amqp:not-found"
- UnauthorizedAccess = "amqp:unauthorized-access"
- DecodeError = "amqp:decode-error"
- ResourceLimit = "amqp:resource-limit"
- NotAllowed = "amqp:not-allowed"
- InvalidField = "amqp:invalid-field"
- NotImplemented = "amqp:not-implemented"
- ResourceLocked = "amqp:resource-locked"
- PreerrorFailed = "amqp:preerror-failed"
- ResourceDeleted = "amqp:resource-deleted"
- IllegalState = "amqp:illegal-state"
- FrameSizeTooSmall = "amqp:frame-size-too-small"
++ InternalError = "amqp:internal-error"
++ NotFound = "amqp:not-found"
++ UnauthorizedAccess = "amqp:unauthorized-access"
++ DecodeError = "amqp:decode-error"
++ ResourceLimitExceeded = "amqp:resource-limit-exceeded"
++ NotAllowed = "amqp:not-allowed"
++ InvalidField = "amqp:invalid-field"
++ NotImplemented = "amqp:not-implemented"
++ ResourceLocked = "amqp:resource-locked"
++ PreconditionFailed = "amqp:precondition-failed"
++ ResourceDeleted = "amqp:resource-deleted"
++ IllegalState = "amqp:illegal-state"
++ FrameSizeTooSmall = "amqp:frame-size-too-small"
+)
+
+type PnErrorCode int
+
+func (e PnErrorCode) String() string {
+ switch e {
+ case C.PN_EOS:
+ return "end-of-data"
+ case C.PN_ERR:
+ return "error"
+ case C.PN_OVERFLOW:
+ return "overflow"
+ case C.PN_UNDERFLOW:
+ return "underflow"
+ case C.PN_STATE_ERR:
+ return "bad-state"
+ case C.PN_ARG_ERR:
+ return "invalid-argument"
+ case C.PN_TIMEOUT:
+ return "timeout"
+ case C.PN_INTR:
+ return "interrupted"
+ case C.PN_INPROGRESS:
+ return "in-progress"
+ default:
+ return fmt.Sprintf("unknown-error(%d)", e)
+ }
+}
+
+func PnError(e *C.pn_error_t) error {
+ if e == nil || C.pn_error_code(e) == 0 {
+ return nil
+ }
+ return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/amqp/interop_test.go
----------------------------------------------------------------------
diff --cc amqp/interop_test.go
index b36ef64,0000000..b3e27bc
mode 100644,000000..100644
--- a/amqp/interop_test.go
+++ b/amqp/interop_test.go
@@@ -1,381 -1,0 +1,385 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Test that conversion of Go type to/from AMQP is compatible with other
+// bindings.
+//
+package amqp
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "reflect"
+ "strings"
+ "testing"
+)
+
+func checkEqual(want interface{}, got interface{}) error {
+ if !reflect.DeepEqual(want, got) {
+ return fmt.Errorf("%#v != %#v", want, got)
+ }
+ return nil
+}
+
- func getReader(name string) (r io.Reader) {
- r, err := os.Open("interop/" + name + ".amqp")
++func getReader(t *testing.T, name string) (r io.Reader) {
++ dir := os.Getenv("PN_INTEROP_DIR")
++ if dir == "" {
++ t.Skip("no PN_INTEROP_DIR in environment")
++ }
++ r, err := os.Open(dir + "/" + name + ".amqp")
+ if err != nil {
- panic(fmt.Errorf("Can't open %#v: %v", name, err))
++ t.Fatalf("can't open %#v: %v", name, err)
+ }
+ return
+}
+
+func remaining(d *Decoder) string {
+ remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
+ return string(remainder)
+}
+
+// checkDecode: want is the expected value, gotPtr is a pointer to an
+// instance of the same type for Decode.
+func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
+
+ if err := d.Decode(gotPtr); err != nil {
+ t.Error("Decode failed", err)
+ return
+ }
+ got := reflect.ValueOf(gotPtr).Elem().Interface()
+ if err := checkEqual(want, got); err != nil {
+ t.Error("Decode bad value:", err)
+ return
+ }
+
+ // Try round trip encoding
+ bytes, err := Marshal(want, nil)
+ if err != nil {
+ t.Error("Marshal failed", err)
+ return
+ }
+ n, err := Unmarshal(bytes, gotPtr)
+ if err != nil {
+ t.Error("Unmarshal failed", err)
+ return
+ }
+ if err := checkEqual(n, len(bytes)); err != nil {
+ t.Error("Bad unmarshal length", err)
+ return
+ }
+ got = reflect.ValueOf(gotPtr).Elem().Interface()
+ if err = checkEqual(want, got); err != nil {
+ t.Error("Bad unmarshal value", err)
+ return
+ }
+}
+
+func TestUnmarshal(t *testing.T) {
- bytes, err := ioutil.ReadAll(getReader("strings"))
++ bytes, err := ioutil.ReadAll(getReader(t, "strings"))
+ if err != nil {
+ t.Error(err)
+ }
+ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+ var got string
+ n, err := Unmarshal(bytes, &got)
+ if err != nil {
+ t.Error(err)
+ }
+ if want != got {
+ t.Errorf("%#v != %#v", want, got)
+ }
+ bytes = bytes[n:]
+ }
+}
+
+func TestPrimitivesExact(t *testing.T) {
- d := NewDecoder(getReader("primitives"))
++ d := NewDecoder(getReader(t, "primitives"))
+ // Decoding into exact types
+ var b bool
+ checkDecode(d, true, &b, t)
+ checkDecode(d, false, &b, t)
+ var u8 uint8
+ checkDecode(d, uint8(42), &u8, t)
+ var u16 uint16
+ checkDecode(d, uint16(42), &u16, t)
+ var i16 int16
+ checkDecode(d, int16(-42), &i16, t)
+ var u32 uint32
+ checkDecode(d, uint32(12345), &u32, t)
+ var i32 int32
+ checkDecode(d, int32(-12345), &i32, t)
+ var u64 uint64
+ checkDecode(d, uint64(12345), &u64, t)
+ var i64 int64
+ checkDecode(d, int64(-12345), &i64, t)
+ var f32 float32
+ checkDecode(d, float32(0.125), &f32, t)
+ var f64 float64
+ checkDecode(d, float64(0.125), &f64, t)
+}
+
+func TestPrimitivesCompatible(t *testing.T) {
- d := NewDecoder(getReader("primitives"))
++ d := NewDecoder(getReader(t, "primitives"))
+ // Decoding into compatible types
+ var b bool
+ var i int
+ var u uint
+ var f float64
+ checkDecode(d, true, &b, t)
+ checkDecode(d, false, &b, t)
+ checkDecode(d, uint(42), &u, t)
+ checkDecode(d, uint(42), &u, t)
+ checkDecode(d, -42, &i, t)
+ checkDecode(d, uint(12345), &u, t)
+ checkDecode(d, -12345, &i, t)
+ checkDecode(d, uint(12345), &u, t)
+ checkDecode(d, -12345, &i, t)
+ checkDecode(d, 0.125, &f, t)
+ checkDecode(d, 0.125, &f, t)
+}
+
+// checkDecodeInterface: want is the expected value, decode into an interface{}
+func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
+
+ var got, got2 interface{}
+ if err := d.Decode(&got); err != nil {
+ t.Error("Decode failed", err)
+ return
+ }
+ if err := checkEqual(want, got); err != nil {
+ t.Error(err)
+ return
+ }
+ // Try round trip encoding
+ bytes, err := Marshal(got, nil)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ n, err := Unmarshal(bytes, &got2)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if err := checkEqual(n, len(bytes)); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := checkEqual(want, got2); err != nil {
+ t.Error(err)
+ return
+ }
+}
+
+func TestPrimitivesInterface(t *testing.T) {
- d := NewDecoder(getReader("primitives"))
++ d := NewDecoder(getReader(t, "primitives"))
+ checkDecodeInterface(d, true, t)
+ checkDecodeInterface(d, false, t)
+ checkDecodeInterface(d, uint8(42), t)
+ checkDecodeInterface(d, uint16(42), t)
+ checkDecodeInterface(d, int16(-42), t)
+ checkDecodeInterface(d, uint32(12345), t)
+ checkDecodeInterface(d, int32(-12345), t)
+ checkDecodeInterface(d, uint64(12345), t)
+ checkDecodeInterface(d, int64(-12345), t)
+ checkDecodeInterface(d, float32(0.125), t)
+ checkDecodeInterface(d, float64(0.125), t)
+}
+
+func TestStrings(t *testing.T) {
- d := NewDecoder(getReader("strings"))
++ d := NewDecoder(getReader(t, "strings"))
+ // Test decoding as plain Go strings
+ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+ var got string
+ checkDecode(d, want, &got, t)
+ }
+ remains := remaining(d)
+ if remains != "" {
+ t.Errorf("leftover: %s", remains)
+ }
+
+ // Test decoding as specific string types
- d = NewDecoder(getReader("strings"))
++ d = NewDecoder(getReader(t, "strings"))
+ var bytes []byte
+ var str, sym string
+ checkDecode(d, []byte("abc\000defg"), &bytes, t)
+ checkDecode(d, "abcdefg", &str, t)
+ checkDecode(d, "abcdefg", &sym, t)
+ checkDecode(d, make([]byte, 0), &bytes, t)
+ checkDecode(d, "", &str, t)
+ checkDecode(d, "", &sym, t)
+ remains = remaining(d)
+ if remains != "" {
+ t.Fatalf("leftover: %s", remains)
+ }
+
+ // Test some error handling
- d = NewDecoder(getReader("strings"))
++ d = NewDecoder(getReader(t, "strings"))
+ var s string
+ err := d.Decode(s)
+ if err == nil {
+ t.Fatal("Expected error")
+ }
+ if !strings.Contains(err.Error(), "not a pointer") {
+ t.Error(err)
+ }
+ var i int
+ err = d.Decode(&i)
+ if !strings.Contains(err.Error(), "cannot unmarshal") {
+ t.Error(err)
+ }
+ _, err = Unmarshal([]byte{}, nil)
+ if !strings.Contains(err.Error(), "not enough data") {
+ t.Error(err)
+ }
+ _, err = Unmarshal([]byte("foobar"), nil)
+ if !strings.Contains(err.Error(), "invalid-argument") {
+ t.Error(err)
+ }
+}
+
+func TestEncodeDecode(t *testing.T) {
+ type data struct {
+ s string
+ i int
+ u8 uint8
+ b bool
+ f float32
+ v interface{}
+ }
+
+ in := data{"foo", 42, 9, true, 1.234, "thing"}
+
+ buf := bytes.Buffer{}
+ e := NewEncoder(&buf)
+ if err := e.Encode(in.s); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.i); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.u8); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.b); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.f); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.v); err != nil {
+ t.Error(err)
+ }
+
+ var out data
+ d := NewDecoder(&buf)
+ if err := d.Decode(&out.s); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.i); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.u8); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.b); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.f); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.v); err != nil {
+ t.Error(err)
+ }
+
+ if err := checkEqual(in, out); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestMap(t *testing.T) {
- d := NewDecoder(getReader("maps"))
++ d := NewDecoder(getReader(t, "maps"))
+
+ // Generic map
+ var m Map
+ checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t)
+
+ // Interface as map
+ var i interface{}
+ checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t)
+
- d = NewDecoder(getReader("maps"))
++ d = NewDecoder(getReader(t, "maps"))
+ // Specific typed map
+ var m2 map[string]int
+ checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t)
+
+ // Nested map
+ m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
+ bytes, err := Marshal(m, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = Unmarshal(bytes, &i)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = checkEqual(m, i); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestList(t *testing.T) {
- d := NewDecoder(getReader("lists"))
++ d := NewDecoder(getReader(t, "lists"))
+ var l List
+ checkDecode(d, List{int32(32), "foo", true}, &l, t)
+ checkDecode(d, List{}, &l, t)
+}
+
+// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded
+// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
+func TODO_TestMessage(t *testing.T) {
- bytes, err := ioutil.ReadAll(getReader("message"))
++ bytes, err := ioutil.ReadAll(getReader(t, "message"))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m, err := DecodeMessage(bytes)
+ if err != nil {
+ t.Fatal(err)
+ } else {
+ if err := checkEqual(m.Body(), "hello"); err != nil {
+ t.Error(err)
+ }
+ }
+
+ m2 := NewMessageWith("hello")
+ bytes2, err := m2.Encode(nil)
+ if err != nil {
+ t.Error(err)
+ } else {
+ if err = checkEqual(bytes, bytes2); err != nil {
+ t.Error(err)
+ }
+ }
+}
+
+// TODO aconway 2015-03-13: finish the full interop test
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index 66e14d8,0000000..3b4a59e
mode 100644,000000..100644
--- a/amqp/marshal.go
+++ b/amqp/marshal.go
@@@ -1,250 -1,0 +1,250 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+//#include "codec_shim.h"
+import "C"
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+ "unsafe"
+)
+
+func dataError(prefix string, data *C.pn_data_t) error {
+ err := PnError(C.pn_data_error(data))
+ if err != nil {
+ err = fmt.Errorf("%s: %s", prefix, err.Error())
+ }
+ return err
+}
+
+/*
+Marshal encodes a Go value as AMQP data in buffer.
+If buffer is nil, or is not large enough, a new buffer is created.
+
+Returns the buffer used for encoding with len() adjusted to the actual size of data.
+
+Go types are encoded as follows
+
+ +-------------------------------------+--------------------------------------------+
+ |Go type |AMQP type |
+ +-------------------------------------+--------------------------------------------+
+ |bool |bool |
+ +-------------------------------------+--------------------------------------------+
+ |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
+ +-------------------------------------+--------------------------------------------+
+ |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
+ +-------------------------------------+--------------------------------------------+
+ |float32, float64 |float, double. |
+ +-------------------------------------+--------------------------------------------+
+ |string |string |
+ +-------------------------------------+--------------------------------------------+
+ |[]byte, Binary |binary |
+ +-------------------------------------+--------------------------------------------+
+ |Symbol |symbol |
+ +-------------------------------------+--------------------------------------------+
+ |interface{} |the contained type |
+ +-------------------------------------+--------------------------------------------+
+ |nil |null |
+ +-------------------------------------+--------------------------------------------+
+ |map[K]T |map with K and T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |Map |map, may have mixed types for keys, values |
+ +-------------------------------------+--------------------------------------------+
+ |[]T |list with T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |List |list, may have mixed types values |
+ +-------------------------------------+--------------------------------------------+
+
+The following Go types cannot be marshaled: uintptr, function, interface, channel
+
+TODO
+
+Go types: array, slice, struct, complex64/128.
+
+AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
+
+Described types.
+
+*/
+func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
+ defer doRecover(&err)
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ marshal(v, data)
+ encode := func(buf []byte) ([]byte, error) {
+ n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
+ switch {
+ case n == int(C.PN_OVERFLOW):
+ return buf, overflow
+ case n < 0:
+ return buf, dataError("marshal error", data)
+ default:
+ return buf[:n], nil
+ }
+ }
+ return encodeGrow(buffer, encode)
+}
+
+const minEncode = 256
+
+// overflow is returned when an encoding function can't fit data in the buffer.
+var overflow = fmt.Errorf("buffer too small")
+
+// encodeFn encodes into buffer[0:len(buffer)].
+// Returns buffer with length adjusted for data encoded.
+// If buffer too small, returns overflow as error.
+type encodeFn func(buffer []byte) ([]byte, error)
+
+// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
+// Returns the final buffer.
+func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
+ if buffer == nil || len(buffer) == 0 {
+ buffer = make([]byte, minEncode)
+ }
+ var err error
+ for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
+ buffer = make([]byte, 2*len(buffer))
+ }
+ return buffer, err
+}
+
+func marshal(v interface{}, data *C.pn_data_t) {
+ switch v := v.(type) {
+ case nil:
+ C.pn_data_put_null(data)
+ case bool:
+ C.pn_data_put_bool(data, C.bool(v))
+ case int8:
+ C.pn_data_put_byte(data, C.int8_t(v))
+ case int16:
+ C.pn_data_put_short(data, C.int16_t(v))
+ case int32:
+ C.pn_data_put_int(data, C.int32_t(v))
+ case int64:
+ C.pn_data_put_long(data, C.int64_t(v))
+ case int:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_long(data, C.int64_t(v))
+ } else {
+ C.pn_data_put_int(data, C.int32_t(v))
+ }
+ case uint8:
+ C.pn_data_put_ubyte(data, C.uint8_t(v))
+ case uint16:
+ C.pn_data_put_ushort(data, C.uint16_t(v))
+ case uint32:
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ case uint64:
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ case uint:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ } else {
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ }
+ case float32:
+ C.pn_data_put_float(data, C.float(v))
+ case float64:
+ C.pn_data_put_double(data, C.double(v))
+ case string:
+ C.pn_data_put_string(data, pnBytes([]byte(v)))
+ case []byte:
+ C.pn_data_put_binary(data, pnBytes(v))
+ case Binary:
+ C.pn_data_put_binary(data, pnBytes([]byte(v)))
+ case Symbol:
+ C.pn_data_put_symbol(data, pnBytes([]byte(v)))
+ case Map: // Special map type
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for key, val := range v {
+ marshal(key, data)
+ marshal(val, data)
+ }
+ C.pn_data_exit(data)
+ default:
+ switch reflect.TypeOf(v).Kind() {
+ case reflect.Map:
+ putMap(data, v)
+ case reflect.Slice:
+ putList(data, v)
+ default:
+ panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
+ }
+ }
+ err := dataError("marshal", data)
+ if err != nil {
+ panic(err)
+ }
+ return
+}
+
+func clearMarshal(v interface{}, data *C.pn_data_t) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
+
+func putMap(data *C.pn_data_t, v interface{}) {
+ mapValue := reflect.ValueOf(v)
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for _, key := range mapValue.MapKeys() {
+ marshal(key.Interface(), data)
+ marshal(mapValue.MapIndex(key).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+func putList(data *C.pn_data_t, v interface{}) {
+ listValue := reflect.ValueOf(v)
+ C.pn_data_put_list(data)
+ C.pn_data_enter(data)
+ for i := 0; i < listValue.Len(); i++ {
+ marshal(listValue.Index(i).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+// Encoder encodes AMQP values to an io.Writer
+type Encoder struct {
+ writer io.Writer
+ buffer []byte
+}
+
+// NewEncoder returns a new encoder that writes to w.
+func NewEncoder(w io.Writer) *Encoder {
+ return &Encoder{w, make([]byte, minEncode)}
+}
+
+func (e *Encoder) Encode(v interface{}) (err error) {
+ e.buffer, err = Marshal(v, e.buffer)
+ if err == nil {
- e.writer.Write(e.buffer)
++ _, err = e.writer.Write(e.buffer)
+ }
+ return err
+}
+
+func replace(data *C.pn_data_t, v interface{}) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/amqp/message.go
----------------------------------------------------------------------
diff --cc amqp/message.go
index 1d1287f,0000000..48a209a
mode 100644,000000..100644
--- a/amqp/message.go
+++ b/amqp/message.go
@@@ -1,346 -1,0 +1,348 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include "codec_shim.h"
+// #include <proton/types.h>
+// #include <proton/message.h>
+// #include <stdlib.h>
+//
+// /* Helper for setting message string fields */
+// typedef int (*set_fn)(pn_message_t*, const char*);
+// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
+// int result = set(m, s);
+// free(s);
+// return result;
+// }
+//
+import "C"
+
+import (
+ "fmt"
+ "runtime"
+ "time"
+ "unsafe"
+)
+
+// Message is the interface to an AMQP message.
+type Message interface {
+ // Durable indicates that any parties taking responsibility
+ // for the message must durably store the content.
+ Durable() bool
+ SetDurable(bool)
+
+ // Priority impacts ordering guarantees. Within a
+ // given ordered context, higher priority messages may jump ahead of
+ // lower priority messages.
+ Priority() uint8
+ SetPriority(uint8)
+
+ // TTL is the Time To Live; the message may be dropped after this duration.
+ TTL() time.Duration
+ SetTTL(time.Duration)
+
+ // FirstAcquirer indicates
+ // that the recipient of the message is the first recipient to acquire
+ // the message, i.e. there have been no failed delivery attempts to
+ // other acquirers. Note that this does not mean the message has not
+ // been delivered to, but not acquired, by other recipients.
+ FirstAcquirer() bool
+ SetFirstAcquirer(bool)
+
+ // DeliveryCount tracks how many attempts have been made to
+ // deliver a message.
+ DeliveryCount() uint32
+ SetDeliveryCount(uint32)
+
+ // MessageId provides a unique identifier for a message.
+ // It can be a string, an unsigned long, a uuid or a
+ // binary value.
+ MessageId() interface{}
+ SetMessageId(interface{})
+
+ UserId() string
+ SetUserId(string)
+
+ Address() string
+ SetAddress(string)
+
+ Subject() string
+ SetSubject(string)
+
+ ReplyTo() string
+ SetReplyTo(string)
+
+ // CorrelationId is set on correlated request and response messages. It can be
+ // a string, an unsigned long, a uuid or a binary value.
+ CorrelationId() interface{}
+ SetCorrelationId(interface{})
+
+ ContentType() string
+ SetContentType(string)
+
+ ContentEncoding() string
+ SetContentEncoding(string)
+
+ // ExpiryTime indicates an absolute time when the message may be dropped.
+ // A Zero time (i.e. t.IsZero() == true) indicates a message never expires.
+ ExpiryTime() time.Time
+ SetExpiryTime(time.Time)
+
+ CreationTime() time.Time
+ SetCreationTime(time.Time)
+
+ GroupId() string
+ SetGroupId(string)
+
+ GroupSequence() int32
+ SetGroupSequence(int32)
+
+ ReplyToGroupId() string
+ SetReplyToGroupId(string)
+
+ // Instructions - AMQP delivery instructions.
+ Instructions() map[string]interface{}
+ SetInstructions(v map[string]interface{})
+
+ // Annotations - AMQP annotations.
+ Annotations() map[string]interface{}
+ SetAnnotations(v map[string]interface{})
+
+ // Properties - Application properties.
+ Properties() map[string]interface{}
+ SetProperties(v map[string]interface{})
+
+ // Inferred indicates how the message content
+ // is encoded into AMQP sections. If inferred is true then binary and
+ // list values in the body of the message will be encoded as AMQP DATA
+ // and AMQP SEQUENCE sections, respectively. If inferred is false,
+ // then all values in the body of the message will be encoded as AMQP
+ // VALUE sections regardless of their type.
+ Inferred() bool
+ SetInferred(bool)
+
+ // Marshal a Go value into the message body. See amqp.Marshal() for details.
+ Marshal(interface{})
+
+ // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+ Unmarshal(interface{})
+
+ // Body value resulting from the default unmarshalling of message body as interface{}
+ Body() interface{}
+
+ // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
+ // the message is encoded into it, otherwise a new buffer is created.
+ // Returns the buffer containing the message.
+ Encode(buffer []byte) ([]byte, error)
+
+ // Decode data into this message. Overwrites an existing message content.
+ Decode(buffer []byte) error
+
+ // Clear the message contents.
+ Clear()
+
+ // Copy the contents of another message to this one.
+ Copy(m Message) error
+}
+
+type message struct{ pn *C.pn_message_t }
+
+func freeMessage(m *message) {
+ C.pn_message_free(m.pn)
+ m.pn = nil
+}
+
+// NewMessage creates a new message instance.
+func NewMessage() Message {
+ m := &message{C.pn_message()}
+ runtime.SetFinalizer(m, freeMessage)
+ return m
+}
+
+// NewMessageWith creates a message with value as the body. Equivalent to
+// m := NewMessage(); m.Marshal(body)
+func NewMessageWith(value interface{}) Message {
+ m := NewMessage()
+ m.Marshal(value)
+ return m
+}
+
+func (m *message) Clear() { C.pn_message_clear(m.pn) }
+
+func (m *message) Copy(x Message) error {
+ if data, err := x.Encode(nil); err == nil {
+ return m.Decode(data)
+ } else {
+ return err
+ }
+}
+
+// ==== message get functions
+
+func rewindGet(data *C.pn_data_t) (v interface{}) {
+ C.pn_data_rewind(data)
+ C.pn_data_next(data)
+ unmarshal(&v, data)
+ return v
+}
+
+func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
+ C.pn_data_rewind(data)
+ C.pn_data_next(data)
+ unmarshal(&v, data)
+ return v
+}
+
+func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) }
+func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) }
+func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
+func (m *message) TTL() time.Duration {
+ return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+}
+func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) }
+func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) }
+func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) }
+func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) }
+func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) }
+func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) }
+func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
+func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
+func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) }
+func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
+
+func (m *message) ExpiryTime() time.Time {
+ return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
+}
+func (m *message) CreationTime() time.Time {
+ return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
+}
+func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) }
+func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) }
+func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+
+func (m *message) Instructions() map[string]interface{} {
+ return rewindMap(C.pn_message_instructions(m.pn))
+}
+func (m *message) Annotations() map[string]interface{} {
+ return rewindMap(C.pn_message_annotations(m.pn))
+}
+func (m *message) Properties() map[string]interface{} {
+ return rewindMap(C.pn_message_properties(m.pn))
+}
+
+// ==== message set methods
+
+func setData(v interface{}, data *C.pn_data_t) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
+
+func dataString(data *C.pn_data_t) string {
+ str := C.pn_string(C.CString(""))
+ defer C.pn_free(unsafe.Pointer(str))
+ C.pn_inspect(unsafe.Pointer(data), str)
+ return C.GoString(C.pn_string_get(str))
+}
+
- func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) }
++func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) }
+func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
+func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
+func (m *message) SetTTL(d time.Duration) {
+ C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
+}
+func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
+func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
+func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
+func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
+func (m *message) SetAddress(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
+}
+func (m *message) SetSubject(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
+}
+func (m *message) SetReplyTo(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
+}
+func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
+func (m *message) SetContentType(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
+}
+func (m *message) SetContentEncoding(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
+}
+func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
+func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
+func (m *message) SetGroupId(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
+}
+func (m *message) SetGroupSequence(s int32) {
+ C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
+}
+func (m *message) SetReplyToGroupId(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+}
+
+func (m *message) SetInstructions(v map[string]interface{}) {
+ setData(v, C.pn_message_instructions(m.pn))
+}
+func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
+func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) }
+
+// Marshal/Unmarshal body
+func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
+func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
+func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
+
+func (m *message) Decode(data []byte) error {
+ m.Clear()
+ if len(data) == 0 {
+ return fmt.Errorf("empty buffer for decode")
+ }
+ if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
+ return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
+ }
+ return nil
+}
+
+func DecodeMessage(data []byte) (m Message, err error) {
+ m = NewMessage()
+ err = m.Decode(data)
+ return
+}
+
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+ encode := func(buf []byte) ([]byte, error) {
+ len := cLen(buf)
+ result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+ switch {
+ case result == C.PN_OVERFLOW:
+ return buf, overflow
+ case result < 0:
+ return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result))
+ default:
+ return buf[:len], nil
+ }
+ }
+ return encodeGrow(buffer, encode)
+}
+
+// TODO aconway 2015-09-14: Multi-section messages.
++
++// TODO aconway 2016-09-09: Message.String() use inspect.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/amqp/url.go
----------------------------------------------------------------------
diff --cc amqp/url.go
index 70545d2,0000000..fd6c8dc
mode 100644,000000..100644
--- a/amqp/url.go
+++ b/amqp/url.go
@@@ -1,96 -1,0 +1,104 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
- /*
- #include <stdlib.h>
- #include <string.h>
- #include <proton/url.h>
-
- // Helper function for setting URL fields.
- typedef void (*setter_fn)(pn_url_t* url, const char* value);
- inline void set(pn_url_t *url, setter_fn s, const char* value) {
- s(url, value);
- }
- */
- import "C"
-
+import (
- "fmt"
++ "errors"
+ "net"
+ "net/url"
- "unsafe"
++ "strings"
+)
+
+const (
+ amqp string = "amqp"
+ amqps = "amqps"
++ defaulthost = "localhost"
+)
+
- // ParseUrl parses an AMQP URL string and returns a net/url.Url.
- //
- // It is more forgiving than net/url.Parse and allows most of the parts of the
- // URL to be missing, assuming AMQP defaults.
- //
- func ParseURL(s string) (u *url.URL, err error) {
- cstr := C.CString(s)
- defer C.free(unsafe.Pointer(cstr))
- pnUrl := C.pn_url_parse(cstr)
- if pnUrl == nil {
- return nil, fmt.Errorf("bad URL %#v", s)
++// The way this is used it can only get a hostport already validated by
++// the URL parser, so this means we can skip some error checks
++func splitHostPort(hostport string) (string, string, error) {
++ if hostport == "" {
++ return "", "", nil
+ }
- defer C.pn_url_free(pnUrl)
-
- scheme := C.GoString(C.pn_url_get_scheme(pnUrl))
- username := C.GoString(C.pn_url_get_username(pnUrl))
- password := C.GoString(C.pn_url_get_password(pnUrl))
- host := C.GoString(C.pn_url_get_host(pnUrl))
- port := C.GoString(C.pn_url_get_port(pnUrl))
- path := C.GoString(C.pn_url_get_path(pnUrl))
++ if hostport[0] == '[' {
++ // There must be a matching ']' as already validated
++ if l := strings.LastIndex(hostport, "]"); len(hostport) == l+1 {
++ // trim off '[' and ']'
++ return hostport[1:l], "", nil
++ }
++ } else if strings.IndexByte(hostport, ':') < 0 {
++ return hostport, "", nil
++ }
++ return net.SplitHostPort(hostport)
++}
+
++func UpdateURL(in *url.URL) (err error) {
++ // Detect form without "amqp://" and stick it on front
++ // to make it match the usual proton defaults
++ u := new (url.URL)
++ *u = *in
++ if (u.Scheme != "" && u.Opaque != "") ||
++ (u.Scheme == "" && u.Host == "") {
++ input := u.String()
++ input = "amqp://" + input
++ u, err = url.Parse(input)
++ if err != nil {
++ return
++ }
++ }
++ // If Scheme is still "" then default to amqp
++ if u.Scheme == "" {
++ u.Scheme = amqp
++ }
++ // Error if the scheme is not an amqp scheme
++ if u.Scheme != amqp && u.Scheme != amqps {
++ return errors.New("invalid amqp scheme")
++ }
++ // Decompose Host into host and port
++ host, port, err := splitHostPort(u.Host)
+ if err != nil {
- return nil, fmt.Errorf("bad URL %#v: %s", s, err)
++ return
+ }
- if scheme == "" {
- scheme = amqp
++ if host == "" {
++ host = defaulthost
+ }
+ if port == "" {
- if scheme == amqps {
- port = amqps
- } else {
- port = amqp
- }
- }
- var user *url.Userinfo
- if password != "" {
- user = url.UserPassword(username, password)
- } else if username != "" {
- user = url.User(username)
++ port = u.Scheme
+ }
++ u.Host = net.JoinHostPort(host, port)
++ *in = *u
++ return nil
++}
+
- u = &url.URL{
- Scheme: scheme,
- User: user,
- Host: net.JoinHostPort(host, port),
- Path: path,
++// ParseURL parses an AMQP URL string and returns a net/url.URL.
++//
++// It is more forgiving than net/url.Parse and allows most of the parts of the
++// URL to be missing, assuming AMQP defaults.
++//
++func ParseURL(s string) (u *url.URL, err error) {
++ if u, err = url.Parse(s); err != nil {
++ return
+ }
-
- return u, nil
++ if err = UpdateURL(u); err != nil {
++ u = nil
++ }
++ return u, err
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/amqp/url_test.go
----------------------------------------------------------------------
diff --cc amqp/url_test.go
index 99b656d,0000000..f52d4bf
mode 100644,000000..100644
--- a/amqp/url_test.go
+++ b/amqp/url_test.go
@@@ -1,51 -1,0 +1,59 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+ "fmt"
+)
+
+func ExampleParseURL() {
+ for _, s := range []string{
+ "amqp://username:password@host:1234/path",
+ "host:1234",
+ "host",
- ":1234",
+ "host/path",
+ "amqps://host",
++ "/path",
+ "",
++ ":1234",
++ // Taken out because the go 1.4 URL parser isn't the same as later
++ //"[::1]",
++ //"[::1",
++ // Output would be:
++ // amqp://[::1]:amqp
++ // parse amqp://[::1: missing ']' in host
+ } {
+ u, err := ParseURL(s)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ fmt.Println(u)
+ }
+ }
+ // Output:
+ // amqp://username:password@host:1234/path
+ // amqp://host:1234
+ // amqp://host:amqp
- // amqp://:1234
+ // amqp://host:amqp/path
+ // amqps://host:amqps
- // bad URL ""
++ // amqp://localhost:amqp/path
++ // amqp://localhost:amqp
++ // parse :1234: missing protocol scheme
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/amqp/version.go
----------------------------------------------------------------------
diff --cc amqp/version.go
index 0000000,0000000..cefa904
new file mode 100644
--- /dev/null
+++ b/amqp/version.go
@@@ -1,0 -1,0 +1,29 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++// Version check for proton library.
++// Done here because this is the lowest-level dependency for all the proton Go packages.
++
++// #include <proton/version.h>
++// #if PN_VERSION_MINOR < 10
++// #error packages qpid.apache.org/... require Proton-C library version 0.10 or greater
++// #endif
++import "C"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/auth_test.go
----------------------------------------------------------------------
diff --cc electron/auth_test.go
index 0000000,0000000..73a9299
new file mode 100644
--- /dev/null
+++ b/electron/auth_test.go
@@@ -1,0 -1,0 +1,124 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "fmt"
++ "io/ioutil"
++ "os"
++ "os/exec"
++ "path/filepath"
++ "strings"
++ "testing"
++)
++
++func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
++ client, server := newClientServerOpts(t, copts, sopts)
++ defer closeClientServer(client, server)
++
++ go func() {
++ for in := range server.Incoming() {
++ switch in := in.(type) {
++ case *IncomingConnection:
++ got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
++ }
++ in.Accept()
++ }
++ }()
++
++ err = client.Sync()
++ return
++}
++
++func TestAuthAnonymous(t *testing.T) {
++ fatalIf(t, configureSASL())
++ got, err := testAuthClientServer(t,
++ []ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
++ []ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
++ fatalIf(t, err)
++ errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
++}
++
++func TestAuthPlain(t *testing.T) {
++ fatalIf(t, configureSASL())
++ got, err := testAuthClientServer(t,
++ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
++ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
++ fatalIf(t, err)
++ errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
++}
++
++func TestAuthBadPass(t *testing.T) {
++ fatalIf(t, configureSASL())
++ _, err := testAuthClientServer(t,
++ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
++ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
++ if err == nil {
++ t.Error("Expected auth failure for bad pass")
++ }
++}
++
++func TestAuthBadUser(t *testing.T) {
++ fatalIf(t, configureSASL())
++ _, err := testAuthClientServer(t,
++ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
++ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
++ if err == nil {
++ t.Error("Expected auth failure for bad user")
++ }
++}
++
++var confDir string
++var confErr error
++
++func configureSASL() error {
++ if confDir != "" || confErr != nil {
++ return confErr
++ }
++ confDir, confErr = ioutil.TempDir("", "")
++ if confErr != nil {
++ return confErr
++ }
++
++ GlobalSASLConfigDir(confDir)
++ GlobalSASLConfigName("test")
++ conf := filepath.Join(confDir, "test.conf")
++
++ db := filepath.Join(confDir, "proton.sasldb")
++ cmd := exec.Command("saslpasswd2", "-c", "-p", "-f", db, "-u", "proton", "fred")
++ cmd.Stdin = strings.NewReader("xxx") // Password
++ if out, err := cmd.CombinedOutput(); err != nil {
++ confErr = fmt.Errorf("saslpasswd2 failed: %s\n%s", err, out)
++ return confErr
++ }
++ confStr := "sasldb_path: " + db + "\nmech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS\n"
++ if err := ioutil.WriteFile(conf, []byte(confStr), os.ModePerm); err != nil {
++ confErr = fmt.Errorf("write conf file %s failed: %s", conf, err)
++ }
++ return confErr
++}
++
++func TestMain(m *testing.M) {
++ status := m.Run()
++ if confDir != "" {
++ _ = os.RemoveAll(confDir)
++ }
++ os.Exit(status)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index 1f8bd40,0000000..7f3050f
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,246 -1,0 +1,405 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
- "fmt"
+ "net"
+ "qpid.apache.org/proton"
+ "sync"
+ "time"
+)
+
++// Settings associated with a Connection.
++type ConnectionSettings interface {
++ // Authenticated user name associated with the connection.
++ User() string
++
++ // The AMQP virtual host name for the connection.
++ //
++ // Optional, useful when the server has multiple names and provides different
++ // service based on the name the client uses to connect.
++ //
++ // By default it is set to the DNS host name that the client uses to connect,
++ // but it can be set to something different at the client side with the
++ // VirtualHost() option.
++ //
++ // Returns error if the connection fails to authenticate.
++ VirtualHost() string
++
++ // Heartbeat is the maximum delay between sending frames that the remote peer
++ // has requested of us. If the interval expires an empty "heartbeat" frame
++ // will be sent automatically to keep the connection open.
++ Heartbeat() time.Duration
++}
++
+// Connection is an AMQP connection, created by a Container.
+type Connection interface {
+ Endpoint
++ ConnectionSettings
+
+ // Sender opens a new sender on the DefaultSession.
+ Sender(...LinkOption) (Sender, error)
+
+ // Receiver opens a new Receiver on the DefaultSession().
+ Receiver(...LinkOption) (Receiver, error)
+
+ // DefaultSession() returns a default session for the connection. It is opened
+ // on the first call to DefaultSession and returned on subsequent calls.
+ DefaultSession() (Session, error)
+
+ // Session opens a new session.
+ Session(...SessionOption) (Session, error)
+
+ // Container for the connection.
+ Container() Container
+
+ // Disconnect the connection abruptly with an error.
+ Disconnect(error)
+
+ // Wait waits for the connection to be disconnected.
+ Wait() error
+
+ // WaitTimeout is like Wait but returns Timeout if the timeout expires.
+ WaitTimeout(time.Duration) error
+
- // Incoming returns a channel for incoming endpoints opened by the remote end.
- //
- // To enable, pass AllowIncoming() when creating the Connection. Otherwise all
- // incoming endpoint requests are automatically rejected and Incoming()
- // returns nil.
- //
- // An Incoming value can be an *IncomingSession, *IncomingSender or
- // *IncomingReceiver. You must call Accept() to open the endpoint or Reject()
- // to close it with an error. The specific Incoming types have additional
- // methods to configure the endpoint.
- //
- // Not receiving from Incoming() or not calling Accept/Reject will block the
- // electron event loop. Normally you would have a dedicated goroutine receive
- // from Incoming() and start new goroutines to serve each incoming endpoint.
- // The channel is closed when the Connection closes.
++ // Incoming returns a channel for incoming endpoints opened by the remote peer.
++ // See the Incoming interface for more.
+ //
++ // Failing to receive from Incoming() and call Accept/Reject will block the
++ // electron event loop. You should run a loop to handle the types that
++ // interest you in a switch{} and Accept() all others.
+ Incoming() <-chan Incoming
+}
+
++type connectionSettings struct {
++ user, virtualHost string
++ heartbeat time.Duration
++}
++
++func (c connectionSettings) User() string { return c.user }
++func (c connectionSettings) VirtualHost() string { return c.virtualHost }
++func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
++
+// ConnectionOption can be passed when creating a connection to configure various options
+type ConnectionOption func(*connection)
+
- // Server returns a ConnectionOption to put the connection in server mode.
++// User returns a ConnectionOption that sets the user name for a connection.
++func User(user string) ConnectionOption {
++ return func(c *connection) {
++ c.user = user
++ c.pConnection.SetUser(user)
++ }
++}
++
++// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection.
++// Only applies to outbound client connection.
++func VirtualHost(virtualHost string) ConnectionOption {
++ return func(c *connection) {
++ c.virtualHost = virtualHost
++ c.pConnection.SetHostname(virtualHost)
++ }
++}
++
++// Password returns a ConnectionOption to set the password used to establish a
++// connection. Only applies to outbound client connection.
++//
++// The connection will erase its copy of the password from memory as soon as it
++// has been used to authenticate. If you are concerned about passwords staying in
++// memory you should never store them as strings, and should overwrite your
++// copy as soon as you are done with it.
++//
++func Password(password []byte) ConnectionOption {
++ return func(c *connection) { c.pConnection.SetPassword(password) }
++}
++
++// Server returns a ConnectionOption to put the connection in server mode for incoming connections.
+//
+// A server connection will do protocol negotiation to accept an incoming AMQP
+// connection. Normally you would call this for a connection created by
+// net.Listener.Accept()
+//
- func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } }
++func Server() ConnectionOption {
++ return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) }
++}
+
- // AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests.
- // See Connection.Incoming()
++// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
++// Connection.Incoming(). This is automatically set for Server() connections.
+func AllowIncoming() ConnectionOption {
+ return func(c *connection) { c.incoming = make(chan Incoming) }
+}
+
++// Parent returns a ConnectionOption that associates the Connection with its Container.
++// If not set, a connection will create its own default container.
++func Parent(cont Container) ConnectionOption {
++ return func(c *connection) { c.container = cont.(*container) }
++}
++
+type connection struct {
+ endpoint
++ connectionSettings
++
+ defaultSessionOnce, closeOnce sync.Once
+
+ container *container
+ conn net.Conn
++ server bool
+ incoming chan Incoming
+ handler *handler
+ engine *proton.Engine
- eConnection proton.Connection
++ pConnection proton.Connection
+
+ defaultSession Session
+}
+
- func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
- c := &connection{container: cont, conn: conn}
++// NewConnection creates a connection with the given options.
++func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
++ c := &connection{
++ conn: conn,
++ }
+ c.handler = newHandler(c)
+ var err error
+ c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
+ if err != nil {
+ return nil, err
+ }
- for _, set := range setting {
++ c.pConnection = c.engine.Connection()
++ for _, set := range opts {
+ set(c)
+ }
++ if c.container == nil {
++ c.container = NewContainer("").(*container)
++ }
++ c.pConnection.SetContainer(c.container.Id())
++ globalSASLInit(c.engine)
++
+ c.endpoint.init(c.engine.String())
- c.eConnection = c.engine.Connection()
+ go c.run()
+ return c, nil
+}
+
+func (c *connection) run() {
- c.engine.Run()
++ if !c.server {
++ c.pConnection.Open()
++ }
++ _ = c.engine.Run()
+ if c.incoming != nil {
+ close(c.incoming)
+ }
- c.closed(Closed)
++ _ = c.closed(Closed)
+}
+
+func (c *connection) Close(err error) {
+ c.err.Set(err)
+ c.engine.Close(err)
+}
+
+func (c *connection) Disconnect(err error) {
+ c.err.Set(err)
+ c.engine.Disconnect(err)
+}
+
- func (c *connection) Session(setting ...SessionOption) (Session, error) {
++func (c *connection) Session(opts ...SessionOption) (Session, error) {
+ var s Session
+ err := c.engine.InjectWait(func() error {
+ if c.Error() != nil {
+ return c.Error()
+ }
- eSession, err := c.engine.Connection().Session()
++ pSession, err := c.engine.Connection().Session()
+ if err == nil {
- eSession.Open()
++ pSession.Open()
+ if err == nil {
- s = newSession(c, eSession, setting...)
++ s = newSession(c, pSession, opts...)
+ }
+ }
+ return err
+ })
+ return s, err
+}
+
+func (c *connection) Container() Container { return c.container }
+
+func (c *connection) DefaultSession() (s Session, err error) {
+ c.defaultSessionOnce.Do(func() {
+ c.defaultSession, err = c.Session()
+ })
+ if err == nil {
+ err = c.Error()
+ }
+ return c.defaultSession, err
+}
+
- func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
++func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
+ if s, err := c.DefaultSession(); err == nil {
- return s.Sender(setting...)
++ return s.Sender(opts...)
+ } else {
+ return nil, err
+ }
+}
+
- func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
++func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
+ if s, err := c.DefaultSession(); err == nil {
- return s.Receiver(setting...)
++ return s.Receiver(opts...)
+ } else {
+ return nil, err
+ }
+}
+
+func (c *connection) Connection() Connection { return c }
+
+func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
+func (c *connection) WaitTimeout(timeout time.Duration) error {
+ _, err := timedReceive(c.done, timeout)
+ if err == Timeout {
+ return Timeout
+ }
+ return c.Error()
+}
+
- func (c *connection) Incoming() <-chan Incoming { return c.incoming }
++func (c *connection) Incoming() <-chan Incoming {
++ assert(c.incoming != nil, "electron.Connection.Incoming() disabled for %s", c)
++ return c.incoming
++}
+
- // Incoming is the interface for incoming requests to open an endpoint.
- // Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
- type Incoming interface {
- // Accept and open the endpoint.
- Accept() Endpoint
++type IncomingConnection struct {
++ incoming
++ connectionSettings
++ c *connection
++}
+
- // Reject the endpoint with an error
- Reject(error)
++func newIncomingConnection(c *connection) *IncomingConnection {
++ c.user = c.pConnection.Transport().User()
++ c.virtualHost = c.pConnection.RemoteHostname()
++ return &IncomingConnection{
++ incoming: makeIncoming(c.pConnection),
++ connectionSettings: c.connectionSettings,
++ c: c}
++}
+
- // wait for and call the accept function, call in proton goroutine.
- wait() error
- pEndpoint() proton.Endpoint
++// AcceptConnection is like Accept() but takes ConnectionOptions.
++// For example you can set the Heartbeat() for the accepted connection.
++func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
++ return in.accept(func() Endpoint {
++ for _, opt := range opts {
++ opt(in.c)
++ }
++ in.c.pConnection.Open()
++ return in.c
++ }).(Connection)
+}
+
- type incoming struct {
- endpoint proton.Endpoint
- acceptCh chan func() error
++func (in *IncomingConnection) Accept() Endpoint {
++ return in.AcceptConnection()
+}
+
- func makeIncoming(e proton.Endpoint) incoming {
- return incoming{endpoint: e, acceptCh: make(chan func() error)}
++func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
++
++// SASLEnable returns a ConnectionOption that enables SASL authentication.
++// Only required if you don't set any other SASL options.
++func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
++
++// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
++// mechanisms.
++//
++// Can be used on the client or the server to restrict the SASL for a connection.
++// mechs is a space-separated list of mechanism names.
++//
++func SASLAllowedMechs(mechs string) ConnectionOption {
++ return func(c *connection) { sasl(c).AllowedMechs(mechs) }
+}
+
- func (in *incoming) String() string { return fmt.Sprintf("%s: %s", in.endpoint.Type(), in.endpoint) }
- func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
++// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
++// text SASL authentication mechanisms.
++//
++// By default the SASL layer is configured not to allow mechanisms that disclose
++// the clear text of the password over an unencrypted AMQP connection. This specifically
++// will disallow the use of the PLAIN mechanism without using SSL encryption.
++//
++// This default is to avoid disclosing password information accidentally over an
++// insecure network.
++//
++func SASLAllowInsecure(b bool) ConnectionOption {
++ return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
++}
+
- // Call in proton goroutine, wait for and call the accept function fr
- func (in *incoming) wait() error { return (<-in.acceptCh)() }
++// Heartbeat returns a ConnectionOption that requests the maximum delay
++// between sending frames for the remote peer. If we don't receive any frames
++// within 2*delay we will close the connection.
++//
++func Heartbeat(delay time.Duration) ConnectionOption {
++ // Proton-C divides the idle-timeout by 2 before sending, so compensate.
++ return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) }
++}
+
- func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
++// GlobalSASLConfigDir sets the SASL configuration directory for every
++// Connection created in this process. If not called, the default is determined
++// by your SASL installation.
++//
++// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
++//
++func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir }
+
- // Called in app goroutine to send an accept function to proton and return the resulting endpoint.
- func (in *incoming) accept(f func() Endpoint) Endpoint {
- done := make(chan Endpoint)
- in.acceptCh <- func() error {
- ep := f()
- done <- ep
- return nil
++// GlobalSASLConfigName sets the SASL configuration name for every Connection
++// created in this process. If not called the default is "proton-server".
++//
++// The complete configuration file name is
++// <sasl-config-dir>/<sasl-config-name>.conf
++//
++// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
++//
++func GlobalSASLConfigName(name string) { globalSASLConfigName = name }
++
++var (
++ globalSASLConfigName string
++ globalSASLConfigDir string
++)
++
++// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
++// can realistically offer is global configuration. Later if/when the pn_sasl C
++// impl is fixed we can offer per-connection overrides.
++func globalSASLInit(eng *proton.Engine) {
++ sasl := eng.Transport().SASL()
++ if globalSASLConfigName != "" {
++ sasl.ConfigName(globalSASLConfigName)
++ }
++ if globalSASLConfigDir != "" {
++ sasl.ConfigPath(globalSASLConfigDir)
++ }
++}
++
++// Dial is shorthand for using net.Dial() then NewConnection()
++func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) {
++ conn, err := net.Dial(network, addr)
++ if err == nil {
++ c, err = NewConnection(conn, opts...)
++ }
++ return
++}
++
++// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
++func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) {
++ conn, err := dialer.Dial(network, addr)
++ if err == nil {
++ c, err = NewConnection(conn, opts...)
+ }
- return <-done
++ return
+}
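
The ConnectionOption functions in electron/connection.go above follow Go's functional-options pattern: each one returns a closure that NewConnection applies to the connection, and defaults are filled in afterwards. The sketch below illustrates that pattern only. It assumes hypothetical stand-in names (conn, Option, WithContainer, AsServer, newConn) and does not use the electron package itself.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for electron's unexported connection type.
type conn struct {
	containerID string
	server      bool
	incoming    chan string
}

// Option has the same shape as ConnectionOption: a function that
// mutates the connection being constructed.
type Option func(*conn)

// WithContainer plays the role of Parent(): it records the owner's id.
func WithContainer(id string) Option {
	return func(c *conn) { c.containerID = id }
}

// AllowIncoming enables the incoming channel.
func AllowIncoming() Option {
	return func(c *conn) { c.incoming = make(chan string) }
}

// AsServer composes another option, the way Server() calls AllowIncoming()(c).
func AsServer() Option {
	return func(c *conn) {
		c.server = true
		AllowIncoming()(c)
	}
}

// newConn applies options in order, then fills in defaults for anything
// the caller left unset.
func newConn(opts ...Option) *conn {
	c := &conn{}
	for _, opt := range opts {
		opt(c)
	}
	if c.containerID == "" {
		c.containerID = "default"
	}
	return c
}

func main() {
	c := newConn(AsServer())
	fmt.Println(c.containerID, c.server, c.incoming != nil) // prints: default true true
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.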
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index b5ce6c0,0000000..efb24ff
mode 100644,000000..100644
--- a/electron/container.go
+++ b/electron/container.go
@@@ -1,77 -1,0 +1,104 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "net"
+ "qpid.apache.org/proton"
+ "strconv"
+ "sync/atomic"
+)
+
- // Container is an AMQP container, it represents a single AMQP "application".It
- // provides functions to create new Connections to remote containers.
++// Container is an AMQP container, it represents a single AMQP "application"
++// which can have multiple client or server connections.
++//
++// Each Container in a distributed AMQP application must have a unique
++// container-id which is applied to its connections.
+//
+// Create with NewContainer()
+//
+type Container interface {
+ // Id is a unique identifier for the container in your distributed application.
+ Id() string
+
- // Create a new AMQP Connection over the supplied net.Conn connection.
- //
- // You must call Connection.Open() on the returned Connection, after
- // setting any Connection properties you need to set. Note the net.Conn
- // can be an outgoing connection (e.g. made with net.Dial) or an incoming
- // connection (e.g. made with net.Listener.Accept())
- Connection(net.Conn, ...ConnectionOption) (Connection, error)
++ // Connection creates a connection associated with this container.
++ Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)
++
++ // Dial is shorthand for
++ // conn, err := net.Dial(); c, err := Connection(conn, opts...)
++ Dial(network string, addr string, opts ...ConnectionOption) (Connection, error)
++
++ // Accept is shorthand for:
++ // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server())...)
++ Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)
++
++ // String returns Id()
++ String() string
+}
+
+type container struct {
+ id string
+ tagCounter uint64
+}
+
+func (cont *container) nextTag() string {
+ return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32)
+}
+
+// NewContainer creates a new container. The id must be unique in your
+// distributed application, all connections created by the container
+// will have this container-id.
+//
+// If id == "" a random UUID will be generated for the id.
+func NewContainer(id string) Container {
+ if id == "" {
+ id = proton.UUID4().String()
+ }
+ cont := &container{id: id}
+ return cont
+}
+
+func (cont *container) Id() string { return cont.id }
+
++func (cont *container) String() string { return cont.Id() }
++
+func (cont *container) nextLinkName() string {
+ return cont.id + "@" + cont.nextTag()
+}
+
- func (cont *container) Connection(conn net.Conn, setting ...ConnectionOption) (Connection, error) {
- return newConnection(conn, cont, setting...)
++func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) {
++ return NewConnection(conn, append(opts, Parent(cont))...)
++}
++
++func (cont *container) Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
++ conn, err := net.Dial(network, address)
++ if err == nil {
++ c, err = cont.Connection(conn, opts...)
++ }
++ return
++}
++
++func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) {
++ conn, err := l.Accept()
++ if err == nil {
++ c, err = cont.Connection(conn, append(opts, Server())...)
++ }
++ return
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/287eeaca/electron/doc.go
----------------------------------------------------------------------
diff --cc electron/doc.go
index 46bde37,0000000..436e5df
mode 100644,000000..100644
--- a/electron/doc.go
+++ b/electron/doc.go
@@@ -1,63 -1,0 +1,72 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package electron is a procedural, concurrent-safe Go library for AMQP messaging.
+You can write clients and servers using this library.
+
- Start by creating a Container with NewContainer. A Container represents a client
- or server application that can contain many incoming or outgoing connections.
++Start by creating a Container with NewContainer. An AMQP Container represents a
++single AMQP "application" and can contain client and server connections.
+
- Create connections with the standard Go 'net' package using net.Dial or
- net.Listen. Create an AMQP connection over a net.Conn with
- Container.Connection() and open it with Connection.Open().
++You can enable AMQP over any connection that implements the standard net.Conn
++interface. Typically you can connect with net.Dial() or listen for server
++connections with net.Listen. Enable AMQP by passing the net.Conn to
++Container.Connection().
+
- AMQP sends messages over "links". Each link has a Sender end and a Receiver
- end. Connection.Sender() and Connection.Receiver() allow you to create links to
- Send() and Receive() messages.
++AMQP allows bidirectional peer-to-peer message exchange as well as
++client-to-broker. Messages are sent over "links". Each link is one-way and has a
++Sender and Receiver end. Connection.Sender() and Connection.Receiver() open
++links to Send() and Receive() messages. Connection.Incoming() lets you accept
++incoming links opened by the remote peer. You can open and accept multiple links
++in both directions on a single Connection.
+
- You can create an AMQP server connection by calling Connection.Server() and
- Connection.Listen() before calling Connection.Open(). A server connection can
- negotiate protocol security details and can accept incoming links opened from
- the remote end of the connection.
++Some of the documentation examples show client and server side by side in a
++single program, in separate goroutines. This is only for example purposes, real
++AMQP applications would run in separate processes on the network.
++More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
+
+*/
+package electron
+
+//#cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// Just for package comment
+
+/* DEVELOPER NOTES
+
+There is a single proton.Engine per connection, each driving its own event-loop goroutine,
+and each with a 'handler'. Most state for a connection is maintained on the handler, and
+only accessed in the event-loop goroutine, so no locks are required there.
+
+The handler sets up channels as needed to get or send data from user goroutines
+using electron types like Sender or Receiver.
+
- We also use Engine.Inject to inject actions into the event loop from user
- goroutines. It is important to check at the start of an injected function that
- required objects are still valid, for example a link may be remotely closed
- between the time a Sender function calls Inject and the time the injected
- function is execute by the handler goroutine. See comments in endpoint.go for more.
++Engine.Inject injects actions into the event loop from user goroutines. It is
++important to check at the start of an injected function that required objects
++are still valid, for example a link may be remotely closed between the time a
++Sender function calls Inject and the time the injected function is executed by
++the handler goroutine.
+
+*/
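
The developer notes above describe injecting functions into a per-connection event loop and re-checking state at the start of each injected function. The sketch below shows that pattern in isolation. The loop type, injectWait, send and errClosed are hypothetical names chosen for illustration; this is not the proton.Engine implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// loop is a hypothetical single-goroutine event loop. The closed field is
// only read or written by functions running on the loop goroutine, so it
// needs no lock.
type loop struct {
	inject chan func()
	closed bool
}

func newLoop() *loop {
	l := &loop{inject: make(chan func())}
	go func() {
		for f := range l.inject {
			f() // every injected function runs here, one at a time
		}
	}()
	return l
}

// injectWait runs f on the loop goroutine and blocks until it returns.
func (l *loop) injectWait(f func() error) error {
	done := make(chan error)
	l.inject <- func() { done <- f() }
	return <-done
}

var errClosed = errors.New("link closed")

// send checks validity at the start of the injected function, because
// the state may have changed between the call and its execution.
func (l *loop) send(msg string) error {
	return l.injectWait(func() error {
		if l.closed {
			return errClosed
		}
		// A real implementation would hand msg to the engine here.
		return nil
	})
}

func main() {
	l := newLoop()
	fmt.Println(l.send("a")) // prints: <nil>

	// Simulate a remote close, applied on the loop goroutine.
	_ = l.injectWait(func() error { l.closed = true; return nil })
	fmt.Println(l.send("b")) // prints: link closed
}
```

Confining the state to one goroutine trades locking for a channel hand-off on every call from a user goroutine.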