You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/02/11 04:04:57 UTC

svn commit: r1566974 - in /qpid/proton/trunk: proton-c/ proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/engine/ proton-c/src/transport/ proton-j/src/main/resources/ tests/python/proton_tests/

Author: rhs
Date: Tue Feb 11 03:04:54 2014
New Revision: 1566974

URL: http://svn.apache.org/r1566974
Log:
initial draft of event api

Added:
    qpid/proton/trunk/proton-c/src/engine/event.c
    qpid/proton/trunk/proton-c/src/engine/event.h
Modified:
    qpid/proton/trunk/proton-c/CMakeLists.txt
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/tests/python/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Tue Feb 11 03:04:54 2014
@@ -250,6 +250,7 @@ set (qpid-proton-core
 
   src/dispatcher/dispatcher.c
   src/engine/engine.c
+  src/engine/event.c
   src/transport/transport.c
   src/message/message.c
   src/sasl/sasl.c

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Feb 11 03:04:54 2014
@@ -2160,6 +2160,9 @@ class Connection(Endpoint):
   def _get_remote_cond_impl(self):
     return pn_connection_remote_condition(self._conn)
 
+  def collect(self, collector):
+    pn_connection_collect(self._conn, collector._impl)
+
   def _get_container(self):
     return pn_connection_get_container(self._conn)
   def _set_container(self, name):
@@ -3072,6 +3075,57 @@ class SSLSessionDetails(object):
     return self._session_id
 
 
+class Collector:  # Python wrapper around the C event collector created by pn_collector()
+
+  def __init__(self):
+    self._impl = pn_collector()  # underlying collector handle; released in __del__
+
+  def peek(self):  # return the event at the head of the queue as an Event, or None
+    event = pn_collector_peek(self._impl)
+    if event is None:
+      return None
+
+    tpi = pn_event_transport(event)
+    if tpi:
+      tp = Transport(tpi)  # wrap the raw transport handle only when one is set
+    else:
+      tp = None
+    return Event(type=pn_event_type(event),
+                 connection=wrap_connection(pn_event_connection(event)),
+                 session=wrap_session(pn_event_session(event)),
+                 link=wrap_link(pn_event_link(event)),
+                 delivery=wrap_delivery(pn_event_delivery(event)),
+                 transport=tp)
+
+  def pop(self):  # discard the head event; the C call's boolean result is not returned
+    pn_collector_pop(self._impl)
+
+  def __del__(self):
+    pn_collector_free(self._impl)  # NOTE(review): a Connection passed to collect() may still point at this - confirm lifetime
+
+class Event:
+
+  CONNECTION_STATE = PN_CONNECTION_STATE
+  SESSION_STATE = PN_SESSION_STATE
+  LINK_STATE = PN_LINK_STATE
+  LINK_FLOW = PN_LINK_FLOW
+  DELIVERY = PN_DELIVERY
+  TRANSPORT = PN_TRANSPORT
+
+  def __init__(self, type, connection, session, link, delivery, transport):
+    self.type = type
+    self.connection = connection
+    self.session = session
+    self.link = link
+    self.delivery = delivery
+    self.transport = transport
+
+  def __repr__(self):
+    objects = [self.connection, self.session, self.link, self.delivery,
+               self.transport]
+    return "%s(%s)" % (pn_event_type_name(self.type),
+                       ", ".join([str(o) for o in objects if o is not None]))
+
 ###
 # Driver
 ###
@@ -3208,6 +3262,7 @@ __all__ = [
            "SETTLED",
            "UNDESCRIBED",
            "Array",
+           "Collector",
            "Condition",
            "Connection",
            "Connector",
@@ -3218,6 +3273,7 @@ __all__ = [
            "Driver",
            "DriverException",
            "Endpoint",
+           "Event",
            "Link",
            "Listener",
            "Message",

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Feb 11 03:04:54 2014
@@ -41,13 +41,21 @@ extern "C" {
  * @todo
  */
 
-typedef struct pn_transport_t pn_transport_t;
+// top half
+typedef struct pn_container_t pn_container_t;
 typedef struct pn_connection_t pn_connection_t; /**< Connection */
 typedef struct pn_session_t pn_session_t;       /**< Session */
 typedef struct pn_link_t pn_link_t;             /**< Link */
 typedef struct pn_terminus_t pn_terminus_t;
 typedef struct pn_condition_t pn_condition_t;
 
+// bottom half
+typedef struct pn_transport_t pn_transport_t;
+
+// shared
+typedef struct pn_collector_t pn_collector_t;
+typedef struct pn_event_t pn_event_t;
+
 typedef enum {
   PN_UNSPECIFIED = 0,
   PN_SOURCE = 1,
@@ -127,6 +135,32 @@ typedef void (pn_tracer_t)(pn_transport_
 #define PN_TRACE_FRM (2)
 #define PN_TRACE_DRV (4)
 
+// event
+
+typedef enum {
+  PN_EVENT_NONE = 0,
+  PN_CONNECTION_STATE = 1,
+  PN_SESSION_STATE = 2,
+  PN_LINK_STATE = 4,
+  PN_LINK_FLOW = 8,
+  PN_DELIVERY = 16,
+  PN_TRANSPORT = 32
+} pn_event_type_t;
+
+PN_EXTERN const char *pn_event_type_name(pn_event_type_t type);
+
+PN_EXTERN pn_collector_t *pn_collector(void);
+PN_EXTERN void pn_collector_free(pn_collector_t *collector);
+PN_EXTERN pn_event_t *pn_collector_peek(pn_collector_t *collector);
+PN_EXTERN bool pn_collector_pop(pn_collector_t *collector);
+
+PN_EXTERN pn_event_type_t pn_event_type(pn_event_t *event);
+PN_EXTERN pn_connection_t *pn_event_connection(pn_event_t *event);
+PN_EXTERN pn_session_t *pn_event_session(pn_event_t *event);
+PN_EXTERN pn_link_t *pn_event_link(pn_event_t *event);
+PN_EXTERN pn_delivery_t *pn_event_delivery(pn_event_t *event);
+PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event);
+
 // connection
 
 /** Factory to construct a new Connection.
@@ -135,6 +169,8 @@ typedef void (pn_tracer_t)(pn_transport_
  */
 PN_EXTERN pn_connection_t *pn_connection(void);
 
+PN_EXTERN void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector);
+
 /** Retrieve the state of the connection.
  *
  * @param[in] connection the connection

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Feb 11 03:04:54 2014
@@ -191,6 +191,7 @@ struct pn_connection_t {
   pn_data_t *desired_capabilities;
   pn_data_t *properties;
   void *context;
+  pn_collector_t *collector;
 };
 
 struct pn_session_t {

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Feb 11 03:04:54 2014
@@ -251,6 +251,8 @@ static void pn_connection_finalize(void 
 #define pn_connection_compare NULL
 #define pn_connection_inspect NULL
 
+#include "event.h"
+
 pn_connection_t *pn_connection()
 {
   static pn_class_t clazz = PN_CLASS(pn_connection);
@@ -274,10 +276,16 @@ pn_connection_t *pn_connection()
   conn->offered_capabilities = pn_data(16);
   conn->desired_capabilities = pn_data(16);
   conn->properties = pn_data(16);
+  conn->collector = NULL;
 
   return conn;
 }
 
+void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector)
+{
+  connection->collector = collector;
+}
+
 pn_state_t pn_connection_state(pn_connection_t *connection)
 {
   return connection ? connection->endpoint.state : 0;
@@ -455,6 +463,10 @@ void pn_modified(pn_connection_t *connec
     LL_ADD(connection, transport, endpoint);
     endpoint->modified = true;
   }
+  pn_event_t *event = pn_collector_put(connection->collector, PN_TRANSPORT);
+  if (event) {
+    pn_event_init_connection(event, connection);
+  }
 }
 
 void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)

Added: qpid/proton/trunk/proton-c/src/engine/event.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.c?rev=1566974&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.c (added)
+++ qpid/proton/trunk/proton-c/src/engine/event.c Tue Feb 11 03:04:54 2014
@@ -0,0 +1,264 @@
+#include <proton/engine.h>
+#include <assert.h>
+#include "engine-internal.h"
+
+struct pn_collector_t {
+  pn_event_t *head;
+  pn_event_t *tail;
+  pn_event_t *free_head;
+};
+
+struct pn_event_t {
+  pn_event_type_t type;
+  pn_connection_t *connection;
+  pn_session_t *session;
+  pn_link_t *link;
+  pn_delivery_t *delivery;
+  pn_transport_t *transport;
+  pn_event_t *next;
+};
+
+static void pn_collector_initialize(void *obj)
+{
+  pn_collector_t *collector = (pn_collector_t *) obj;
+  collector->head = NULL;
+  collector->tail = NULL;
+  collector->free_head = NULL;
+}
+
+static void pn_collector_finalize(void *obj)  // finalizer: releases every event node owned by the collector
+{
+  pn_collector_t *collector = (pn_collector_t *) obj;
+
+  while (pn_collector_peek(collector)) {  // popping moves each queued event onto the free list
+    pn_collector_pop(collector);
+  }
+
+  assert(!collector->head);
+  assert(!collector->tail);
+
+  pn_event_t *event = collector->free_head;  // every node is now on the free list
+  while (event) {
+    pn_event_t *next = event->next;  // read the link before the node is freed
+    pn_free(event);
+    event = next;
+  }
+}
+
+static int pn_collector_inspect(void *obj, pn_string_t *dst)
+{
+  assert(obj);
+  pn_collector_t *collector = (pn_collector_t *) obj;
+  int err = pn_string_addf(dst, "EVENTS[");
+  if (err) return err;
+  pn_event_t *event = collector->head;
+  bool first = true;
+  while (event) {
+    if (first) {
+      first = false;
+    } else {
+      err = pn_string_addf(dst, ", ");
+      if (err) return err;
+    }
+    err = pn_inspect(event, dst);
+    if (err) return err;
+    event = event->next;
+  }
+  return pn_string_addf(dst, "]");
+}
+
+#define pn_collector_hashcode NULL
+#define pn_collector_compare NULL
+
+pn_collector_t *pn_collector(void)
+{
+  static pn_class_t clazz = PN_CLASS(pn_collector);
+  pn_collector_t *collector = (pn_collector_t *) pn_new(sizeof(pn_collector_t), &clazz);
+  return collector;
+}
+
+void pn_collector_free(pn_collector_t *collector)
+{
+  pn_free(collector);
+}
+
+pn_event_t *pn_event(void);
+static void pn_event_initialize(void *obj);
+
+pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type)  // append a new event of the given type; returns it, or NULL
+{
+  if (!collector) {  // no collector supplied: nothing is queued and callers get NULL
+    return NULL;
+  }
+
+  pn_event_t *event;
+
+  if (collector->free_head) {  // reuse a previously popped node when one is available
+    event = collector->free_head;
+    collector->free_head = collector->free_head->next;
+    pn_event_initialize(event);  // resets all fields, including next, to their initial values
+  } else {
+    event = pn_event();  // NOTE(review): result is used unchecked below - confirm pn_new cannot return NULL
+  }
+
+  pn_event_t *tail = collector->tail;
+
+  if (tail) {  // non-empty queue: link after the current tail
+    tail->next = event;
+    collector->tail = event;
+  } else {  // empty queue: the new event is both head and tail
+    collector->tail = event;
+    collector->head = event;
+  }
+
+  event->type = type;
+
+  return event;  // caller fills in the endpoint fields via the pn_event_init_* helpers
+}
+
+pn_event_t *pn_collector_peek(pn_collector_t *collector)
+{
+  return collector->head;
+}
+
+bool pn_collector_pop(pn_collector_t *collector)  // remove the head event; returns false when the queue is empty
+{
+  pn_event_t *event = collector->head;
+  if (event) {
+    collector->head = event->next;  // advance the queue past the popped event
+  } else {
+    return false;  // nothing queued
+  }
+
+  if (!collector->head) {  // popped the last event, so the tail must be cleared too
+    collector->tail = NULL;
+  }
+
+  event->next = collector->free_head;  // push the node onto the free list for reuse by pn_collector_put
+  collector->free_head = event;
+  return true;
+}
+
+static void pn_event_initialize(void *obj)
+{
+  pn_event_t *event = (pn_event_t *) obj;
+  event->type = PN_EVENT_NONE;
+  event->connection = NULL;
+  event->session = NULL;
+  event->link = NULL;
+  event->delivery = NULL;
+  event->transport = NULL;
+  event->next = NULL;
+}
+
+static void pn_event_finalize(void *obj) {}
+
+static int pn_event_inspect(void *obj, pn_string_t *dst)  // appends "(<type>, <obj>, ...)" to dst; returns 0 or an error code
+{
+  assert(obj);
+  pn_event_t *event = (pn_event_t *) obj;
+  int err = pn_string_addf(dst, "(%d", event->type);
+  if (err) return err;  // stop on a failed write instead of appending more text to dst
+  void *objects[] = {event->connection, event->session, event->link, event->delivery, event->transport};
+  for (int i = 0; i < 5; i++) {
+    if (objects[i]) {  // only the fields that were set on this event are printed
+      err = pn_string_addf(dst, ", ");
+      if (err) return err;
+      err = pn_inspect(objects[i], dst);
+      if (err) return err;
+    }
+  }
+
+  return pn_string_addf(dst, ")");
+}
+
+#define pn_event_hashcode NULL
+#define pn_event_compare NULL
+
+pn_event_t *pn_event(void)
+{
+  static pn_class_t clazz = PN_CLASS(pn_event);
+  pn_event_t *event = (pn_event_t *) pn_new(sizeof(pn_event_t), &clazz);
+  return event;
+}
+
+void pn_event_init_transport(pn_event_t *event, pn_transport_t *transport)
+{
+  event->transport = transport;
+}
+
+void pn_event_init_connection(pn_event_t *event, pn_connection_t *connection)  // record the connection on the event
+{
+  event->connection = connection;
+  pn_event_init_transport(event, event->connection->transport);  // also record the connection's transport field
+}
+
+void pn_event_init_session(pn_event_t *event, pn_session_t *session)
+{
+  event->session = session;
+  pn_event_init_connection(event, pn_session_connection(event->session));
+}
+
+void pn_event_init_link(pn_event_t *event, pn_link_t *link)
+{
+  event->link = link;
+  pn_event_init_session(event, pn_link_session(event->link));
+}
+
+void pn_event_init_delivery(pn_event_t *event, pn_delivery_t *delivery)
+{
+  event->delivery = delivery;
+  pn_event_init_link(event, pn_delivery_link(delivery));
+}
+
+pn_event_type_t pn_event_type(pn_event_t *event)
+{
+  return event->type;
+}
+
+pn_connection_t *pn_event_connection(pn_event_t *event)
+{
+  return event->connection;
+}
+
+pn_session_t *pn_event_session(pn_event_t *event)
+{
+  return event->session;
+}
+
+pn_link_t *pn_event_link(pn_event_t *event)
+{
+  return event->link;
+}
+
+pn_delivery_t *pn_event_delivery(pn_event_t *event)
+{
+  return event->delivery;
+}
+
+pn_transport_t *pn_event_transport(pn_event_t *event)
+{
+  return event->transport;
+}
+
+const char *pn_event_type_name(pn_event_type_t type)
+{
+  switch (type) {
+  case PN_EVENT_NONE:
+    return "PN_EVENT_NONE";
+  case PN_CONNECTION_STATE:
+    return "PN_CONNECTION_STATE";
+  case PN_SESSION_STATE:
+    return "PN_SESSION_STATE";
+  case PN_LINK_STATE:
+    return "PN_LINK_STATE";
+  case PN_LINK_FLOW:
+    return "PN_LINK_FLOW";
+  case PN_DELIVERY:
+    return "PN_DELIVERY";
+  case PN_TRANSPORT:
+    return "PN_TRANSPORT";
+  }
+
+  return "<unrecognized>";
+}

Added: qpid/proton/trunk/proton-c/src/engine/event.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.h?rev=1566974&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.h (added)
+++ qpid/proton/trunk/proton-c/src/engine/event.h Tue Feb 11 03:04:54 2014
@@ -0,0 +1,33 @@
+#ifndef _PROTON_EVENT_H
+#define _PROTON_EVENT_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type);
+
+void pn_event_init_transport(pn_event_t *event, pn_transport_t *transport);
+void pn_event_init_connection(pn_event_t *event, pn_connection_t *connection);
+void pn_event_init_session(pn_event_t *event, pn_session_t *session);
+void pn_event_init_link(pn_event_t *event, pn_link_t *link);
+void pn_event_init_delivery(pn_event_t *event, pn_delivery_t *delivery);
+
+#endif /* event.h */

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Tue Feb 11 03:04:54 2014
@@ -29,6 +29,8 @@
 #include <stdarg.h>
 #include <stdio.h>
 
+#include "../engine/event.h"
+
 #include "../sasl/sasl-internal.h"
 #include "../ssl/ssl-internal.h"
 #include "../platform.h"
@@ -255,6 +257,10 @@ int pn_transport_bind(pn_transport_t *tr
   connection->transport = transport;
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
+    pn_event_t *event = pn_collector_put(connection->collector, PN_CONNECTION_STATE);
+    if (event) {
+      pn_event_init_connection(event, connection);
+    }
     if (!pn_error_code(transport->error)) {
       transport->disp->halt = false;
       transport_consume(transport);        // blech - testBindAfterOpen
@@ -434,8 +440,14 @@ int pn_do_open(pn_dispatcher_t *disp)
   } else {
     transport->remote_hostname = NULL;
   }
+
   if (conn) {
     PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
+
+    pn_event_t *event = pn_collector_put(conn->collector, PN_CONNECTION_STATE);
+    if (event) {
+      pn_event_init_connection(event, conn);
+    }
   } else {
     transport->disp->halt = true;
   }
@@ -465,6 +477,11 @@ int pn_do_begin(pn_dispatcher_t *disp)
   pn_map_channel(transport, disp->channel, ssn);
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE);
 
+  pn_event_t *event = pn_collector_put(transport->connection->collector, PN_SESSION_STATE);
+  if (event) {
+    pn_event_init_session(event, ssn);
+  }
+
   return 0;
 }
 
@@ -634,6 +651,11 @@ int pn_do_attach(pn_dispatcher_t *disp)
     link->state.delivery_count = idc;
   }
 
+  pn_event_t *event = pn_collector_put(transport->connection->collector, PN_LINK_STATE);
+  if (event) {
+    pn_event_init_link(event, link);
+  }
+
   return 0;
 }
 
@@ -716,6 +738,11 @@ int pn_do_transfer(pn_dispatcher_t *disp
     pn_post_flow(transport, ssn, link);
   }
 
+  pn_event_t *event = pn_collector_put(transport->connection->collector, PN_DELIVERY);
+  if (event) {
+    pn_event_init_delivery(event, delivery);
+  }
+
   return 0;
 }
 
