You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/01/18 01:16:17 UTC

[01/10] qpid-proton git commit: PROTON-1386: Disable PHP binding by default

Repository: qpid-proton
Updated Branches:
  refs/heads/go1 793e21010 -> 5c4f4d7f4


PROTON-1386: Disable PHP binding by default


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/431c00d5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/431c00d5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/431c00d5

Branch: refs/heads/go1
Commit: 431c00d5f3198192c72cdb5aba10c111b19da7a8
Parents: 951dcb4
Author: Andrew Stitcher <as...@apache.org>
Authored: Tue Jan 10 16:41:33 2017 -0500
Committer: Andrew Stitcher <as...@apache.org>
Committed: Tue Jan 10 16:41:33 2017 -0500

----------------------------------------------------------------------
 proton-c/CMakeLists.txt          | 4 ++++
 proton-c/bindings/CMakeLists.txt | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/431c00d5/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index f674eee..8edb661 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -190,6 +190,10 @@ endif (PN_WINAPI)
 
 # Try to keep any platform specific overrides together here:
 
+# Until we can decide what to do with PHP support, turn it off by default
+# (We can't build with recent versions of PHP)
+set (NOBUILD_PHP ON)
+
 # MacOS has a bunch of differences in build tools and process and so we have to turn some things
 # off if building there:
 if (APPLE)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/431c00d5/proton-c/bindings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/CMakeLists.txt b/proton-c/bindings/CMakeLists.txt
index 37943dc..6b88384 100644
--- a/proton-c/bindings/CMakeLists.txt
+++ b/proton-c/bindings/CMakeLists.txt
@@ -134,7 +134,7 @@ endif()
 foreach(BINDING ${BINDINGS})
   string(TOUPPER ${BINDING} UBINDING)
   # Check whether default was overridden
-  if ("NOBUILD_${UBINDING}")
+  if (NOBUILD_${UBINDING})
     set ("DEFAULT_${UBINDING}" OFF)
   endif ()
   option("BUILD_${UBINDING}" "Build ${BINDING} language binding" ${DEFAULT_${UBINDING}})


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[10/10] qpid-proton git commit: PROTON-1390: Merge branch 'master' into go1 for fix

Posted by ac...@apache.org.
PROTON-1390: Merge branch 'master' into go1 for fix


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5c4f4d7f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5c4f4d7f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5c4f4d7f

Branch: refs/heads/go1
Commit: 5c4f4d7f44b866e336bb93bb7e2a86afdc366244
Parents: 793e210 71e567d
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Jan 17 20:15:44 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jan 17 20:15:44 2017 -0500

----------------------------------------------------------------------
 amqp/marshal.go   | 45 +++++++++++++++++++++++++++++++++------------
 amqp/unmarshal.go | 50 ++++++++++++++++++++++++++++++--------------------
 2 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c4f4d7f/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index bce7323,0000000..b6adf90
mode 100644,000000..100644
--- a/amqp/marshal.go
+++ b/amqp/marshal.go
@@@ -1,250 -1,0 +1,271 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +	"fmt"
 +	"io"
 +	"reflect"
 +	"unsafe"
 +)
 +