@@ -764,6 +791,11 @@ int pn_do_flow(pn_dispatcher_t *disp)
         link->drained += delta;
       }
     }
+
+    pn_event_t *event = pn_collector_put(transport->connection->collector, PN_LINK_FLOW);
+    if (event) {
+      pn_event_init_link(event, link);
+    }
   }
 
   return 0;
@@ -885,6 +917,10 @@ int pn_do_detach(pn_dispatcher_t *disp)
   if (closed)
   {
     PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED);
+    pn_event_t *event = pn_collector_put(transport->connection->collector, PN_LINK_STATE);
+    if (event) {
+      pn_event_init_link(event, link);
+    }
   } else {
     // TODO: implement
   }
@@ -900,6 +936,10 @@ int pn_do_end(pn_dispatcher_t *disp)
   if (err) return err;
   pn_unmap_channel(transport, ssn);
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
+  pn_event_t *event = pn_collector_put(transport->connection->collector, PN_SESSION_STATE);
+  if (event) {
+    pn_event_init_session(event, ssn);
+  }
   return 0;
 }
 
@@ -911,6 +951,10 @@ int pn_do_close(pn_dispatcher_t *disp)
   if (err) return err;
   transport->close_rcvd = true;
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);
+  pn_event_t *event = pn_collector_put(transport->connection->collector, PN_CONNECTION_STATE);
+  if (event) {  // NULL when no collector is attached to the connection
+    pn_event_init_connection(event, conn);  // the remote close changed the connection's state, not a link's
+  }
   return 0;
 }
 