- func dataError(prefix string, data *C.pn_data_t) error {
- 	err := PnError(C.pn_data_error(data))
- 	if err != nil {
- 		err = fmt.Errorf("%s: %s", prefix, err.Error())
++// Error returned if Go data cannot be marshaled as an AMQP type.
++type MarshalError struct {
++	// The Go type.
++	GoType reflect.Type
++	s      string
++}
++
++func (e MarshalError) Error() string { return e.s }
++
++func newMarshalError(v interface{}, s string) *MarshalError {
++	t := reflect.TypeOf(v)
++	return &MarshalError{GoType: t, s: fmt.Sprintf("cannot marshal %s: %s", t, s)}
++}
++
++func dataMarshalError(v interface{}, data *C.pn_data_t) error {
++	if pe := PnError(C.pn_data_error(data)); pe != nil {
++		return newMarshalError(v, pe.Error())
 +	}
- 	return err
++	return nil
 +}
 +
 +/*
 +Marshal encodes a Go value as AMQP data in buffer.
 +If buffer is nil, or is not large enough, a new buffer  is created.
 +
 +Returns the buffer used for encoding with len() adjusted to the actual size of data.
 +
 +Go types are encoded as follows
 +
 + +-------------------------------------+--------------------------------------------+
 + |Go type                              |AMQP type                                   |
 + +-------------------------------------+--------------------------------------------+
 + |bool                                 |bool                                        |
 + +-------------------------------------+--------------------------------------------+
 + |int8, int16, int32, int64 (int)      |byte, short, int, long (int or long)        |
 + +-------------------------------------+--------------------------------------------+
 + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong)  |
 + +-------------------------------------+--------------------------------------------+
 + |float32, float64                     |float, double.                              |
 + +-------------------------------------+--------------------------------------------+
 + |string                               |string                                      |
 + +-------------------------------------+--------------------------------------------+
 + |[]byte, Binary                       |binary                                      |
 + +-------------------------------------+--------------------------------------------+
 + |Symbol                               |symbol                                      |
 + +-------------------------------------+--------------------------------------------+
 + |interface{}                          |the contained type                          |
 + +-------------------------------------+--------------------------------------------+
 + |nil                                  |null                                        |
 + +-------------------------------------+--------------------------------------------+
 + |map[K]T                              |map with K and T converted as above         |
 + +-------------------------------------+--------------------------------------------+
 + |Map                                  |map, may have mixed types for keys, values  |
 + +-------------------------------------+--------------------------------------------+
 + |[]T                                  |list with T converted as above              |
 + +-------------------------------------+--------------------------------------------+
 + |List                                 |list, may have mixed types  values          |
 + +-------------------------------------+--------------------------------------------+
 +
 +The following Go types cannot be marshaled: uintptr, function, interface, channel
 +
 +TODO
 +
 +Go types: array, slice, struct, complex64/128.
 +
 +AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
 +
 +Described types.
 +
 +*/
 +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
- 	defer doRecover(&err)
++	defer func() {
++		if r := recover(); r != nil {
++			if merr, ok := r.(*MarshalError); ok {
++				err = merr
++			} else {
++				panic(r)
++			}
++		}
++	}()
++
 +	data := C.pn_data(0)
 +	defer C.pn_data_free(data)
 +	marshal(v, data)
 +	encode := func(buf []byte) ([]byte, error) {
 +		n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
 +		switch {
 +		case n == int(C.PN_OVERFLOW):
 +			return buf, overflow
 +		case n < 0:
- 			return buf, dataError("marshal error", data)
++			return buf, dataMarshalError(v, data)
 +		default:
 +			return buf[:n], nil
 +		}
 +	}
 +	return encodeGrow(buffer, encode)
 +}
 +
 +const minEncode = 256
 +
 +// overflow is returned when an encoding function can't fit data in the buffer.
 +var overflow = fmt.Errorf("buffer too small")
 +
 +// encodeFn encodes into buffer[0:len(buffer)].
 +// Returns buffer with length adjusted for data encoded.
 +// If buffer too small, returns overflow as error.
 +type encodeFn func(buffer []byte) ([]byte, error)
 +
 +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
 +// Returns the final buffer.
 +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
 +	if buffer == nil || len(buffer) == 0 {
 +		buffer = make([]byte, minEncode)
 +	}
 +	var err error
 +	for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
 +		buffer = make([]byte, 2*len(buffer))
 +	}
 +	return buffer, err
 +}
 +
 +func marshal(v interface{}, data *C.pn_data_t) {
 +	switch v := v.(type) {
 +	case nil:
 +		C.pn_data_put_null(data)
 +	case bool:
 +		C.pn_data_put_bool(data, C.bool(v))
 +	case int8:
 +		C.pn_data_put_byte(data, C.int8_t(v))
 +	case int16:
 +		C.pn_data_put_short(data, C.int16_t(v))
 +	case int32:
 +		C.pn_data_put_int(data, C.int32_t(v))
 +	case int64:
 +		C.pn_data_put_long(data, C.int64_t(v))
 +	case int:
- 		if unsafe.Sizeof(0) == 8 {
++		if unsafe.Sizeof(int(0)) == 8 {
 +			C.pn_data_put_long(data, C.int64_t(v))
 +		} else {
 +			C.pn_data_put_int(data, C.int32_t(v))
 +		}
 +	case uint8:
 +		C.pn_data_put_ubyte(data, C.uint8_t(v))
 +	case uint16:
 +		C.pn_data_put_ushort(data, C.uint16_t(v))
 +	case uint32:
 +		C.pn_data_put_uint(data, C.uint32_t(v))
 +	case uint64:
 +		C.pn_data_put_ulong(data, C.uint64_t(v))
 +	case uint:
- 		if unsafe.Sizeof(0) == 8 {
++		if unsafe.Sizeof(int(0)) == 8 {
 +			C.pn_data_put_ulong(data, C.uint64_t(v))
 +		} else {
 +			C.pn_data_put_uint(data, C.uint32_t(v))
 +		}
 +	case float32:
 +		C.pn_data_put_float(data, C.float(v))
 +	case float64:
 +		C.pn_data_put_double(data, C.double(v))
 +	case string:
 +		C.pn_data_put_string(data, pnBytes([]byte(v)))
 +	case []byte:
 +		C.pn_data_put_binary(data, pnBytes(v))
 +	case Binary:
 +		C.pn_data_put_binary(data, pnBytes([]byte(v)))
 +	case Symbol:
 +		C.pn_data_put_symbol(data, pnBytes([]byte(v)))
 +	case Map: // Special map type
 +		C.pn_data_put_map(data)
 +		C.pn_data_enter(data)
 +		for key, val := range v {
 +			marshal(key, data)
 +			marshal(val, data)
 +		}
 +		C.pn_data_exit(data)
 +	default:
 +		switch reflect.TypeOf(v).Kind() {
 +		case reflect.Map:
 +			putMap(data, v)
 +		case reflect.Slice:
 +			putList(data, v)
 +		default:
- 			panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
++			panic(newMarshalError(v, "no conversion"))
 +		}
 +	}
- 	err := dataError("marshal", data)
- 	if err != nil {
++	if err := dataMarshalError(v, data); err != nil {
 +		panic(err)
 +	}
 +	return
 +}
 +
 +func clearMarshal(v interface{}, data *C.pn_data_t) {
 +	C.pn_data_clear(data)
 +	marshal(v, data)
 +}
 +
 +func putMap(data *C.pn_data_t, v interface{}) {
 +	mapValue := reflect.ValueOf(v)
 +	C.pn_data_put_map(data)
 +	C.pn_data_enter(data)
 +	for _, key := range mapValue.MapKeys() {
 +		marshal(key.Interface(), data)
 +		marshal(mapValue.MapIndex(key).Interface(), data)
 +	}
 +	C.pn_data_exit(data)
 +}
 +
 +func putList(data *C.pn_data_t, v interface{}) {
 +	listValue := reflect.ValueOf(v)
 +	C.pn_data_put_list(data)
 +	C.pn_data_enter(data)
 +	for i := 0; i < listValue.Len(); i++ {
 +		marshal(listValue.Index(i).Interface(), data)
 +	}
 +	C.pn_data_exit(data)
 +}
 +
 +// Encoder encodes AMQP values to an io.Writer
 +type Encoder struct {
 +	writer io.Writer
 +	buffer []byte
 +}
 +
 +// New encoder returns a new encoder that writes to w.
 +func NewEncoder(w io.Writer) *Encoder {
 +	return &Encoder{w, make([]byte, minEncode)}
 +}
 +
 +func (e *Encoder) Encode(v interface{}) (err error) {
 +	e.buffer, err = Marshal(v, e.buffer)
 +	if err == nil {
 +		_, err = e.writer.Write(e.buffer)
 +	}
 +	return err
 +}
 +
 +func replace(data *C.pn_data_t, v interface{}) {
 +	C.pn_data_clear(data)
 +	marshal(v, data)
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c4f4d7f/amqp/unmarshal.go
----------------------------------------------------------------------
diff --cc amqp/unmarshal.go
index 8f380a7,0000000..d56cbd2
mode 100644,000000..100644
--- a/amqp/unmarshal.go
+++ b/amqp/unmarshal.go
@@@ -1,557 -1,0 +1,567 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +oor more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +	"bytes"
 +	"fmt"
 +	"io"
 +	"reflect"
 +	"unsafe"
 +)
 +
 +const minDecode = 1024
 +
 +// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
 +type UnmarshalError struct {
 +	// The name of the AMQP type.
 +	AMQPType string
 +	// The Go type.
 +	GoType reflect.Type
- }
 +
- func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
- 	return &UnmarshalError{C.pn_type_t(pnType).String(), reflect.TypeOf(v)}
++	s string
 +}
 +
- func (e UnmarshalError) Error() string {
++func (e UnmarshalError) Error() string { return e.s }
++
++func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
++	e := &UnmarshalError{AMQPType: C.pn_type_t(pnType).String(), GoType: reflect.TypeOf(v)}
 +	if e.GoType.Kind() != reflect.Ptr {
- 		return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
++		e.s = fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
 +	} else {
- 		return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
++		e.s = fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
 +	}
++	return e
 +}
 +
- func doRecover(err *error) {
- 	r := recover()
- 	switch r := r.(type) {
- 	case nil:
- 	case *UnmarshalError:
- 		*err = r
- 	default:
- 		panic(r)
++func newUnmarshalErrorData(data *C.pn_data_t, v interface{}) *UnmarshalError {
++	err := PnError(C.pn_data_error(data))
++	if err == nil {
++		return nil
++	}
++	e := newUnmarshalError(C.pn_data_type(data), v)
++	e.s = e.s + ": " + err.Error()
++	return e
++}
++
++func recoverUnmarshal(err *error) {
++	if r := recover(); r != nil {
++		if uerr, ok := r.(*UnmarshalError); ok {
++			*err = uerr
++		} else {
++			panic(r)
++		}
 +	}
 +}
 +
 +//
 +// Decoding from a pn_data_t
 +//
 +// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
 +// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
 +//
 +
 +// Decoder decodes AMQP values from an io.Reader.
 +//
 +type Decoder struct {
 +	reader io.Reader
 +	buffer bytes.Buffer
 +}
 +
 +// NewDecoder returns a new decoder that reads from r.
 +//
 +// The decoder has it's own buffer and may read more data than required for the
 +// AMQP values requested.  Use Buffered to see if there is data left in the
 +// buffer.
 +//
 +func NewDecoder(r io.Reader) *Decoder {
 +	return &Decoder{r, bytes.Buffer{}}
 +}
 +
 +// Buffered returns a reader of the data remaining in the Decoder's buffer. The
 +// reader is valid until the next call to Decode.
 +//
 +func (d *Decoder) Buffered() io.Reader {
 +	return bytes.NewReader(d.buffer.Bytes())
 +}
 +
 +// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
 +//
 +// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
 +//
 +func (d *Decoder) Decode(v interface{}) (err error) {
- 	defer doRecover(&err)
++	defer recoverUnmarshal(&err)
 +	data := C.pn_data(0)
 +	defer C.pn_data_free(data)
 +	var n int
 +	for n == 0 {
 +		n, err = decode(data, d.buffer.Bytes())
 +		if err != nil {
 +			return err
 +		}
 +		if n == 0 { // n == 0 means not enough data, read more
 +			err = d.more()
 +		} else {
 +			unmarshal(v, data)
 +		}
 +	}
 +	d.buffer.Next(n)
 +	return
 +}
 +
 +/*
 +Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
 +Types are converted as follows:
 +
 + +---------------------------+----------------------------------------------------------------------+
 + |To Go types                |From AMQP types                                                       |
 + +===========================+======================================================================+
 + |bool                       |bool                                                                  |
 + +---------------------------+----------------------------------------------------------------------+
 + |int, int8, int16,          |Equivalent or smaller signed integer type: byte, short, int, long.    |
 + |int32, int64               |                                                                      |
 + +---------------------------+----------------------------------------------------------------------+
 + |uint, uint8, uint16,       |Equivalent or smaller unsigned integer type: ubyte, ushort, uint,     |
 + |uint32, uint64 types       |ulong                                                                 |
 + +---------------------------+----------------------------------------------------------------------+
 + |float32, float64           |Equivalent or smaller float or double.                                |
 + +---------------------------+----------------------------------------------------------------------+
 + |string, []byte             |string, symbol or binary.                                             |
 + +---------------------------+----------------------------------------------------------------------+
 + |Symbol                     |symbol                                                                |
 + +---------------------------+----------------------------------------------------------------------+
 + |map[K]T                    |map, provided all keys and values can unmarshal to types K, T         |
 + +---------------------------+----------------------------------------------------------------------+
 + |Map                        |map, any AMQP map                                                     |
 + +---------------------------+----------------------------------------------------------------------+
 + |interface{}                |Any AMQP value can be unmarshaled to an interface{} as follows:       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |AMQP Type               |Go Type in interface{}                       |
 + |                           +========================+=============================================+
 + |                           |bool                    |bool                                         |
 + |                           +------------------------+---------------------------------------------+
 + |                           |byte,short,int,long     |int8,int16,int32,int64                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64                   |
 + |                           +------------------------+---------------------------------------------+
 + |                           |float, double           |float32, float64                             |
 + |                           +------------------------+---------------------------------------------+
 + |                           |string                  |string                                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |symbol                  |Symbol                                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |binary                  |Binary                                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |nulll                   |nil                                          |
 + |                           +------------------------+---------------------------------------------+
 + |                           |map                     |Map                                          |
 + |                           +------------------------+---------------------------------------------+
 + |                           |list                    |List                                         |
 + +---------------------------+------------------------+---------------------------------------------+
 +
 +The following Go types cannot be unmarshaled: uintptr, function, interface, channel.
 +
 +TODO
 +
 +Go types: array, struct.
 +
 +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies.
 +
 +AMQP maps with mixed/unhashable key types need an alternate representation.
 +
 +Described types.
 +*/
 +func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
- 	defer doRecover(&err)
++	defer recoverUnmarshal(&err)
 +
 +	data := C.pn_data(0)
 +	defer C.pn_data_free(data)
 +	n, err = decode(data, bytes)
 +	if err != nil {
 +		return 0, err
 +	}
 +	if n == 0 {
 +		return 0, fmt.Errorf("not enough data")
 +	} else {
 +		unmarshal(v, data)
 +	}
 +	return n, nil
 +}
 +
 +// more reads more data when we can't parse a complete AMQP type
 +func (d *Decoder) more() error {
 +	var readSize int64 = minDecode
 +	if int64(d.buffer.Len()) > readSize { // Grow by doubling
 +		readSize = int64(d.buffer.Len())
 +	}
 +	var n int64
 +	n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
 +	if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
 +		err = io.EOF
 +	}
 +	return err
 +}
 +
 +// Unmarshal from data into value pointed at by v.
 +func unmarshal(v interface{}, data *C.pn_data_t) {
 +	pnType := C.pn_data_type(data)
 +	switch v := v.(type) {
 +	case *bool:
 +		switch pnType {
 +		case C.PN_BOOL:
 +			*v = bool(C.pn_data_get_bool(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *int8:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int8(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int8(C.pn_data_get_byte(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *uint8:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint8(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint8(C.pn_data_get_ubyte(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *int16:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int16(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int16(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int16(C.pn_data_get_short(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *uint16:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint16(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint16(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint16(C.pn_data_get_ushort(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *int32:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int32(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int32(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int32(C.pn_data_get_short(data))
 +		case C.PN_INT:
 +			*v = int32(C.pn_data_get_int(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *uint32:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint32(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint32(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint32(C.pn_data_get_ushort(data))
 +		case C.PN_UINT:
 +			*v = uint32(C.pn_data_get_uint(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *int64:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int64(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int64(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int64(C.pn_data_get_short(data))
 +		case C.PN_INT:
 +			*v = int64(C.pn_data_get_int(data))
 +		case C.PN_LONG:
 +			*v = int64(C.pn_data_get_long(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *uint64:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint64(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint64(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint64(C.pn_data_get_ushort(data))
 +		case C.PN_ULONG:
 +			*v = uint64(C.pn_data_get_ulong(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *int:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int(C.pn_data_get_short(data))
 +		case C.PN_INT:
 +			*v = int(C.pn_data_get_int(data))
 +		case C.PN_LONG:
- 			if unsafe.Sizeof(0) == 8 {
++			if unsafe.Sizeof(int(0)) == 8 {
 +				*v = int(C.pn_data_get_long(data))
 +			} else {
 +				panic(newUnmarshalError(pnType, v))
 +			}
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *uint:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint(C.pn_data_get_ushort(data))
 +		case C.PN_UINT:
 +			*v = uint(C.pn_data_get_uint(data))
 +		case C.PN_ULONG:
- 			if unsafe.Sizeof(0) == 8 {
++			if unsafe.Sizeof(int(0)) == 8 {
 +				*v = uint(C.pn_data_get_ulong(data))
 +			} else {
 +				panic(newUnmarshalError(pnType, v))
 +			}
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *float32:
 +		switch pnType {
 +		case C.PN_FLOAT:
 +			*v = float32(C.pn_data_get_float(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *float64:
 +		switch pnType {
 +		case C.PN_FLOAT:
 +			*v = float64(C.pn_data_get_float(data))
 +		case C.PN_DOUBLE:
 +			*v = float64(C.pn_data_get_double(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *string:
 +		switch pnType {
 +		case C.PN_STRING:
 +			*v = goString(C.pn_data_get_string(data))
 +		case C.PN_SYMBOL:
 +			*v = goString(C.pn_data_get_symbol(data))
 +		case C.PN_BINARY:
 +			*v = goString(C.pn_data_get_binary(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *[]byte:
 +		switch pnType {
 +		case C.PN_STRING:
 +			*v = goBytes(C.pn_data_get_string(data))
 +		case C.PN_SYMBOL:
 +			*v = goBytes(C.pn_data_get_symbol(data))
 +		case C.PN_BINARY:
 +			*v = goBytes(C.pn_data_get_binary(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *Binary:
 +		switch pnType {
 +		case C.PN_BINARY:
 +			*v = Binary(goBytes(C.pn_data_get_binary(data)))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *Symbol:
 +		switch pnType {
 +		case C.PN_SYMBOL:
 +			*v = Symbol(goBytes(C.pn_data_get_symbol(data)))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *interface{}:
 +		getInterface(data, v)
 +
 +	default:
 +		if reflect.TypeOf(v).Kind() != reflect.Ptr {
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +		switch reflect.TypeOf(v).Elem().Kind() {
 +		case reflect.Map:
 +			getMap(data, v)
 +		case reflect.Slice:
 +			getList(data, v)
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	}
- 	err := dataError("unmarshaling", data)
- 	if err != nil {
++	if err := newUnmarshalErrorData(data, v); err != nil {
 +		panic(err)
 +	}
 +	return
 +}
 +
 +func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
 +	C.pn_data_rewind(data)
 +	C.pn_data_next(data)
 +	unmarshal(v, data)
 +}
 +
 +// Getting into an interface is driven completely by the AMQP type, since the interface{}
 +// target is type-neutral.
 +func getInterface(data *C.pn_data_t, v *interface{}) {
 +	pnType := C.pn_data_type(data)
 +	switch pnType {
 +	case C.PN_BOOL:
 +		*v = bool(C.pn_data_get_bool(data))
 +	case C.PN_UBYTE:
 +		*v = uint8(C.pn_data_get_ubyte(data))
 +	case C.PN_BYTE:
 +		*v = int8(C.pn_data_get_byte(data))
 +	case C.PN_USHORT:
 +		*v = uint16(C.pn_data_get_ushort(data))
 +	case C.PN_SHORT:
 +		*v = int16(C.pn_data_get_short(data))
 +	case C.PN_UINT:
 +		*v = uint32(C.pn_data_get_uint(data))
 +	case C.PN_INT:
 +		*v = int32(C.pn_data_get_int(data))
 +	case C.PN_CHAR:
 +		*v = uint8(C.pn_data_get_char(data))
 +	case C.PN_ULONG:
 +		*v = uint64(C.pn_data_get_ulong(data))
 +	case C.PN_LONG:
 +		*v = int64(C.pn_data_get_long(data))
 +	case C.PN_FLOAT:
 +		*v = float32(C.pn_data_get_float(data))
 +	case C.PN_DOUBLE:
 +		*v = float64(C.pn_data_get_double(data))
 +	case C.PN_BINARY:
 +		*v = Binary(goBytes(C.pn_data_get_binary(data)))
 +	case C.PN_STRING:
 +		*v = goString(C.pn_data_get_string(data))
 +	case C.PN_SYMBOL:
 +		*v = Symbol(goString(C.pn_data_get_symbol(data)))
 +	case C.PN_MAP:
 +		m := make(Map)
 +		unmarshal(&m, data)
 +		*v = m
 +	case C.PN_LIST:
 +		l := make(List, 0)
 +		unmarshal(&l, data)
 +		*v = l
 +	default: // No data (-1 or NULL)
 +		*v = nil
 +	}
 +}
 +
 +// get into map pointed at by v
 +func getMap(data *C.pn_data_t, v interface{}) {
 +	mapValue := reflect.ValueOf(v).Elem()
 +	mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
 +	switch pnType := C.pn_data_type(data); pnType {
 +	case C.PN_MAP:
 +		count := int(C.pn_data_get_map(data))
 +		if bool(C.pn_data_enter(data)) {
 +			defer C.pn_data_exit(data)
 +			for i := 0; i < count/2; i++ {
 +				if bool(C.pn_data_next(data)) {
 +					key := reflect.New(mapValue.Type().Key())
 +					unmarshal(key.Interface(), data)
 +					if bool(C.pn_data_next(data)) {
 +						val := reflect.New(mapValue.Type().Elem())
 +						unmarshal(val.Interface(), data)
 +						mapValue.SetMapIndex(key.Elem(), val.Elem())
 +					}
 +				}
 +			}
 +		}
 +	default: // Empty/error/unknown, leave map empty
 +	}
 +}
 +
 +func getList(data *C.pn_data_t, v interface{}) {
 +	pnType := C.pn_data_type(data)
 +	if pnType != C.PN_LIST {
 +		panic(newUnmarshalError(pnType, v))
 +	}
 +	count := int(C.pn_data_get_list(data))
 +	listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
 +	if bool(C.pn_data_enter(data)) {
 +		for i := 0; i < count; i++ {
 +			if bool(C.pn_data_next(data)) {
 +				val := reflect.New(listValue.Type().Elem())
 +				unmarshal(val.Interface(), data)
 +				listValue.Index(i).Set(val.Elem())
 +			}
 +		}
 +		C.pn_data_exit(data)
 +	}
 +	reflect.ValueOf(v).Elem().Set(listValue)
 +}
 +
 +// decode from bytes.
 +// Return bytes decoded or 0 if we could not decode a complete object.
 +//
 +func decode(data *C.pn_data_t, bytes []byte) (int, error) {
 +	if len(bytes) == 0 {
 +		return 0, nil
 +	}
 +	n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
 +	if n == int(C.PN_UNDERFLOW) {
 +		C.pn_error_clear(C.pn_data_error(data))
 +		return 0, nil
 +	} else if n <= 0 {
 +		return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n))
 +	}
 +	return n, nil
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[03/10] qpid-proton git commit: PROTON-1382: Remove bit fields initialization for bool fields

Posted by ac...@apache.org.
PROTON-1382: Remove bit fields initialization for bool fields

Signed-off-by: aboutros <ad...@live.com>


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/99616e6c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/99616e6c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/99616e6c

Branch: refs/heads/go1
Commit: 99616e6c25d9cd56d67f53cfd81f4149c47dc440
Parents: 98e26f6
Author: Adel Boutros <ad...@live.com>
Authored: Wed Jan 4 17:25:15 2017 +0100
Committer: Andrew Stitcher <as...@apache.org>
Committed: Tue Jan 17 02:28:24 2017 -0500

----------------------------------------------------------------------
 proton-c/src/core/event.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99616e6c/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 2a0a5cf..54fdee7 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -28,8 +28,8 @@ struct pn_collector_t {
   pn_list_t *pool;
   pn_event_t *head;
   pn_event_t *tail;
-  bool freed:1;
-  bool head_returned:1;         /* Head has been returned by pn_collector_next() */
+  bool freed;
+  bool head_returned;         /* Head has been returned by pn_collector_next() */
 };
 
 struct pn_event_t {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[08/10] qpid-proton git commit: NO-JIRA: Clean up marshal/unmarshal error handling

Posted by ac...@apache.org.
NO-JIRA: Clean up marshal/unmarshal error handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/eafd0810
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/eafd0810
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/eafd0810

Branch: refs/heads/go1
Commit: eafd08104e232c911686b05f419a3c970706a554
Parents: 0f156d7
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Jan 17 16:16:23 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jan 17 19:43:58 2017 -0500

----------------------------------------------------------------------
 .../go/src/qpid.apache.org/amqp/marshal.go      | 41 ++++++++++++-----
 .../go/src/qpid.apache.org/amqp/unmarshal.go    | 46 ++++++++++++--------
 2 files changed, 59 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eafd0810/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
index e3d4e10..b6adf90 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
@@ -29,12 +29,25 @@ import (
 	"unsafe"
 )
 
-func dataError(prefix string, data *C.pn_data_t) error {
-	err := PnError(C.pn_data_error(data))
-	if err != nil {
-		err = fmt.Errorf("%s: %s", prefix, err.Error())
+// Error returned if Go data cannot be marshaled as an AMQP type.
+type MarshalError struct {
+	// The Go type.
+	GoType reflect.Type
+	s      string
+}
+
+func (e MarshalError) Error() string { return e.s }
+
+func newMarshalError(v interface{}, s string) *MarshalError {
+	t := reflect.TypeOf(v)
+	return &MarshalError{GoType: t, s: fmt.Sprintf("cannot marshal %s: %s", t, s)}
+}
+
+func dataMarshalError(v interface{}, data *C.pn_data_t) error {
+	if pe := PnError(C.pn_data_error(data)); pe != nil {
+		return newMarshalError(v, pe.Error())
 	}
-	return err
+	return nil
 }
 
 /*
@@ -87,7 +100,16 @@ Described types.
 
 */
 func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
-	defer doRecover(&err)
+	defer func() {
+		if r := recover(); r != nil {
+			if merr, ok := r.(*MarshalError); ok {
+				err = merr
+			} else {
+				panic(r)
+			}
+		}
+	}()
+
 	data := C.pn_data(0)
 	defer C.pn_data_free(data)
 	marshal(v, data)
@@ -97,7 +119,7 @@ func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
 		case n == int(C.PN_OVERFLOW):
 			return buf, overflow
 		case n < 0:
-			return buf, dataError("marshal error", data)
+			return buf, dataMarshalError(v, data)
 		default:
 			return buf[:n], nil
 		}
@@ -189,11 +211,10 @@ func marshal(v interface{}, data *C.pn_data_t) {
 		case reflect.Slice:
 			putList(data, v)
 		default:
-			panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
+			panic(newMarshalError(v, "no conversion"))
 		}
 	}
-	err := dataError("marshal", data)
-	if err != nil {
+	if err := dataMarshalError(v, data); err != nil {
 		panic(err)
 	}
 	return

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eafd0810/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
index 9b9cfd3..d56cbd2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
@@ -38,28 +38,39 @@ type UnmarshalError struct {
 	AMQPType string
 	// The Go type.
 	GoType reflect.Type
-}
 
-func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
-	return &UnmarshalError{C.pn_type_t(pnType).String(), reflect.TypeOf(v)}
+	s string
 }
 
-func (e UnmarshalError) Error() string {
+func (e UnmarshalError) Error() string { return e.s }
+
+func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
+	e := &UnmarshalError{AMQPType: C.pn_type_t(pnType).String(), GoType: reflect.TypeOf(v)}
 	if e.GoType.Kind() != reflect.Ptr {
-		return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
+		e.s = fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
 	} else {
-		return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
+		e.s = fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
 	}
+	return e
 }
 
-func doRecover(err *error) {
-	r := recover()
-	switch r := r.(type) {
-	case nil:
-	case *UnmarshalError:
-		*err = r
-	default:
-		panic(r)
+func newUnmarshalErrorData(data *C.pn_data_t, v interface{}) *UnmarshalError {
+	err := PnError(C.pn_data_error(data))
+	if err == nil {
+		return nil
+	}
+	e := newUnmarshalError(C.pn_data_type(data), v)
+	e.s = e.s + ": " + err.Error()
+	return e
+}
+
+func recoverUnmarshal(err *error) {
+	if r := recover(); r != nil {
+		if uerr, ok := r.(*UnmarshalError); ok {
+			*err = uerr
+		} else {
+			panic(r)
+		}
 	}
 }
 
@@ -99,7 +110,7 @@ func (d *Decoder) Buffered() io.Reader {
 // See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
 //
 func (d *Decoder) Decode(v interface{}) (err error) {
-	defer doRecover(&err)
+	defer recoverUnmarshal(&err)
 	data := C.pn_data(0)
 	defer C.pn_data_free(data)
 	var n int
@@ -181,7 +192,7 @@ AMQP maps with mixed/unhashable key types need an alternate representation.
 Described types.
 */
 func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
-	defer doRecover(&err)
+	defer recoverUnmarshal(&err)
 
 	data := C.pn_data(0)
 	defer C.pn_data_free(data)
@@ -433,8 +444,7 @@ func unmarshal(v interface{}, data *C.pn_data_t) {
 			panic(newUnmarshalError(pnType, v))
 		}
 	}
-	err := dataError("unmarshaling", data)
-	if err != nil {
+	if err := newUnmarshalErrorData(data, v); err != nil {
 		panic(err)
 	}
 	return


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[07/10] qpid-proton git commit: PROTON-1390: Get rid of relative ../util.go import, simplify examples.

Posted by ac...@apache.org.
PROTON-1390: Get rid of relative ../util.go import, simplify examples.

gccgo does not support relative imports, simplify the examples to remove the need
for a common library.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0f156d72
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0f156d72
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0f156d72

Branch: refs/heads/go1
Commit: 0f156d721f56d265d1d36814f1334fd959ab0d09
Parents: 9fc393a
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Jan 17 19:12:55 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jan 17 19:42:54 2017 -0500

----------------------------------------------------------------------
 examples/go/electron/broker.go  | 70 ++++++++++++++++++++++++++++--------
 examples/go/electron/receive.go | 56 +++++++++++++++--------------
 examples/go/electron/send.go    | 51 +++++++++++++-------------
 examples/go/proton/broker.go    | 69 +++++++++++++++++++++++++++--------
 examples/go/util/queue.go       | 61 -------------------------------
 examples/go/util/util.go        | 68 -----------------------------------
 6 files changed, 167 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index 2078d1e..9228195 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -27,7 +27,6 @@ under the License.
 package main
 
 import (
-	"../util"
 	"flag"
 	"fmt"
 	"log"
@@ -35,6 +34,7 @@ import (
 	"os"
 	"qpid.apache.org/amqp"
 	"qpid.apache.org/electron"
+	"sync"
 )
 
 // Usage and command-line flags
@@ -49,12 +49,19 @@ A simple broker-like demo. Queues are created automatically for sender or receiv
 var addr = flag.String("addr", ":amqp", "Listening address")
 var credit = flag.Int("credit", 100, "Receiver credit window")
 var qsize = flag.Int("qsize", 1000, "Max queue size")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var debugf = func(format string, data ...interface{}) {} // Default no debugging output
 
 func main() {
 	flag.Usage = usage
 	flag.Parse()
+
+	if *debug {
+		debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+	}
+
 	b := &broker{
-		queues:    util.MakeQueues(*qsize),
+		queues:    makeQueues(*qsize),
 		container: electron.NewContainer(fmt.Sprintf("broker[%v]", os.Getpid())),
 		acks:      make(chan electron.Outcome),
 		sent:      make(chan sentMessage),
@@ -66,7 +73,7 @@ func main() {
 
 // State for the broker
 type broker struct {
-	queues    util.Queues           // A collection of queues.
+	queues    queues                // A collection of queues.
 	container electron.Container    // electron.Container manages AMQP connections.
 	sent      chan sentMessage      // Channel to record sent messages.
 	acks      chan electron.Outcome // Channel to receive the Outcome of sent messages.
@@ -76,7 +83,7 @@ type broker struct {
 // If a message is rejected or not acknowledged due to a failure, we will put it back on the queue.
 type sentMessage struct {
 	m amqp.Message
-	q util.Queue
+	q queue
 }
 
 // run listens for incoming net.Conn connections and starts an electron.Connection for each one.
@@ -94,12 +101,12 @@ func (b *broker) run() error {
 	for {
 		c, err := b.container.Accept(listener)
 		if err != nil {
-			util.Debugf("Accept error: %v", err)
+			debugf("Accept error: %v", err)
 			continue
 		}
 		cc := &connection{b, c}
 		go cc.run() // Handle the connection
-		util.Debugf("Accepted %v", c)
+		debugf("Accepted %v", c)
 	}
 }
 
@@ -113,7 +120,7 @@ type connection struct {
 // and start goroutines to service them.
 func (c *connection) run() {
 	for in := range c.connection.Incoming() {
-		util.Debugf("incoming %v", in)
+		debugf("incoming %v", in)
 
 		switch in := in.(type) {
 
@@ -131,7 +138,7 @@ func (c *connection) run() {
 			in.Accept() // Accept sessions unconditionally
 		}
 	}
-	util.Debugf("incoming closed: %v", c.connection)
+	debugf("incoming closed: %v", c.connection)
 }
 
 // receiver receives messages and pushes to a queue.
@@ -139,11 +146,11 @@ func (c *connection) receiver(receiver electron.Receiver) {
 	q := c.broker.queues.Get(receiver.Target())
 	for {
 		if rm, err := receiver.Receive(); err == nil {
-			util.Debugf("%v: received %v", receiver, util.FormatMessage(rm.Message))
+			debugf("%v: received %v %#v", receiver, rm.Message)
 			q <- rm.Message
 			rm.Accept()
 		} else {
-			util.Debugf("%v error: %v", receiver, err)
+			debugf("%v error: %v", receiver, err)
 			break
 		}
 	}
@@ -154,13 +161,13 @@ func (c *connection) sender(sender electron.Sender) {
 	q := c.broker.queues.Get(sender.Source())
 	for {
 		if sender.Error() != nil {
-			util.Debugf("%v closed: %v", sender, sender.Error())
+			debugf("%v closed: %v", sender, sender.Error())
 			return
 		}
 		select {
 
 		case m := <-q:
-			util.Debugf("%v: sent %v", sender, util.FormatMessage(m))
+			debugf("%v: sent %#v", sender, m)
 			sm := sentMessage{m, q}
 			c.broker.sent <- sm                    // Record sent message
 			sender.SendAsync(m, c.broker.acks, sm) // Receive outcome on c.broker.acks with Value sm
@@ -191,9 +198,44 @@ func (b *broker) acknowledgements() {
 			delete(sentMap, sm)
 			if outcome.Status != electron.Accepted { // Error, release or rejection
 				sm.q.PutBack(sm.m) // Put the message back on the queue.
-				util.Debugf("message %v put back, status %v, error %v",
-					util.FormatMessage(sm.m), outcome.Status, outcome.Error)
+				debugf("message %#v put back, status %v, error %v", sm.m, outcome.Status, outcome.Error)
 			}
 		}
 	}
 }
+
+// Use a buffered channel as a very simple queue.
+type queue chan amqp.Message
+
+// Put a message back on the queue, does not block.
+func (q queue) PutBack(m amqp.Message) {
+	select {
+	case q <- m:
+	default:
+		// Not an efficient implementation but ensures we don't block the caller.
+		go func() { q <- m }()
+	}
+}
+
+// Concurrent-safe map of queues.
+type queues struct {
+	queueSize int
+	m         map[string]queue
+	lock      sync.Mutex
+}
+
+func makeQueues(queueSize int) queues {
+	return queues{queueSize: queueSize, m: make(map[string]queue)}
+}
+
+// Create a queue if not found.
+func (qs *queues) Get(name string) queue {
+	qs.lock.Lock()
+	defer qs.lock.Unlock()
+	q := qs.m[name]
+	if q == nil {
+		q = make(queue, qs.queueSize)
+		qs.m[name] = q
+	}
+	return q
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 3bbe327..161e911 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -20,7 +20,6 @@ under the License.
 package main
 
 import (
-	"../util"
 	"flag"
 	"fmt"
 	"log"
@@ -39,11 +38,16 @@ Receive messages from all the listed URLs concurrently and print them.
 }
 
 var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var debugf = func(format string, data ...interface{}) {} // Default no debugging output
 
 func main() {
 	flag.Usage = usage
 	flag.Parse()
 
+	if *debug {
+		debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+	}
 	urls := flag.Args() // Non-flag arguments are URLs to receive from
 	if len(urls) == 0 {
 		log.Println("No URL provided")
@@ -63,33 +67,31 @@ func main() {
 	// Start a goroutine to for each URL to receive messages and send them to the messages channel.
 	// main() receives and prints them.
 	for _, urlStr := range urls {
-		util.Debugf("Connecting to %s\n", urlStr)
+		debugf("Connecting to %s\n", urlStr)
 		go func(urlStr string) { // Start the goroutine
-
-			defer wait.Done()                 // Notify main() when this goroutine is done.
-			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			util.ExitIf(err)
-
-			// Open a new connection
-			c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			util.ExitIf(err)
-			connections <- c // Save connection so we can Close() when main() ends
-
-			// Create a Receiver using the path of the URL as the source address
-			r, err := c.Receiver(electron.Source(url.Path))
-			util.ExitIf(err)
-
-			// Loop receiving messages and sending them to the main() goroutine
-			for {
-				if rm, err := r.Receive(); err == nil {
-					rm.Accept()
-					messages <- rm.Message
-				} else if err == electron.Closed {
-					return
-				} else {
-					log.Fatalf("receive error %v: %v", urlStr, err)
+			defer wait.Done() // Notify main() when this goroutine is done.
+			var err error
+			if url, err := amqp.ParseURL(urlStr); err == nil {
+				if c, err := container.Dial("tcp", url.Host); err == nil {
+					connections <- c // Save connection so we can Close() when main() ends
+					if r, err := c.Receiver(electron.Source(url.Path)); err == nil {
+						// Loop receiving messages and sending them to the main() goroutine
+						for {
+							if rm, err := r.Receive(); err == nil {
+								rm.Accept()
+								messages <- rm.Message
+							} else if err == electron.Closed {
+								return
+							} else {
+								log.Fatal("receive error %v: %v", urlStr, err)
+							}
+						}
+					}
 				}
 			}
+			if err != nil {
+				log.Fatal(err)
+			}
 		}(urlStr)
 	}
 
@@ -99,7 +101,7 @@ func main() {
 	// print each message until the count is exceeded.
 	for i := uint64(0); i < *count; i++ {
 		m := <-messages
-		util.Debugf("%s\n", util.FormatMessage(m))
+		debugf("%#v\n", m)
 	}
 	fmt.Printf("Received %d messages\n", *count)
 
@@ -107,7 +109,7 @@ func main() {
 	// with electron.Closed.
 	for i := 0; i < len(urls); i++ {
 		c := <-connections
-		util.Debugf("close %s", c)
+		debugf("close %s", c)
 		c.Close(nil)
 	}
 	wait.Wait() // Wait for all goroutines to finish.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 68b8b2e..9ab5f1c 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -20,7 +20,6 @@ under the License.
 package main
 
 import (
-	"../util"
 	"flag"
 	"fmt"
 	"log"
@@ -39,11 +38,17 @@ Send messages to each URL concurrently with body "<url-path>-<n>" where n is the
 }
 
 var count = flag.Int64("count", 1, "Send this may messages per address.")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var Debugf = func(format string, data ...interface{}) {} // Default no debugging output
 
 func main() {
 	flag.Usage = usage
 	flag.Parse()
 
+	if *debug {
+		Debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+	}
+
 	urls := flag.Args() // Non-flag arguments are URLs to receive from
 	if len(urls) == 0 {
 		log.Println("No URL provided")
@@ -61,41 +66,39 @@ func main() {
 
 	// Start a goroutine for each URL to send messages.
 	for _, urlStr := range urls {
-		util.Debugf("Connecting to %v\n", urlStr)
+		Debugf("Connecting to %v\n", urlStr)
 		go func(urlStr string) {
-
-			defer wait.Done()                 // Notify main() that this goroutine is done.
-			url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
-			util.ExitIf(err)
-
-			// Open a new connection
-			c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			util.ExitIf(err)
-			connections <- c // Save connection so we can Close() when main() ends
-
-			// Create a Sender using the path of the URL as the AMQP address
-			s, err := c.Sender(electron.Target(url.Path))
-			util.ExitIf(err)
-
-			// Loop sending messages.
-			for i := int64(0); i < *count; i++ {
-				m := amqp.NewMessage()
-				body := fmt.Sprintf("%v-%v", url.Path, i)
-				m.Marshal(body)
-				s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
+			defer wait.Done() // Notify main() when this goroutine is done.
+			var err error
+			if url, err := amqp.ParseURL(urlStr); err == nil {
+				if c, err := container.Dial("tcp", url.Host); err == nil {
+					connections <- c // Save connection so we can Close() when main() ends
+					if s, err := c.Sender(electron.Target(url.Path)); err == nil {
+						// Loop sending messages.
+						for i := int64(0); i < *count; i++ {
+							m := amqp.NewMessage()
+							body := fmt.Sprintf("%v-%v", url.Path, i)
+							m.Marshal(body)
+							s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
+						}
+					}
+				}
+			}
+			if err != nil {
+				log.Fatal(err)
 			}
 		}(urlStr)
 	}
 
 	// Wait for all the acknowledgements
 	expect := int(*count) * len(urls)
-	util.Debugf("Started senders, expect %v acknowledgements\n", expect)
+	Debugf("Started senders, expect %v acknowledgements\n", expect)
 	for i := 0; i < expect; i++ {
 		out := <-sentChan // Outcome of async sends.
 		if out.Error != nil {
 			log.Fatalf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error)
 		} else {
-			util.Debugf("acknowledgement[%v]  %v (%v)\n", i, out.Value, out.Status)
+			Debugf("acknowledgement[%v]  %v (%v)\n", i, out.Value, out.Status)
 		}
 	}
 	fmt.Printf("Received all %v acknowledgements\n", expect)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
index 8f0efda..8291ca9 100644
--- a/examples/go/proton/broker.go
+++ b/examples/go/proton/broker.go
@@ -30,7 +30,6 @@ under the License.
 package main
 
 import (
-	"../util"
 	"flag"
 	"fmt"
 	"log"
@@ -38,6 +37,7 @@ import (
 	"os"
 	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
+	"sync"
 )
 
 // Usage and command-line flags
@@ -52,11 +52,16 @@ A simple broker-like demo. Queues are created automatically for sender or receiv
 var addr = flag.String("addr", ":amqp", "Listening address")
 var credit = flag.Int("credit", 100, "Receiver credit window")
 var qsize = flag.Int("qsize", 1000, "Max queue size")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var debugf = func(format string, data ...interface{}) {} // Default no debugging output
 
 func main() {
 	flag.Usage = usage
 	flag.Parse()
-	b := &broker{util.MakeQueues(*qsize)}
+	if *debug {
+		debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+	}
+	b := &broker{makeQueues(*qsize)}
 	if err := b.run(); err != nil {
 		log.Fatal(err)
 	}
@@ -64,7 +69,7 @@ func main() {
 
 // State for the broker
 type broker struct {
-	queues util.Queues
+	queues queues
 }
 
 // Listens for connections and starts a proton.Engine for each one.
@@ -78,7 +83,7 @@ func (b *broker) run() error {
 	for {
 		conn, err := listener.Accept()
 		if err != nil {
-			util.Debugf("Accept error: %v", err)
+			debugf("Accept error: %v", err)
 			continue
 		}
 		adapter := proton.NewMessagingAdapter(newHandler(&b.queues))
@@ -88,14 +93,14 @@ func (b *broker) run() error {
 		adapter.AutoAccept = false
 		engine, err := proton.NewEngine(conn, adapter)
 		if err != nil {
-			util.Debugf("Connection error: %v", err)
+			debugf("Connection error: %v", err)
 			continue
 		}
 		engine.Server() // Enable server-side protocol negotiation.
-		util.Debugf("Accepted connection %s", engine)
+		debugf("Accepted connection %s", engine)
 		go func() { // Start goroutine to run the engine event loop
 			engine.Run()
-			util.Debugf("Closed %s", engine)
+			debugf("Closed %s", engine)
 		}()
 	}
 }
@@ -105,13 +110,13 @@ func (b *broker) run() error {
 // all calls to the handler. We use channels to communicate between the handler
 // goroutine and other goroutines sending and receiving messages.
 type handler struct {
-	queues    *util.Queues
+	queues    *queues
 	receivers map[proton.Link]*receiver
 	senders   map[proton.Link]*sender
 	injecter  proton.Injecter
 }
 
-func newHandler(queues *util.Queues) *handler {
+func newHandler(queues *queues) *handler {
 	return &handler{
 		queues:    queues,
 		receivers: make(map[proton.Link]*receiver),
@@ -156,7 +161,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		}
 		// This will not block as AMQP credit is set to the buffer capacity.
 		r.buffer <- receivedMessage{e.Delivery(), m}
-		util.Debugf("link %s received %s", e.Link(), util.FormatMessage(m))
+		debugf("link %s received %#v", e.Link(), m)
 
 	case proton.MConnectionClosed, proton.MDisconnected:
 		for l, _ := range h.receivers {
@@ -187,11 +192,11 @@ func (h *handler) linkClosed(l proton.Link, err error) {
 // channels.
 type link struct {
 	l proton.Link
-	q util.Queue
+	q queue
 	h *handler
 }
 
-func makeLink(l proton.Link, q util.Queue, h *handler) link {
+func makeLink(l proton.Link, q queue, h *handler) link {
 	lnk := link{l: l, q: q, h: h}
 	return lnk
 }
@@ -280,7 +285,7 @@ func (s *sender) sendable() {
 // run runs in a separate goroutine. It monitors the queue for messages and injects
 // a function to send them when there is credit
 func (s *sender) run() {
-	var q util.Queue // q is nil initially as we have no credit.
+	var q queue // q is nil initially as we have no credit.
 	for {
 		select {
 		case _, ok := <-s.credit:
@@ -323,9 +328,45 @@ func (s *sender) sendOne(m amqp.Message) error {
 	delivery, err := s.l.Send(m)
 	if err == nil {
 		delivery.Settle() // Pre-settled, unreliable.
-		util.Debugf("link %s sent %s", s.l, util.FormatMessage(m))
+		debugf("link %s sent %#v", s.l, m)
 	} else {
 		s.q.PutBack(m) // Put the message back on the queue, don't block
 	}
 	return err
 }
+
+// Use a buffered channel as a very simple queue.
+type queue chan amqp.Message
+
+// Put a message back on the queue, does not block.
+func (q queue) PutBack(m amqp.Message) {
+	select {
+	case q <- m:
+	default:
+		// Not an efficient implementation but ensures we don't block the caller.
+		go func() { q <- m }()
+	}
+}
+
+// Concurrent-safe map of queues.
+type queues struct {
+	queueSize int
+	m         map[string]queue
+	lock      sync.Mutex
+}
+
+func makeQueues(queueSize int) queues {
+	return queues{queueSize: queueSize, m: make(map[string]queue)}
+}
+
+// Create a queue if not found.
+func (qs *queues) Get(name string) queue {
+	qs.lock.Lock()
+	defer qs.lock.Unlock()
+	q := qs.m[name]
+	if q == nil {
+		q = make(queue, qs.queueSize)
+		qs.m[name] = q
+	}
+	return q
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
deleted file mode 100644
index 2eaba72..0000000
--- a/examples/go/util/queue.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package util
-
-import (
-	"qpid.apache.org/amqp"
-	"sync"
-)
-
-// Use a buffered channel as a very simple queue.
-type Queue chan amqp.Message
-
-// Put a message back on the queue, does not block.
-func (q Queue) PutBack(m amqp.Message) {
-	select {
-	case q <- m:
-	default:
-		// Not an efficient implementation but ensures we don't block the caller.
-		go func() { q <- m }()
-	}
-}
-
-// Concurrent-safe map of queues.
-type Queues struct {
-	queueSize int
-	m         map[string]Queue
-	lock      sync.Mutex
-}
-
-func MakeQueues(queueSize int) Queues {
-	return Queues{queueSize: queueSize, m: make(map[string]Queue)}
-}
-
-// Create a queue if not found.
-func (qs *Queues) Get(name string) Queue {
-	qs.lock.Lock()
-	defer qs.lock.Unlock()
-	q := qs.m[name]
-	if q == nil {
-		q = make(Queue, qs.queueSize)
-		qs.m[name] = q
-	}
-	return q
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/util/util.go
----------------------------------------------------------------------
diff --git a/examples/go/util/util.go b/examples/go/util/util.go
deleted file mode 100644
index 20f2192..0000000
--- a/examples/go/util/util.go
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-// util contains utility types and functions to simplify parts of the example
-// code that are not related to the use of proton.
-package util
-
-import (
-	"flag"
-	"fmt"
-	"log"
-	"os"
-	"path"
-	"qpid.apache.org/amqp"
-)
-
-// Debug flag "-debug" enables debug output with Debugf
-var Debug = flag.Bool("debug", false, "Print detailed debug output")
-
-// Full flag "-full" enables full message output by FormatMessage
-var Full = flag.Bool("full", false, "Print full message not just body.")
-
-// Debugf logs debug messages if "-debug" flag is set.
-func Debugf(format string, data ...interface{}) {
-	if *Debug {
-		log.Printf(format, data...)
-	}
-}
-
-// Simple error handling for demo.
-func ExitIf(err error) {
-	if err != nil {
-		log.Fatal(err)
-	}
-}
-
-// FormatMessage formats a message as a string, just the body by default or
-// the full message (with properties etc.) if "-full" flag is set.
-func FormatMessage(m amqp.Message) string {
-	if *Full {
-		return fmt.Sprintf("%#v", m)
-	} else {
-		return fmt.Sprintf("%#v", m.Body())
-	}
-}
-
-// For example programs, use the program name as the log prefix.
-func init() {
-	log.SetFlags(0)
-	_, prog := path.Split(os.Args[0])
-	log.SetPrefix(fmt.Sprintf("%s(%d): ", prog, os.Getpid()))
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[02/10] qpid-proton git commit: PROTON-1312: fix memory leak on BlockingConnection.close()

Posted by ac...@apache.org.
PROTON-1312: fix memory leak on BlockingConnection.close()


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/98e26f69
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/98e26f69
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/98e26f69

Branch: refs/heads/go1
Commit: 98e26f69995e6628c4b6c97b1826a761d9168d8c
Parents: 431c00d
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed Jan 11 14:20:14 2017 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed Jan 11 14:20:14 2017 -0800

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py |  3 +++
 proton-c/bindings/python/proton/reactor.py  |  5 -----
 proton-c/bindings/python/proton/utils.py    | 14 ++++++++++++++
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/98e26f69/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index cfac01e..2b354df 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2581,6 +2581,9 @@ and SASL layers to identify the peer.
     """
     self._update_cond()
     pn_connection_close(self._impl)
+    if hasattr(self, '_session_policy'):
+      # break circular ref
+      del self._session_policy
 
   @property
   def state(self):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/98e26f69/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index e4dab95..269ed9e 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -494,13 +494,8 @@ class SessionPerConnection(object):
     def session(self, connection):
         if not self._default_session:
             self._default_session = _create_session(connection)
-            self._default_session.context = self
         return self._default_session
 
-    def on_session_remote_close(self, event):
-        event.connection.close()
-        self._default_session = None
-
 class GlobalOverrides(object):
     """
     Internal handler that triggers the necessary socket connect for an

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/98e26f69/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 9cd7cf3..05ef80d 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -132,10 +132,15 @@ class BlockingReceiver(BlockingLink):
             raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
         if credit: receiver.flow(credit)
         self.fetcher = fetcher
+        self.container = connection.container
 
     def __del__(self):
         self.fetcher = None
+        # The next line causes a core dump if the Proton-C reactor finalizes
+        # first.  The self.container reference prevents reactor finalization
+        # until after it is set to None.
         self.link.handler = None
+        self.container = None
 
     def receive(self, timeout=False):
         if not self.fetcher:
@@ -222,12 +227,19 @@ class BlockingConnection(Handler):
             self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
 
     def close(self):
+        if not self.conn:
+            return
         self.conn.close()
         try:
             self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                       msg="Closing connection")
         finally:
+            self.conn.free()
+            # For cleanup, reactor needs to process PN_CONNECTION_FINAL
+            # and all events with embedded contexts must be drained.
+            self.run() # will not block any more
             self.conn = None
+            self.container.global_handler = None # break circular ref: container to cadapter.on_error
             self.container = None
 
     def _is_closed(self):
@@ -236,6 +248,8 @@ class BlockingConnection(Handler):
     def run(self):
         """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
         while self.container.process(): pass
+        self.container.stop()
+        self.container.process()
 
     def wait(self, condition, timeout=False, msg=None):
         """Call process until condition() is true"""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[04/10] qpid-proton git commit: NO-JIRA: update year in NOTICE file

Posted by ac...@apache.org.
NO-JIRA: update year in NOTICE file


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0cfa399d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0cfa399d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0cfa399d

Branch: refs/heads/go1
Commit: 0cfa399da7d7ceec6ed6413e355d58179dc8e27d
Parents: 99616e6
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Jan 17 11:33:31 2017 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Jan 17 11:33:40 2017 +0000

----------------------------------------------------------------------
 NOTICE | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0cfa399d/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 9f6952c..76534fc 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
 Apache Qpid Proton
-Copyright 2012-2016 The Apache Software Foundation
+Copyright 2012-2017 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[05/10] qpid-proton git commit: PROTON-1388: Make sure we switch to sasl encrypted input early enough - Previously if the server sent an outcome frame followed immediately by encrypted AMQP frames we wouldn't always switch to reading encrypted AMQP fra

Posted by ac...@apache.org.
PROTON-1388: Make sure we switch to sasl encrypted input early enough
- Previously if the server sent an outcome frame followed immediately by encrypted
  AMQP frames we wouldn't always switch to reading encrypted AMQP frames early enough.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bbf8a6a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bbf8a6a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bbf8a6a3

Branch: refs/heads/go1
Commit: bbf8a6a3097c8e592b223d7fb8bca0b75dc6a2ef
Parents: 0cfa399
Author: Andrew Stitcher <as...@apache.org>
Authored: Tue Jan 17 16:23:22 2017 -0500
Committer: Andrew Stitcher <as...@apache.org>
Committed: Tue Jan 17 16:23:22 2017 -0500

----------------------------------------------------------------------
 proton-c/src/sasl/sasl.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bbf8a6a3/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index 69fb6b2..12b6ad0 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -120,7 +120,9 @@ static bool pni_sasl_is_final_input_state(pni_sasl_t *sasl)
 static bool pni_sasl_is_final_output_state(pni_sasl_t *sasl)
 {
   enum pni_sasl_state last_state = sasl->last_state;
-  return last_state==SASL_RECVED_OUTCOME_SUCCEED
+  enum pni_sasl_state desired_state = sasl->desired_state;
+  return (desired_state==SASL_RECVED_OUTCOME_SUCCEED && last_state>=SASL_POSTED_INIT)
+      || last_state==SASL_RECVED_OUTCOME_SUCCEED
       || last_state==SASL_RECVED_OUTCOME_FAIL
       || last_state==SASL_ERROR
       || last_state==SASL_POSTED_OUTCOME;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[06/10] qpid-proton git commit: PROTON-1390: Go fix use of unsafe.Sizeof(0)

Posted by ac...@apache.org.
PROTON-1390: Go fix use of unsafe.Sizeof(0)

Work around gccgo bug by using unsafe.Sizeof(int(0)). This is in any case better
as it makes the type clearer.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9fc393a4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9fc393a4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9fc393a4

Branch: refs/heads/go1
Commit: 9fc393a4584cafddfba3666e30f6d8762bae837f
Parents: bbf8a6a
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Jan 17 17:12:13 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jan 17 19:41:56 2017 -0500

----------------------------------------------------------------------
 proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go   | 4 ++--
 proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fc393a4/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
index bce7323..e3d4e10 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
@@ -143,7 +143,7 @@ func marshal(v interface{}, data *C.pn_data_t) {
 	case int64:
 		C.pn_data_put_long(data, C.int64_t(v))
 	case int:
-		if unsafe.Sizeof(0) == 8 {
+		if unsafe.Sizeof(int(0)) == 8 {
 			C.pn_data_put_long(data, C.int64_t(v))
 		} else {
 			C.pn_data_put_int(data, C.int32_t(v))
@@ -157,7 +157,7 @@ func marshal(v interface{}, data *C.pn_data_t) {
 	case uint64:
 		C.pn_data_put_ulong(data, C.uint64_t(v))
 	case uint:
-		if unsafe.Sizeof(0) == 8 {
+		if unsafe.Sizeof(int(0)) == 8 {
 			C.pn_data_put_ulong(data, C.uint64_t(v))
 		} else {
 			C.pn_data_put_uint(data, C.uint32_t(v))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fc393a4/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
index 8f380a7..9b9cfd3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
@@ -330,7 +330,7 @@ func unmarshal(v interface{}, data *C.pn_data_t) {
 		case C.PN_INT:
 			*v = int(C.pn_data_get_int(data))
 		case C.PN_LONG:
-			if unsafe.Sizeof(0) == 8 {
+			if unsafe.Sizeof(int(0)) == 8 {
 				*v = int(C.pn_data_get_long(data))
 			} else {
 				panic(newUnmarshalError(pnType, v))
@@ -350,7 +350,7 @@ func unmarshal(v interface{}, data *C.pn_data_t) {
 		case C.PN_UINT:
 			*v = uint(C.pn_data_get_uint(data))
 		case C.PN_ULONG:
-			if unsafe.Sizeof(0) == 8 {
+			if unsafe.Sizeof(int(0)) == 8 {
 				*v = uint(C.pn_data_get_ulong(data))
 			} else {
 				panic(newUnmarshalError(pnType, v))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[09/10] qpid-proton git commit: PROTON-1390: Fix message body output in examples

Posted by ac...@apache.org.
PROTON-1390: Fix message body output in examples

Print message body as %v, not messages as %#v to give readable example output.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/71e567d9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/71e567d9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/71e567d9

Branch: refs/heads/go1
Commit: 71e567d90d2a123bb323320e1c308a5e3180d26b
Parents: eafd081
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Jan 17 20:07:43 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jan 17 20:07:43 2017 -0500

----------------------------------------------------------------------
 examples/go/electron/broker.go  | 6 +++---
 examples/go/electron/receive.go | 2 +-
 examples/go/electron/send.go    | 2 +-
 3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/71e567d9/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index 9228195..ba06e89 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -146,7 +146,7 @@ func (c *connection) receiver(receiver electron.Receiver) {
 	q := c.broker.queues.Get(receiver.Target())
 	for {
 		if rm, err := receiver.Receive(); err == nil {
-			debugf("%v: received %v %#v", receiver, rm.Message)
+			debugf("%v: received %v", receiver, rm.Message.Body())
 			q <- rm.Message
 			rm.Accept()
 		} else {
@@ -167,7 +167,7 @@ func (c *connection) sender(sender electron.Sender) {
 		select {
 
 		case m := <-q:
-			debugf("%v: sent %#v", sender, m)
+			debugf("%v: sent %v", sender, m.Body())
 			sm := sentMessage{m, q}
 			c.broker.sent <- sm                    // Record sent message
 			sender.SendAsync(m, c.broker.acks, sm) // Receive outcome on c.broker.acks with Value sm
@@ -198,7 +198,7 @@ func (b *broker) acknowledgements() {
 			delete(sentMap, sm)
 			if outcome.Status != electron.Accepted { // Error, release or rejection
 				sm.q.PutBack(sm.m) // Put the message back on the queue.
-				debugf("message %#v put back, status %v, error %v", sm.m, outcome.Status, outcome.Error)
+				debugf("message %v put back, status %v, error %v", sm.m.Body(), outcome.Status, outcome.Error)
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/71e567d9/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 161e911..37a8583 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -101,7 +101,7 @@ func main() {
 	// print each message until the count is exceeded.
 	for i := uint64(0); i < *count; i++ {
 		m := <-messages
-		debugf("%#v\n", m)
+		debugf("%v\n", m.Body())
 	}
 	fmt.Printf("Received %d messages\n", *count)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/71e567d9/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 9ab5f1c..f478f4b 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -77,7 +77,7 @@ func main() {
 						// Loop sending messages.
 						for i := int64(0); i < *count; i++ {
 							m := amqp.NewMessage()
-							body := fmt.Sprintf("%v-%v", url.Path, i)
+							body := fmt.Sprintf("%v%v", url.Path, i)
 							m.Marshal(body)
 							s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
 						}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org