@@ -1957,6 +2001,7 @@ int pn_transport_process(pn_transport_t 
   if (n == PN_EOS) {
     transport->tail_closed = true;
   }
+
   if (n < 0 && n != PN_EOS) return n;
   return 0;
 }

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Tue Feb 11 03:04:54 2014
@@ -918,3 +918,13 @@ def pn_transport_close_tail(trans):
 
 def pn_transport_error(trans):
   return trans.error
+
+PN_CONNECTION_STATE = "PN_CONNECTION_STATE"
+PN_SESSION_STATE = "PN_SESSION_STATE"
+PN_LINK_STATE = "PN_LINK_STATE"
+PN_LINK_FLOW = "PN_LINK_FLOW"
+PN_DELIVERY = "PN_DELIVERY"
+PN_TRANSPORT = "PN_TRANSPORT"
+
+def pn_collector():
+  raise Skipped()

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1566974&r1=1566973&r2=1566974&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Tue Feb 11 03:04:54 2014
@@ -1906,3 +1906,71 @@ class DeliveryTest(Test):
 
   def testCustom(self):
     self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3]))
+
+class EventTest(Test):
+
+  def list(self, collector):
+    result = []
+    while True:
+      e = collector.peek()
+      if e:
+        result.append(e)
+        collector.pop()
+      else:
+        break
+    return result
+
+  def expect(self, collector, *types):
+    events = self.list(collector)
+    assert types == tuple([e.type for e in events]), (types, events)
+
+  def testEndpointEvents(self):
+    c1, c2 = self.connection()
+    coll = Collector()
+    c1.collect(coll)
+    self.expect(coll)
+    self.pump()
+    self.expect(coll)
+    c2.open()
+    self.pump()
+    self.expect(coll, Event.CONNECTION_STATE)
+    self.pump()
+    self.expect(coll)
+
+    ssn = c2.session()
+    snd = ssn.sender("sender")
+    ssn.open()
+    snd.open()
+
+    self.expect(coll)
+    self.pump()
+    self.expect(coll, Event.SESSION_STATE, Event.LINK_STATE)
+
+  def testFlowEvents(self):
+    snd, rcv = self.link("test-link")
+    coll = Collector()
+    snd.session.connection.collect(coll)
+    rcv.open()
+    rcv.flow(10)
+    self.pump()
+    self.expect(coll, Event.LINK_STATE, Event.LINK_FLOW)
+    rcv.flow(10)
+    self.pump()
+    self.expect(coll, Event.LINK_FLOW)
+
+  def testDeliveryEvents(self):
+    snd, rcv = self.link("test-link")
+    coll = Collector()
+    rcv.session.connection.collect(coll)
+    rcv.open()
+    rcv.flow(10)
+    self.pump()
+    self.expect(coll, Event.TRANSPORT, Event.TRANSPORT)
+    snd.delivery("delivery")
+    snd.send("Hello World!")
+    snd.advance()
+    self.pump()
+    self.expect(coll)
+    snd.open()
+    self.pump()
+    self.expect(coll, Event.LINK_STATE, Event.DELIVERY)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org