You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/06 13:23:14 UTC

svn commit: r1622849 [7/9] - in /qpid/proton/branches/fadams-javascript-binding: ./ contrib/ contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/ contrib/proton-hawtdispatch/src/main/ contrib/proton-hawtdispatch/src/main/java/ contrib/proton-h...

Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c Sat Sep  6 11:23:10 2014
@@ -19,21 +19,6 @@
  *
  */
 
-/*
- * Copy of posix poll-based selector with minimal changes to use
- * select().  TODO: fully native implementaton with I/O completion
- * ports.
- *
- * This implementation comments out the posix max_fds arg to select
- * which has no meaning on windows.  The number of fd_set slots are
- * configured at compile time via FD_SETSIZE, chosen "large enough"
- * for the limited scalability of select() at the expense of
- * 3*N*sizeof(unsigned int) bytes per driver instance.  select (and
- * associated macros like FD_ZERO) are otherwise unaffected
- * performance-wise by increasing FD_SETSIZE.
- */
-
-#define FD_SETSIZE 2048
 #ifndef _WIN32_WINNT
 #define _WIN32_WINNT 0x0501
 #endif
@@ -44,37 +29,53 @@
 #include <Ws2tcpip.h>
 #define PN_WINAPI
 
-#include "../platform.h"
+#include "platform.h"
+#include <proton/object.h>
 #include <proton/io.h>
 #include <proton/selector.h>
 #include <proton/error.h>
 #include <assert.h>
-#include "../selectable.h"
-#include "../util.h"
+#include "selectable.h"
+#include "util.h"
+#include "iocp.h"
+
+static void interests_update(iocpdesc_t *iocpd, int interests);
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t);
 
 struct pn_selector_t {
-  fd_set readfds;
-  fd_set writefds;
-  fd_set exceptfds;
+  iocp_t *iocp;
   pn_timestamp_t *deadlines;
   size_t capacity;
   pn_list_t *selectables;
+  pn_list_t *iocp_descriptors;
   pn_timestamp_t deadline;
   size_t current;
+  iocpdesc_t *current_triggered;
   pn_timestamp_t awoken;
   pn_error_t *error;
+  iocpdesc_t *triggered_list_head;
+  iocpdesc_t *triggered_list_tail;
+  iocpdesc_t *deadlines_head;
+  iocpdesc_t *deadlines_tail;
 };
 
 void pn_selector_initialize(void *obj)
 {
   pn_selector_t *selector = (pn_selector_t *) obj;
+  selector->iocp = NULL;
   selector->deadlines = NULL;
   selector->capacity = 0;
   selector->selectables = pn_list(0, 0);
+  selector->iocp_descriptors = pn_list(0, PN_REFCOUNT);
   selector->deadline = 0;
   selector->current = 0;
+  selector->current_triggered = NULL;
   selector->awoken = 0;
   selector->error = pn_error();
+  selector->triggered_list_head = NULL;
+  selector->triggered_list_tail = NULL;
+  selector->deadlines_head = NULL;
+  selector->deadlines_tail = NULL;
 }
 
 void pn_selector_finalize(void *obj)
@@ -82,28 +83,51 @@ void pn_selector_finalize(void *obj)
   pn_selector_t *selector = (pn_selector_t *) obj;
   free(selector->deadlines);
   pn_free(selector->selectables);
+  pn_free(selector->iocp_descriptors);
   pn_error_free(selector->error);
+  selector->iocp->selector = NULL;
 }
 
 #define pn_selector_hashcode NULL
 #define pn_selector_compare NULL
 #define pn_selector_inspect NULL
 
-pn_selector_t *pn_selector(void)
+pn_selector_t *pni_selector()
 {
-  static pn_class_t clazz = PN_CLASS(pn_selector);
+  static const pn_class_t clazz = PN_CLASS(pn_selector);
   pn_selector_t *selector = (pn_selector_t *) pn_new(sizeof(pn_selector_t), &clazz);
   return selector;
 }
 
+pn_selector_t *pni_selector_create(iocp_t *iocp)
+{
+  pn_selector_t *selector = pni_selector();
+  selector->iocp = iocp;
+  return selector;
+}
+
 void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
 {
   assert(selector);
   assert(selectable);
   assert(pni_selectable_get_index(selectable) < 0);
+  pn_socket_t sock = pn_selectable_fd(selectable);
+
+  iocpdesc_t *iocpd = NULL;
+  if (sock != INVALID_SOCKET) {
+    iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
+    if (!iocpd) {
+      // Socket created outside proton.  Hook it up to iocp.
+      iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
+      pni_iocpdesc_start(iocpd);
+    } else {
+      assert(iocpd->iocp == selector->iocp);
+    }
+  }
 
   if (pni_selectable_get_index(selectable) < 0) {
     pn_list_add(selector->selectables, selectable);
+    pn_list_add(selector->iocp_descriptors, iocpd);
     size_t size = pn_list_size(selector->selectables);
 
     if (selector->capacity < size) {
@@ -112,6 +136,10 @@ void pn_selector_add(pn_selector_t *sele
     }
 
     pni_selectable_set_index(selectable, size - 1);
+    if (iocpd) {
+      iocpd->selector = selector;
+      iocpd->selectable = selectable;
+    }
   }
 
   pn_selector_update(selector, selectable);
@@ -121,18 +149,22 @@ void pn_selector_update(pn_selector_t *s
 {
   int idx = pni_selectable_get_index(selectable);
   assert(idx >= 0);
- /*
-  selector->fds[idx].fd = pn_selectable_fd(selectable);
-  selector->fds[idx].events = 0;
-  selector->fds[idx].revents = 0;
-  if (pn_selectable_capacity(selectable) > 0) {
-    selector->fds[idx].events |= POLLIN;
-  }
-  if (pn_selectable_pending(selectable) > 0) {
-    selector->fds[idx].events |= POLLOUT;
-  }
- */
   selector->deadlines[idx] = pn_selectable_deadline(selectable);
+
+  pn_socket_t sock = pn_selectable_fd(selectable);
+  iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+  if (iocpd) {
+    assert(sock == iocpd->socket || iocpd->closing);
+    int interests = 0;
+    if (pn_selectable_capacity(selectable) > 0) {
+      interests |= PN_READABLE;
+    }
+    if (pn_selectable_pending(selectable) > 0) {
+      interests |= PN_WRITABLE;
+    }
+    interests_update(iocpd, interests);
+    deadlines_update(iocpd, selector->deadlines[idx]);
+  }
 }
 
 void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
@@ -142,107 +174,94 @@ void pn_selector_remove(pn_selector_t *s
 
   int idx = pni_selectable_get_index(selectable);
   assert(idx >= 0);
+  iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+  if (iocpd) {
+    if (selector->current_triggered == iocpd)
+      selector->current_triggered = iocpd->triggered_list_next;
+    interests_update(iocpd, 0);
+    deadlines_update(iocpd, 0);
+    assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev);
+    assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev);
+    iocpd->selector = NULL;
+    iocpd->selectable = NULL;
+  }
   pn_list_del(selector->selectables, idx, 1);
+  pn_list_del(selector->iocp_descriptors, idx, 1);
   size_t size = pn_list_size(selector->selectables);
   for (size_t i = idx; i < size; i++) {
     pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
     pni_selectable_set_index(sel, i);
   }
-
   pni_selectable_set_index(selectable, -1);
 }
 
 int pn_selector_select(pn_selector_t *selector, int timeout)
 {
   assert(selector);
-
-  FD_ZERO(&selector->readfds);
-  FD_ZERO(&selector->writefds);
-  FD_ZERO(&selector->exceptfds);
-
-  size_t size = pn_list_size(selector->selectables);
-  if (size > FD_SETSIZE) {
-    // This Windows limitation will go away when switching to completion ports
-    pn_error_set(selector->error, PN_ERR, "maximum sockets exceeded for Windows selector");
-    return PN_ERR;
-  }
+  pn_error_clear(selector->error);
+  pn_timestamp_t deadline = 0;
+  pn_timestamp_t now = pn_i_now();
 
   if (timeout) {
-    pn_timestamp_t deadline = 0;
-    for (size_t i = 0; i < size; i++) {
-      pn_timestamp_t d = selector->deadlines[i];
-      if (d)
-        deadline = (deadline == 0) ? d : pn_min(deadline, d);
-    }
-
-    if (deadline) {
-      pn_timestamp_t now = pn_i_now();
-      int delta = selector->deadline - now;
-      if (delta < 0) {
-        timeout = 0;
-      } else if (delta < timeout) {
-        timeout = delta;
-      }
-    }
-  }
-
-  struct timeval to = {0};
-  struct timeval *to_arg = &to;
-  // block only if (timeout == 0) and (closed_count == 0)
-  if (timeout > 0) {
-    // convert millisecs to sec and usec:
-    to.tv_sec = timeout/1000;
-    to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
-  }
-  else if (timeout < 0) {
-    to_arg = NULL;
+    if (selector->deadlines_head)
+      deadline = selector->deadlines_head->deadline;
   }
-
-  for (size_t i = 0; i < size; i++) {
-    pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
-    pn_socket_t fd = pn_selectable_fd(sel);
-    if (pn_selectable_capacity(sel) > 0) {
-      FD_SET(fd, &selector->readfds);
-    }
-    if (pn_selectable_pending(sel) > 0) {
-      FD_SET(fd, &selector->writefds);
+  if (deadline) {
+    int delta = deadline - now;
+    if (delta < 0) {
+      delta = 0;
+    } 
+    if (timeout < 0)
+      timeout = delta;
+    else if (timeout > delta)
+      timeout = delta;
+  }	
+  deadline = (timeout >= 0) ? now + timeout : 0;
+
+  // Process all currently available completions, even if matched events available
+  pni_iocp_drain_completions(selector->iocp);
+  pni_zombie_check(selector->iocp, now);
+  // Loop until an interested event is matched, or until deadline
+  while (true) {
+    if (selector->triggered_list_head)
+      break;
+    if (deadline && deadline <= now)
+      break;
+    pn_timestamp_t completion_deadline = deadline;
+    pn_timestamp_t zd = pni_zombie_deadline(selector->iocp);
+    if (zd)
+      completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd;
+
+    int completion_timeout = (!completion_deadline) ? -1 : completion_deadline - now;
+    int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error);
+    if (rv < 0)
+      return pn_error_code(selector->error);
+
+    now = pn_i_now();
+    if (zd && zd <= now) {
+      pni_zombie_check(selector->iocp, now);
     }
   }
 
-  int result = select(0 /* ignored in win32 */, &selector->readfds, &selector->writefds, &selector->exceptfds, to_arg);
-  if (result == -1) {
-    pn_i_error_from_errno(selector->error, "select");
-  } else {
-    selector->current = 0;
-    selector->awoken = pn_i_now();
+  selector->current = 0;
+  selector->awoken = now;
+  selector->current_triggered = selector->triggered_list_head;
+  for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) {
+    if (iocpd->deadline <= now)
+      pni_events_update(iocpd, iocpd->events | PN_EXPIRED);
+    else
+      break;
   }
-
   return pn_error_code(selector->error);
 }
 
 pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events)
 {
-  pn_list_t *l = selector->selectables;
-  size_t size = pn_list_size(l);
-  while (selector->current < size) {
-    pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(l, selector->current);
-    pn_timestamp_t deadline = selector->deadlines[selector->current];
-    int ev = 0;
-    pn_socket_t fd = pn_selectable_fd(sel);
-    if (FD_ISSET(fd, &selector->readfds)) {
-      ev |= PN_READABLE;
-    }
-    if (FD_ISSET(fd, &selector->writefds)) {
-      ev |= PN_WRITABLE;
-    }
-    if (deadline && selector->awoken >= deadline) {
-      ev |= PN_EXPIRED;
-    }
-    selector->current++;
-    if (ev) {
-      *events = ev;
-      return sel;
-    }
+  if (selector->current_triggered) {
+    iocpdesc_t *iocpd = selector->current_triggered;
+    *events = iocpd->interests & iocpd->events;
+    selector->current_triggered = iocpd->triggered_list_next;
+    return iocpd->selectable;
   }
   return NULL;
 }
@@ -252,3 +271,91 @@ void pn_selector_free(pn_selector_t *sel
   assert(selector);
   pn_free(selector);
 }
+
+
+static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+  if (iocpd->triggered_list_prev || selector->triggered_list_head == iocpd)
+    return; // already in list
+  LL_ADD(selector, triggered_list, iocpd);
+}
+
+static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+  if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd)
+    return; // not in list
+  LL_REMOVE(selector, triggered_list, iocpd);
+  iocpd->triggered_list_prev = NULL;
+  iocpd->triggered_list_next = NULL;
+}
+
+
+void pni_events_update(iocpdesc_t *iocpd, int events)
+{
+  int old_events = iocpd->events;
+  if (old_events == events)
+    return;
+  iocpd->events = events;
+  if (iocpd->selector) {
+    if (iocpd->events & iocpd->interests)
+      triggered_list_add(iocpd->selector, iocpd);
+    else
+      triggered_list_remove(iocpd->selector, iocpd);
+  }
+}
+
+static void interests_update(iocpdesc_t *iocpd, int interests)
+{
+  int old_interests = iocpd->interests;
+  if (old_interests == interests)
+    return;
+  iocpd->interests = interests;
+  if (iocpd->selector) {
+    if (iocpd->events & iocpd->interests)
+      triggered_list_add(iocpd->selector, iocpd);
+    else
+      triggered_list_remove(iocpd->selector, iocpd);
+  }
+}
+
+static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+  if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd)
+    return; // not in list
+  LL_REMOVE(selector, deadlines, iocpd);
+  iocpd->deadlines_prev = NULL;
+  iocpd->deadlines_next = NULL;
+}
+
+
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline)
+{
+  if (deadline == iocpd->deadline)
+    return;
+  iocpd->deadline = deadline;
+  pn_selector_t *selector = iocpd->selector;
+  if (!deadline) {
+    deadlines_remove(selector, iocpd);
+    pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+    interests_update(iocpd, iocpd->interests & ~PN_EXPIRED);
+  } else {
+    if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) {
+      deadlines_remove(selector, iocpd);
+      pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+    }
+    interests_update(iocpd, iocpd->interests | PN_EXPIRED);
+    iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines);
+    while (dl_iocpd && dl_iocpd->deadline <= deadline)
+      dl_iocpd = dl_iocpd->deadlines_next;
+    if (dl_iocpd) {
+      // insert
+      iocpd->deadlines_prev = dl_iocpd->deadlines_prev;
+      iocpd->deadlines_next = dl_iocpd;
+      dl_iocpd->deadlines_prev = iocpd;
+      if (selector->deadlines_head == dl_iocpd)
+        selector->deadlines_head = iocpd;
+    } else {
+      LL_ADD(selector, deadlines, iocpd);  // append
+    }
+  }
+}

Added: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c Sat Sep  6 11:23:10 2014
@@ -0,0 +1,312 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * A simple write buffer pool.  Each socket has a dedicated "primary"
+ * buffer and can borrow from a shared pool with limited size tuning.
+ * Could enhance e.g. with separate pools per network interface and fancier
+ * memory tuning based on interface speed, system resources, and
+ * number of connections, etc.
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <Ws2tcpip.h>
+#define PN_WINAPI
+
+#include "platform.h"
+#include <proton/object.h>
+#include <proton/io.h>
+#include <proton/selector.h>
+#include <proton/error.h>
+#include <assert.h>
+#include "selectable.h"
+#include "util.h"
+#include "iocp.h"
+
+// Max overlapped writes per socket
+#define IOCP_MAX_OWRITES 16
+// Write buffer size
+#define IOCP_WBUFSIZE 16384
+
+static void pipeline_log(const char *fmt, ...)
+{
+    va_list ap;
+    va_start(ap, fmt);
+    vfprintf(stderr, fmt, ap);
+    va_end(ap);
+    fflush(stderr);
+}
+
+void pni_shared_pool_create(iocp_t *iocp)
+{
+  // TODO: more pools (or larger one) when using multiple non-loopback interfaces
+  iocp->shared_pool_size = 16;
+  char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz < 256) {
+      iocp->shared_pool_size = sz;
+    }
+  }
+  iocp->loopback_bufsize = 0;
+  env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz <= 128 * 1024) {
+      iocp->loopback_bufsize = sz;
+    }
+  }
+
+  if (iocp->shared_pool_size) {
+    iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+    HRESULT status = GetLastError();
+    if (!iocp->shared_pool_memory) {
+      perror("Proton write buffer pool allocation failure\n");
+      iocp->shared_pool_size = 0;
+      iocp->shared_available_count = 0;
+      return;
+    }
+
+    iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+    iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+    iocp->shared_available_count = iocp->shared_pool_size;
+    char *mem = iocp->shared_pool_memory;
+    for (int i = 0; i < iocp->shared_pool_size; i++) {
+      iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
+      mem += IOCP_WBUFSIZE;
+    }
+  }
+}
+
+void pni_shared_pool_free(iocp_t *iocp)
+{
+  for (int i = 0; i < iocp->shared_pool_size; i++) {
+    write_result_t *result = iocp->shared_results[i];
+    if (result->in_use)
+      pipeline_log("Proton buffer pool leak\n");
+    else
+      free(result);
+  }
+  if (iocp->shared_pool_size) {
+    free(iocp->shared_results);
+    free(iocp->available_results);
+    if (iocp->shared_pool_memory) {
+      if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) {
+        perror("write buffers release failed");
+      }
+      iocp->shared_pool_memory = NULL;
+    }
+  }
+}
+
+static void shared_pool_push(write_result_t *result)
+{
+  iocp_t *iocp = result->base.iocpd->iocp;
+  assert(iocp->shared_available_count < iocp->shared_pool_size);
+  iocp->available_results[iocp->shared_available_count++] = result;
+}
+
+static write_result_t *shared_pool_pop(iocp_t *iocp)
+{
+  return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL;
+}
+
+struct write_pipeline_t {
+  iocpdesc_t *iocpd;
+  size_t pending_count;
+  write_result_t *primary;
+  size_t reserved_count;
+  size_t next_primary_index;
+  size_t depth;
+  bool is_writer;
+};
+
+#define write_pipeline_compare NULL
+#define write_pipeline_inspect NULL
+#define write_pipeline_hashcode NULL
+
+static void write_pipeline_initialize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  pl->pending_count = 0;
+  const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
+  pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
+  pl->depth = 0;
+  pl->is_writer = false;
+}
+
+static void write_pipeline_finalize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  free((void *)pl->primary->buffer.start);
+  free(pl->primary);
+}
+
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
+{
+  static const pn_class_t clazz = PN_CLASS(write_pipeline);
+  write_pipeline_t *pipeline = (write_pipeline_t *) pn_new(sizeof(write_pipeline_t), &clazz);
+  pipeline->iocpd = iocpd;
+  pipeline->primary->base.iocpd = iocpd;
+  return pipeline;
+}
+
+static void confirm_as_writer(write_pipeline_t *pl)
+{
+  if (!pl->is_writer) {
+    iocp_t *iocp = pl->iocpd->iocp;
+    iocp->writer_count++;
+    pl->is_writer = true;
+  }
+}
+
+static void remove_as_writer(write_pipeline_t *pl)
+{
+  if (!pl->is_writer)
+    return;
+  iocp_t *iocp = pl->iocpd->iocp;
+  assert(iocp->writer_count);
+  pl->is_writer = false;
+  iocp->writer_count--;
+}
+
+/*
+ * Optimal depth will depend on properties of the NIC, server, and driver.  For now,
+ * just distinguish between loopback interfaces and the rest.  Optimizations in the
+ * loopback stack allow decent performance with depth 1 and actually cause major
+ * performance hiccups if set to large values.
+ */
+static void set_depth(write_pipeline_t *pl)
+{
+  pl->depth = 1;
+  sockaddr_storage sa;
+  socklen_t salen = sizeof(sa);
+  char buf[INET6_ADDRSTRLEN];
+  DWORD buflen = sizeof(buf);
+
+  if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
+      getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
+    if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
+        (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
+      // not loopback
+      pl->depth = IOCP_MAX_OWRITES;
+    } else {
+      iocp_t *iocp = pl->iocpd->iocp;
+      if (iocp->loopback_bufsize) {
+        const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
+        if (p) {
+          pl->primary->buffer.start = p;
+          pl->primary->buffer.size = iocp->loopback_bufsize;
+        }
+      }
+    }
+  }
+}
+
+// Reserve as many buffers as possible for count bytes.
+size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
+{
+  if (pl->primary->in_use)
+    return 0;  // I.e. io->wouldblock
+  if (!pl->depth)
+    set_depth(pl);
+  if (pl->depth == 1) {
+    // always use the primary
+    pl->reserved_count = 1;
+    pl->next_primary_index = 0;
+    return 1;
+  }
+
+  iocp_t *iocp = pl->iocpd->iocp;
+  confirm_as_writer(pl);
+  int wanted = (count / IOCP_WBUFSIZE);
+  if (count % IOCP_WBUFSIZE)
+    wanted++;
+  size_t pending = pl->pending_count;
+  assert(pending < pl->depth);
+  int bufs = pn_min(wanted, pl->depth - pending);
+  // Can draw from shared pool or the primary... but share with others.
+  size_t writers = iocp->writer_count;
+  int shared_count = (iocp->shared_available_count + writers - 1) / writers;
+  bufs = pn_min(bufs, shared_count + 1);
+  pl->reserved_count = pending + bufs;
+
+  if (bufs == wanted &&
+      pl->reserved_count < (pl->depth / 2) &&
+      iocp->shared_available_count > (2 * writers + bufs)) {
+    // No shortage: keep the primary as spare for future use
+    pl->next_primary_index = pl->reserved_count;
+  } else if (bufs == 1) {
+    pl->next_primary_index = pending;
+  } else {
+    // let approx 1/3 drain before replenishing
+    pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
+    if (pl->next_primary_index < pending)
+      pl->next_primary_index = pending;
+  }
+  return bufs;
+}
+
+write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
+{
+  size_t sz = pl->pending_count;
+  if (sz >= pl->reserved_count)
+    return NULL;
+  write_result_t *result;
+  if (sz == pl->next_primary_index) {
+    result = pl->primary;
+  } else {
+    assert(pl->iocpd->iocp->shared_available_count > 0);
+    result = shared_pool_pop(pl->iocpd->iocp);
+  }
+
+  result->in_use = true;
+  pl->pending_count++;
+  return result;
+}
+
+void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
+{
+  result->in_use = false;
+  pl->pending_count--;
+  pl->reserved_count = 0;
+  if (result != pl->primary)
+    shared_pool_push(result);
+  if (pl->pending_count == 0)
+    remove_as_writer(pl);
+}
+
+bool pni_write_pipeline_writable(write_pipeline_t *pl)
+{
+  // Only writable if not full and we can guarantee a buffer:
+  return pl->pending_count < pl->depth && !pl->primary->in_use;
+}
+
+size_t pni_write_pipeline_size(write_pipeline_t *pl)
+{
+  return pl->pending_count;
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java Sat Sep  6 11:23:10 2014
@@ -30,87 +30,58 @@ import org.apache.qpid.proton.amqp.messa
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.Codec;
 import org.apache.qpid.proton.codec.Data;
-import org.apache.qpid.proton.codec.DataFactory;
 import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.DriverFactory;
+import org.apache.qpid.proton.engine.Engine;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.SslDomain;
 import org.apache.qpid.proton.engine.SslPeerDetails;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
 import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.MessengerFactory;
-
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
 
 public final class Proton
 {
 
-    public static ProtonFactory.ImplementationType ANY = ProtonFactory.ImplementationType.ANY;
-    public static ProtonFactory.ImplementationType PROTON_C = ProtonFactory.ImplementationType.PROTON_C;
-    public static ProtonFactory.ImplementationType PROTON_J = ProtonFactory.ImplementationType.PROTON_J;
-
-
-    private static final MessengerFactory MESSENGER_FACTORY =
-            (new ProtonFactoryLoader<MessengerFactory>(MessengerFactory.class)).loadFactory();
-    private static final DriverFactory DRIVER_FACTORY =
-            (new ProtonFactoryLoader<DriverFactory>(DriverFactory.class)).loadFactory();
-    private static final MessageFactory MESSAGE_FACTORY =
-            (new ProtonFactoryLoader<MessageFactory>(MessageFactory.class)).loadFactory();
-    private static final DataFactory DATA_FACTORY =
-            (new ProtonFactoryLoader<DataFactory>(DataFactory.class)).loadFactory();
-    private static final EngineFactory ENGINE_FACTORY =
-            (new ProtonFactoryLoader<EngineFactory>(EngineFactory.class)).loadFactory();
-
-    private static final ProtonFactory.ImplementationType DEFAULT_IMPLEMENTATION =
-            ProtonFactoryLoader.getImpliedImplementationType();
-
     private Proton()
     {
     }
 
-    public static ProtonFactory.ImplementationType getDefaultImplementationType()
-    {
-        return DEFAULT_IMPLEMENTATION;
-    }
-
     public static Collector collector()
     {
-        return new CollectorImpl();
+        return Engine.collector();
     }
 
     public static Connection connection()
     {
-        return ENGINE_FACTORY.createConnection();
+        return Engine.connection();
     }
 
     public static Transport transport()
     {
-        return ENGINE_FACTORY.createTransport();
+        return Engine.transport();
     }
 
     public static SslDomain sslDomain()
     {
-        return ENGINE_FACTORY.createSslDomain();
+        return Engine.sslDomain();
     }
 
     public static SslPeerDetails sslPeerDetails(String hostname, int port)
     {
-        return ENGINE_FACTORY.createSslPeerDetails(hostname, port);
+        return Engine.sslPeerDetails(hostname, port);
     }
 
     public static Data data(long capacity)
     {
-        return DATA_FACTORY.createData(capacity);
+        return Codec.data(capacity);
     }
 
     public static Message message()
     {
-        return MESSAGE_FACTORY.createMessage();
+        return Message.Factory.create();
     }
 
     public static Message message(Header header,
@@ -118,134 +89,25 @@ public final class Proton
                       Properties properties, ApplicationProperties applicationProperties,
                       Section body, Footer footer)
     {
-        return MESSAGE_FACTORY.createMessage(header, deliveryAnnotations,
-                                             messageAnnotations, properties,
-                                             applicationProperties, body, footer);
+        return Message.Factory.create(header, deliveryAnnotations,
+                                      messageAnnotations, properties,
+                                      applicationProperties, body, footer);
     }
 
 
     public static Messenger messenger()
     {
-        return MESSENGER_FACTORY.createMessenger();
+        return Messenger.Factory.create();
     }
 
     public static Messenger messenger(String name)
     {
-        return MESSENGER_FACTORY.createMessenger(name);
+        return Messenger.Factory.create(name);
     }
 
     public static Driver driver() throws IOException
     {
-        return DRIVER_FACTORY.createDriver();
-    }
-
-
-
-    public static Connection connection(ProtonFactory.ImplementationType implementation)
-    {
-        return getEngineFactory(implementation).createConnection();
-    }
-
-    public static Transport transport(ProtonFactory.ImplementationType implementation)
-    {
-        return getEngineFactory(implementation).createTransport();
-    }
-
-    public static SslDomain sslDomain(ProtonFactory.ImplementationType implementation)
-    {
-        return getEngineFactory(implementation).createSslDomain();
-    }
-
-    public static SslPeerDetails sslPeerDetails(ProtonFactory.ImplementationType implementation, String hostname, int port)
-    {
-        return getEngineFactory(implementation).createSslPeerDetails(hostname, port);
-    }
-
-    public static Data data(ProtonFactory.ImplementationType implementation, long capacity)
-    {
-        return getDataFactory(implementation).createData(capacity);
-    }
-
-    public static Message message(ProtonFactory.ImplementationType implementation)
-    {
-        return getMessageFactory(implementation).createMessage();
-    }
-
-    public static Message message(ProtonFactory.ImplementationType implementation, Header header,
-                      DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations,
-                      Properties properties, ApplicationProperties applicationProperties,
-                      Section body, Footer footer)
-    {
-        return getMessageFactory(implementation).createMessage(header, deliveryAnnotations,
-                                                               messageAnnotations, properties,
-                                                               applicationProperties, body, footer);
+        return Driver.Factory.create();
     }
 
-
-    public static Messenger messenger(ProtonFactory.ImplementationType implementation)
-    {
-        return getMessengerFactory(implementation).createMessenger();
-    }
-
-    public static Messenger messenger(ProtonFactory.ImplementationType implementation, String name)
-    {
-        return getMessengerFactory(implementation).createMessenger(name);
-    }
-
-    public static Driver driver(ProtonFactory.ImplementationType implementation) throws IOException
-    {
-        return getDriverFactory(implementation).createDriver();
-    }
-
-
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, EngineFactory> _engineFactories =
-            new ConcurrentHashMap<ProtonFactory.ImplementationType, EngineFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, MessageFactory> _messageFactories =
-            new ConcurrentHashMap<ProtonFactory.ImplementationType, MessageFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, MessengerFactory> _messengerFactories =
-                new ConcurrentHashMap<ProtonFactory.ImplementationType, MessengerFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, DataFactory> _dataFactories =
-                new ConcurrentHashMap<ProtonFactory.ImplementationType, DataFactory>();
-    private static final ConcurrentMap<ProtonFactory.ImplementationType, DriverFactory> _driverFactories =
-                new ConcurrentHashMap<ProtonFactory.ImplementationType, DriverFactory>();
-
-    private static EngineFactory getEngineFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(EngineFactory.class, implementation, _engineFactories);
-    }
-
-    private static MessageFactory getMessageFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(MessageFactory.class, implementation, _messageFactories);
-    }
-
-    private static MessengerFactory getMessengerFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(MessengerFactory.class, implementation, _messengerFactories);
-    }
-
-    private static DriverFactory getDriverFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(DriverFactory.class, implementation, _driverFactories);
-    }
-
-    private static DataFactory getDataFactory(ProtonFactory.ImplementationType implementation)
-    {
-        return getFactory(DataFactory.class, implementation, _dataFactories);
-    }
-
-    private static <T extends ProtonFactory>  T getFactory(Class<T> factoryClass, ProtonFactory.ImplementationType implementation,
-                                                           ConcurrentMap<ProtonFactory.ImplementationType, T> factories)
-    {
-        T factory = factories.get(implementation);
-        if(factory == null)
-        {
-            factories.putIfAbsent(implementation, (new ProtonFactoryLoader<T>(factoryClass,implementation)).loadFactory());
-            factory = factories.get(implementation);
-
-        }
-        return factory;
-    }
-
-
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java Sat Sep  6 11:23:10 2014
@@ -25,17 +25,18 @@ package org.apache.qpid.proton.amqp.mess
 
 import java.util.Map;
 
-public final class DeliveryAnnotations
-      implements Section
+import org.apache.qpid.proton.amqp.Symbol;
+
+public final class DeliveryAnnotations implements Section
 {
-    private final Map _value;
+    private final Map<Symbol, Object> _value;
 
-    public DeliveryAnnotations(Map value)
+    public DeliveryAnnotations(Map<Symbol, Object> value)
     {
         _value = value;
     }
 
-    public Map getValue()
+    public Map<Symbol, Object> getValue()
     {
         return _value;
     }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java Sat Sep  6 11:23:10 2014
@@ -23,20 +23,22 @@
 
 package org.apache.qpid.proton.amqp.messaging;
 
+import org.apache.qpid.proton.amqp.Symbol;
+
 import java.util.Map;
 
 
 public final class MessageAnnotations implements Section
 {
 
-    private final Map _value;
+    private final Map<Symbol, Object> _value;
 
-    public MessageAnnotations(Map value)
+    public MessageAnnotations(Map<Symbol, Object> value)
     {
         _value = value;
     }
 
-    public Map getValue()
+    public Map<Symbol, Object> getValue()
     {
         return _value;
     }

Added: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.codec;
+
+/**
+ * Codec
+ *
+ */
+
+public final class Codec
+{
+
+    private Codec()
+    {
+    }
+
+    public static Data data(long capacity)
+    {
+        return Data.Factory.create();
+    }
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java Sat Sep  6 11:23:10 2014
@@ -37,9 +37,19 @@ import org.apache.qpid.proton.amqp.Unsig
 import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 
+import org.apache.qpid.proton.codec.impl.DataImpl;
+
 public interface Data
 {
 
+    public static final class Factory {
+
+        public static Data create() {
+            return new DataImpl();
+        }
+
+    }
+
 
     enum DataType
     {

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java Sat Sep  6 11:23:10 2014
@@ -21,8 +21,10 @@
 
 package org.apache.qpid.proton.driver;
 
+import java.io.IOException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.ServerSocketChannel;
+import org.apache.qpid.proton.driver.impl.DriverImpl;
 
 /**
  * A driver for the proton engine.
@@ -40,6 +42,14 @@ import java.nio.channels.ServerSocketCha
  */
 public interface Driver
 {
+
+    public static final class Factory
+    {
+        public static Driver create() throws IOException {
+            return new DriverImpl();
+        }
+    }
+
     /**
      * Force {@link #doWait(long)} to return.
      *

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Sat Sep  6 11:23:10 2014
@@ -32,7 +32,6 @@ import org.apache.qpid.proton.engine.Con
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
-import org.apache.qpid.proton.engine.impl.TransportFactory;
 
 class ConnectorImpl<C> implements Connector<C>
 {

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Sat Sep  6 11:23:10 2014
@@ -53,7 +53,7 @@ public class DriverImpl implements Drive
     private Queue<ConnectorImpl> _selectedConnectors = new ArrayDeque<ConnectorImpl>();
     private Queue<ListenerImpl> _selectedListeners = new ArrayDeque<ListenerImpl>();
 
-    DriverImpl() throws IOException
+    public DriverImpl() throws IOException
     {
         _selector = Selector.open();
     }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java Sat Sep  6 11:23:10 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
 
 /**
  * Collector
@@ -29,6 +30,13 @@ package org.apache.qpid.proton.engine;
 public interface Collector
 {
 
+    public static final class Factory
+    {
+        public static Collector create() {
+            return new CollectorImpl();
+        }
+    }
+
     Event peek();
 
     void pop();

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java Sat Sep  6 11:23:10 2014
@@ -24,6 +24,8 @@ import java.util.EnumSet;
 import java.util.Map;
 import org.apache.qpid.proton.amqp.Symbol;
 
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+
 
 /**
  * Maintains lists of sessions, links and deliveries in a state
@@ -36,6 +38,13 @@ import org.apache.qpid.proton.amqp.Symbo
 public interface Connection extends Endpoint
 {
 
+    public static final class Factory
+    {
+        public static Connection create() {
+            return new ConnectionImpl();
+        }
+    }
+
     /**
      * Returns a newly created session
      *
@@ -81,6 +90,8 @@ public interface Connection extends Endp
 
     public void setHostname(String hostname);
 
+    public String getHostname();
+
     public String getRemoteContainer();
 
     public String getRemoteHostname();

Added: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine;
+
+/**
+ * Engine
+ *
+ */
+
+public final class Engine
+{
+
+    private Engine()
+    {
+    }
+
+    public static Collector collector()
+    {
+        return Collector.Factory.create();
+    }
+
+    public static Connection connection()
+    {
+        return Connection.Factory.create();
+    }
+
+    public static Transport transport()
+    {
+        return Transport.Factory.create();
+    }
+
+    public static SslDomain sslDomain()
+    {
+        return SslDomain.Factory.create();
+    }
+
+    public static SslPeerDetails sslPeerDetails(String hostname, int port)
+    {
+        return SslPeerDetails.Factory.create(hostname, port);
+    }
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java Sat Sep  6 11:23:10 2014
@@ -29,19 +29,38 @@ package org.apache.qpid.proton.engine;
 public interface Event
 {
     public enum Category {
-        PROTOCOL;
+        CONNECTION,
+        SESSION,
+        LINK,
+        DELIVERY,
+        TRANSPORT;
     }
 
     public enum Type {
-        CONNECTION_REMOTE_STATE(Category.PROTOCOL, 1),
-        CONNECTION_LOCAL_STATE(Category.PROTOCOL, 2),
-        SESSION_REMOTE_STATE(Category.PROTOCOL, 3),
-        SESSION_LOCAL_STATE(Category.PROTOCOL, 4),
-        LINK_REMOTE_STATE(Category.PROTOCOL, 5),
-        LINK_LOCAL_STATE(Category.PROTOCOL, 6),
-        LINK_FLOW(Category.PROTOCOL, 7),
-        DELIVERY(Category.PROTOCOL, 8),
-        TRANSPORT(Category.PROTOCOL, 9);
+        CONNECTION_INIT(Category.CONNECTION, 1),
+        CONNECTION_OPEN(Category.CONNECTION, 2),
+        CONNECTION_REMOTE_OPEN(Category.CONNECTION, 3),
+        CONNECTION_CLOSE(Category.CONNECTION, 4),
+        CONNECTION_REMOTE_CLOSE(Category.CONNECTION, 5),
+        CONNECTION_FINAL(Category.CONNECTION, 6),
+
+        SESSION_INIT(Category.SESSION, 1),
+        SESSION_OPEN(Category.SESSION, 2),
+        SESSION_REMOTE_OPEN(Category.SESSION, 3),
+        SESSION_CLOSE(Category.SESSION, 4),
+        SESSION_REMOTE_CLOSE(Category.SESSION, 5),
+        SESSION_FINAL(Category.SESSION, 6),
+
+        LINK_INIT(Category.LINK, 1),
+        LINK_OPEN(Category.LINK, 2),
+        LINK_REMOTE_OPEN(Category.LINK, 3),
+        LINK_CLOSE(Category.LINK, 4),
+        LINK_REMOTE_CLOSE(Category.LINK, 5),
+        LINK_FLOW(Category.LINK, 6),
+        LINK_FINAL(Category.LINK, 7),
+
+        DELIVERY(Category.DELIVERY, 1),
+        TRANSPORT(Category.TRANSPORT, 1);
 
         private int _opcode;
         private Category _category;
@@ -72,4 +91,6 @@ public interface Event
 
     Transport getTransport();
 
+    Event copy();
+
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java Sat Sep  6 11:23:10 2014
@@ -49,7 +49,8 @@ public interface Sasl
         PN_SASL_SYS((byte)2),
         /** failed due to unrecoverable error */
         PN_SASL_PERM((byte)3),
-        PN_SASL_TEMP((byte)4);
+        PN_SASL_TEMP((byte)4),
+        PN_SASL_SKIPPED((byte)5);
 
         private final byte _code;
 
@@ -72,6 +73,7 @@ public interface Sasl
     public static SaslOutcome PN_SASL_SYS = SaslOutcome.PN_SASL_SYS;
     public static SaslOutcome PN_SASL_PERM = SaslOutcome.PN_SASL_PERM;
     public static SaslOutcome PN_SASL_TEMP = SaslOutcome.PN_SASL_TEMP;
+    public static SaslOutcome PN_SASL_SKIPPED = SaslOutcome.PN_SASL_SKIPPED;
 
     /**
      * Access the current state of the layer.
@@ -156,4 +158,10 @@ public interface Sasl
 
     void client();
     void server();
+
+    /**
+     * Set whether servers may accept incoming connections
+     * that skip the SASL layer negotiation.
+     */
+    void allowSkip(boolean allowSkip);
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java Sat Sep  6 11:23:10 2014
@@ -18,11 +18,21 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.engine.impl.ssl.SslDomainImpl;
+
 /**
  * I store the details used to create SSL sessions.
  */
 public interface SslDomain
 {
+
+    public static final class Factory
+    {
+        public static SslDomain create() {
+            return new SslDomainImpl();
+        }
+    }
+
     /**
      * Determines whether the endpoint acts as a client or server.
      */

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java Sat Sep  6 11:23:10 2014
@@ -18,6 +18,8 @@
  */
 package org.apache.qpid.proton.engine;
 
+import org.apache.qpid.proton.engine.impl.ssl.SslPeerDetailsImpl;
+
 /**
  * The details of the remote peer involved in an SSL session.
  *
@@ -28,6 +30,14 @@ package org.apache.qpid.proton.engine;
  */
 public interface SslPeerDetails
 {
+
+    public static final class Factory
+    {
+        public static SslPeerDetails create(String hostname, int port) {
+            return new SslPeerDetailsImpl(hostname, port);
+        }
+    }
+
     String getHostname();
     int getPort();
-}
\ No newline at end of file
+}

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java Sat Sep  6 11:23:10 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.proton.engine;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+
 
 /**
  * <p>
@@ -63,6 +65,19 @@ import java.nio.ByteBuffer;
  */
 public interface Transport extends Endpoint
 {
+
+    public static final class Factory
+    {
+        public static Transport create() {
+            return new TransportImpl();
+        }
+    }
+
+    public static final int TRACE_OFF = 0;
+    public static final int TRACE_RAW = 1;
+    public static final int TRACE_FRM = 2;
+    public static final int TRACE_DRV = 4;
+
     public static final int DEFAULT_MAX_FRAME_SIZE = -1;
 
     /** the lower bound for the agreed maximum frame size (in bytes). */
@@ -70,7 +85,10 @@ public interface Transport extends Endpo
     public int SESSION_WINDOW = 16*1024;
     public int END_OF_STREAM = -1;
 
+    public void trace(int levels);
+
     public void bind(Connection connection);
+    public void unbind();
 
     public int capacity();
     public ByteBuffer tail();
@@ -83,6 +101,8 @@ public interface Transport extends Endpo
     public void pop(int bytes);
     public void close_head();
 
+    public boolean isClosed();
+
     /**
      * Processes the provided input.
      *
@@ -156,7 +176,15 @@ public interface Transport extends Endpo
      */
     void outputConsumed();
 
-    Sasl sasl();
+    /**
+     * Signal the transport to expect SASL frames used to establish a SASL layer prior to
+     * performing the AMQP protocol version negotiation. This must first be performed before
+     * the transport is used for processing. Subsequent invocations will return the same
+     * {@link Sasl} object.
+     *
+     * @throws IllegalStateException if transport processing has already begun prior to initial invocation
+     */
+    Sasl sasl() throws IllegalStateException;
 
     /**
      * Wrap this transport's output and input to apply SSL encryption and decryption respectively.

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java Sat Sep  6 11:23:10 2014
@@ -35,25 +35,55 @@ import java.util.Queue;
 public class CollectorImpl implements Collector
 {
 
-    private Queue<Event> events = new LinkedList<Event>();
+    private EventImpl head;
+    private EventImpl tail;
+    private EventImpl free;
 
     public CollectorImpl()
     {}
 
     public Event peek()
     {
-        return events.peek();
+        return head;
     }
 
     public void pop()
     {
-        events.poll();
+        if (head != null) {
+            EventImpl next = head.next;
+            head.next = free;
+            free = head;
+            head.clear();
+            head = next;
+        }
     }
 
-    public EventImpl put(Event.Type type)
+    public EventImpl put(Event.Type type, Object context)
     {
-        EventImpl event = new EventImpl(type);
-        events.add(event);
+        if (tail != null && tail.getType() == type &&
+            tail.getContext() == context) {
+            return null;
+        }
+
+        EventImpl event;
+        if (free == null) {
+            event = new EventImpl();
+        } else {
+            event = free;
+            free = free.next;
+            event.next = null;
+        }
+
+        event.init(type, context);
+
+        if (head == null) {
+            head = event;
+            tail = event;
+        } else {
+            tail.next = event;
+            tail = event;
+        }
+
         return event;
     }
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java Sat Sep  6 11:23:10 2014
@@ -52,6 +52,7 @@ public class ConnectionImpl extends Endp
     private TransportImpl _transport;
     private DeliveryImpl _transportWorkHead;
     private DeliveryImpl _transportWorkTail;
+    private int _transportWorkSize = 0;
     private String _localContainerId = "";
     private String _localHostname = "";
     private String _remoteContainer;
@@ -182,16 +183,30 @@ public class ConnectionImpl extends Endp
         return this;
     }
 
-    public void free()
-    {
-        super.free();
-        for(Session session : _sessions)
-        {
+    @Override
+    void postFinal() {
+        put(Event.Type.CONNECTION_FINAL, this);
+    }
+
+    @Override
+    void doFree() {
+        for(Session session : _sessions) {
             session.free();
         }
         _sessions = null;
     }
 
+    void modifyEndpoints() {
+        if (_sessions != null) {
+            for (SessionImpl ssn: _sessions) {
+                ssn.modifyEndpoints();
+            }
+        }
+        if (!freed) {
+            modified();
+        }
+    }
+
     void handleOpen(Open open)
     {
         // TODO - store state
@@ -201,10 +216,7 @@ public class ConnectionImpl extends Endp
         setRemoteDesiredCapabilities(open.getDesiredCapabilities());
         setRemoteOfferedCapabilities(open.getOfferedCapabilities());
         setRemoteProperties(open.getProperties());
-        EventImpl ev = put(Event.Type.CONNECTION_REMOTE_STATE);
-        if (ev != null) {
-            ev.init(this);
-        }
+        put(Event.Type.CONNECTION_REMOTE_OPEN, this);
     }
 
 
@@ -489,6 +501,10 @@ public class ConnectionImpl extends Endp
         return _transportWorkHead;
     }
 
+    int getTransportWorkSize() {
+        return _transportWorkSize;
+    }
+
     public void removeTransportWork(DeliveryImpl delivery)
     {
         if (!delivery._transportWork) return;
@@ -517,6 +533,7 @@ public class ConnectionImpl extends Endp
         }
 
         delivery._transportWork = false;
+        _transportWorkSize--;
     }
 
     void addTransportWork(DeliveryImpl delivery)
@@ -538,6 +555,7 @@ public class ConnectionImpl extends Endp
         }
 
         delivery._transportWork = true;
+        _transportWorkSize++;
     }
 
     void workUpdate(DeliveryImpl delivery)
@@ -571,23 +589,40 @@ public class ConnectionImpl extends Endp
     public void collect(Collector collector)
     {
         _collector = (CollectorImpl) collector;
+
+        put(Event.Type.CONNECTION_INIT, this);
+
+        LinkNode<SessionImpl> ssn = _sessionHead;
+        while (ssn != null) {
+            put(Event.Type.SESSION_INIT, ssn.getValue());
+            ssn = ssn.getNext();
+        }
+
+        LinkNode<LinkImpl> lnk = _linkHead;
+        while (lnk != null) {
+            put(Event.Type.LINK_INIT, lnk.getValue());
+            lnk = lnk.getNext();
+        }
     }
 
-    EventImpl put(Event.Type type)
+    EventImpl put(Event.Type type, Object context)
     {
         if (_collector != null) {
-            return _collector.put(type);
+            return _collector.put(type, context);
         } else {
             return null;
         }
     }
 
     @Override
-    protected void localStateChanged()
+    void localOpen()
     {
-        EventImpl ev = put(Event.Type.CONNECTION_LOCAL_STATE);
-        if (ev != null) {
-            ev.init(this);
-        }
+        put(Event.Type.CONNECTION_OPEN, this);
+    }
+
+    @Override
+    void localClose()
+    {
+        put(Event.Type.CONNECTION_CLOSE, this);
     }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Sat Sep  6 11:23:10 2014
@@ -37,7 +37,27 @@ public abstract class EndpointImpl imple
     private EndpointImpl _transportPrev;
     private Object _context;
 
-    protected abstract void localStateChanged();
+    private int refcount = 1;
+    boolean freed = false;
+
+    void incref() {
+        refcount++;
+    }
+
+    void decref() {
+        refcount--;
+        if (refcount == 0) {
+            postFinal();
+        } else if (refcount < 0) {
+            throw new IllegalStateException();
+        }
+    }
+
+    abstract void postFinal();
+
+    abstract void localOpen();
+
+    abstract void localClose();
 
     public void open()
     {
@@ -49,7 +69,7 @@ public abstract class EndpointImpl imple
                 // TODO
             case UNINITIALIZED:
                 _localState = EndpointState.ACTIVE;
-                localStateChanged();
+                localOpen();
         }
         modified();
     }
@@ -65,7 +85,7 @@ public abstract class EndpointImpl imple
                 // TODO
             case ACTIVE:
                 _localState = EndpointState.CLOSED;
-                localStateChanged();
+                localClose();
         }
         modified();
     }
@@ -129,9 +149,9 @@ public abstract class EndpointImpl imple
 
         if (emit) {
             ConnectionImpl conn = getConnectionImpl();
-            EventImpl ev = conn.put(Event.Type.TRANSPORT);
-            if (ev != null) {
-                ev.init(conn);
+            TransportImpl trans = conn.getTransport();
+            if (trans != null) {
+                conn.put(Event.Type.TRANSPORT, trans);
             }
         }
     }
@@ -162,16 +182,15 @@ public abstract class EndpointImpl imple
         return _transportPrev;
     }
 
-    public void free()
+    abstract void doFree();
+
+    final public void free()
     {
-        if(_transportNext != null)
-        {
-            _transportNext.setTransportPrev(_transportPrev);
-        }
-        if(_transportPrev != null)
-        {
-            _transportPrev.setTransportNext(_transportNext);
-        }
+        if (freed) return;
+        freed = true;
+
+        doFree();
+        decref();
     }
 
     void setTransportNext(EndpointImpl transportNext)
@@ -194,9 +213,4 @@ public abstract class EndpointImpl imple
         _context = context;
     }
 
-    @Override
-    public String toString()
-    {
-        return "EndpointImpl(" + System.identityHashCode(this) + ") [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
-    }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java Sat Sep  6 11:23:10 2014
@@ -36,15 +36,24 @@ class EventImpl implements Event
 {
 
     Type type;
-    Connection connection;
-    Session session;
-    Link link;
-    Delivery delivery;
-    Transport transport;
+    Object context;
+    EventImpl next;
 
-    EventImpl(Type type)
+    EventImpl()
+    {
+        this.type = null;
+    }
+
+    void init(Event.Type type, Object context)
     {
         this.type = type;
+        this.context = context;
+    }
+
+    void clear()
+    {
+        type = null;
+        context = null;
     }
 
     public Category getCategory()
@@ -57,58 +66,88 @@ class EventImpl implements Event
         return type;
     }
 
+    public Object getContext()
+    {
+        return context;
+    }
+
     public Connection getConnection()
     {
-        return connection;
+        switch (type.getCategory()) {
+        case CONNECTION:
+            return (Connection) context;
+        case TRANSPORT:
+            Transport transport = getTransport();
+            if (transport == null) {
+                return null;
+            }
+            return ((TransportImpl) transport).getConnectionImpl();
+        default:
+            Session ssn = getSession();
+            if (ssn == null) {
+                return null;
+            }
+            return ssn.getConnection();
+        }
     }
 
     public Session getSession()
     {
-        return session;
+        switch (type.getCategory()) {
+        case SESSION:
+            return (Session) context;
+        default:
+            Link link = getLink();
+            if (link == null) {
+                return null;
+            }
+            return link.getSession();
+        }
     }
 
     public Link getLink()
     {
-        return link;
+        switch (type.getCategory()) {
+        case LINK:
+            return (Link) context;
+        default:
+            Delivery dlv = getDelivery();
+            if (dlv == null) {
+                return null;
+            }
+            return dlv.getLink();
+        }
     }
 
     public Delivery getDelivery()
     {
-        return delivery;
+        switch (type.getCategory()) {
+        case DELIVERY:
+            return (Delivery) context;
+        default:
+            return null;
+        }
     }
 
     public Transport getTransport()
     {
-        return transport;
-    }
-
-    void init(Transport transport)
-    {
-        this.transport = transport;
-    }
-
-    void init(Connection connection)
-    {
-        this.connection = connection;
-        init(((ConnectionImpl) connection).getTransport());
+        switch (type.getCategory()) {
+        case TRANSPORT:
+            return (Transport) context;
+        default:
+            return null;
+        }
     }
-
-    void init(Session session)
+    public Event copy()
     {
-        this.session = session;
-        init(session.getConnection());
+       EventImpl newEvent = new EventImpl();
+       newEvent.init(type, context);
+       return newEvent;
     }
 
-    void init(Link link)
+    @Override
+    public String toString()
     {
-        this.link = link;
-        init(link.getSession());
+        return "EventImpl{" + "type=" + type + ", context=" + context + '}';
     }
-
-    void init(Delivery delivery)
-    {
-        this.delivery = delivery;
-        init(delivery.getLink());
-    }
-
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java Sat Sep  6 11:23:10 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
+import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.framing.TransportFrame;
 
 public interface FrameHandler
@@ -31,7 +32,7 @@ public interface FrameHandler
      */
     boolean handleFrame(TransportFrame frame);
 
-    void closed();
+    void closed(TransportException error);
 
     /**
      * Returns whether I am currently able to handle frames.

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java Sat Sep  6 11:23:10 2014
@@ -40,6 +40,8 @@ class FrameParser implements TransportIn
 {
     private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace");
 
+    private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
+
     private enum State
     {
         HEADER0,
@@ -62,8 +64,9 @@ class FrameParser implements TransportIn
 
     private final FrameHandler _frameHandler;
     private final ByteBufferDecoder _decoder;
+    private final int _maxFrameSize;
 
-    private final ByteBuffer _inputBuffer;
+    private ByteBuffer _inputBuffer = null;
     private boolean _tail_closed = false;
 
     private State _state = State.HEADER0;
@@ -87,11 +90,7 @@ class FrameParser implements TransportIn
     {
         _frameHandler = frameHandler;
         _decoder = decoder;
-        if (maxFrameSize > 0) {
-            _inputBuffer = newWriteableBuffer(maxFrameSize);
-        } else {
-            _inputBuffer = newWriteableBuffer(4*1024);
-        }
+        _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
     }
 
     private void input(ByteBuffer in) throws TransportException
@@ -372,6 +371,7 @@ class FrameParser implements TransportIn
 
                             _decoder.setByteBuffer(in);
                             Object val = _decoder.readObject();
+                            _decoder.setByteBuffer(null);
 
                             Binary payload;
 
@@ -447,7 +447,7 @@ class FrameParser implements TransportIn
                 state = State.ERROR;
                 frameParsingError = new TransportException("connection aborted");
             } else {
-                _frameHandler.closed();
+                _frameHandler.closed(null);
             }
         }
 
@@ -460,7 +460,7 @@ class FrameParser implements TransportIn
             if(frameParsingError != null)
             {
                 _parsingError = frameParsingError;
-                throw frameParsingError;
+                _frameHandler.closed(frameParsingError);
             }
             else
             {
@@ -475,7 +475,11 @@ class FrameParser implements TransportIn
         if (_tail_closed) {
             return Transport.END_OF_STREAM;
         } else {
-            return _inputBuffer.remaining();
+            if (_inputBuffer != null) {
+                return _inputBuffer.remaining();
+            } else {
+                return _maxFrameSize;
+            }
         }
     }
 
@@ -483,27 +487,41 @@ class FrameParser implements TransportIn
     public ByteBuffer tail()
     {
         if (_tail_closed) {
-            if (_parsingError != null) {
-                throw new TransportException(_parsingError.getMessage());
-            } else {
-                throw new TransportException("tail closed");
-            }
+            throw new TransportException("tail closed");
+        }
+
+        if (_inputBuffer == null) {
+            _inputBuffer = newWriteableBuffer(_maxFrameSize);
         }
+
         return _inputBuffer;
     }
 
     @Override
     public void process() throws TransportException
     {
-        _inputBuffer.flip();
-
-        try
+        if (_inputBuffer != null)
         {
-            input(_inputBuffer);
+            _inputBuffer.flip();
+
+            try
+            {
+                input(_inputBuffer);
+            }
+            finally
+            {
+                if (_inputBuffer.hasRemaining()) {
+                    _inputBuffer.compact();
+                } else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
+                    _inputBuffer = null;
+                } else {
+                    _inputBuffer.clear();
+                }
+            }
         }
-        finally
+        else
         {
-            _inputBuffer.compact();
+            input(_emptyInputBuffer);
         }
     }
 

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java Sat Sep  6 11:23:10 2014
@@ -45,15 +45,15 @@ class FrameWriter
     private WritableBuffer _buffer;
     private int _maxFrameSize;
     private byte _frameType;
-    private ProtocolTracer _protocolTracer;
-    private Object _logCtx;
+    final private Ref<ProtocolTracer> _protocolTracer;
+    private TransportImpl _transport;
 
     private int _frameStart = 0;
     private int _payloadStart;
     private int _performativeSize;
 
     FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType,
-                ProtocolTracer protocolTracer, Object logCtx)
+                Ref<ProtocolTracer> protocolTracer, TransportImpl transport)
     {
         _encoder = encoder;
         _bbuf = ByteBuffer.allocate(1024);
@@ -62,7 +62,7 @@ class FrameWriter
         _maxFrameSize = maxFrameSize;
         _frameType = frameType;
         _protocolTracer = protocolTracer;
-        _logCtx = logCtx;
+        _transport = transport;
     }
 
     void setMaxFrameSize(int maxFrameSize)
@@ -155,11 +155,12 @@ class FrameWriter
         // code, further refactor will fix this
         if (_frameType == AMQP_FRAME_TYPE) {
             TransportFrame frame = new TransportFrame(channel, (FrameBody) frameBody, Binary.create(originalPayload));
-            TransportImpl.log(_logCtx, TransportImpl.OUTGOING, frame);
+            _transport.log(TransportImpl.OUTGOING, frame);
 
-            if( _protocolTracer!=null )
+            ProtocolTracer tracer = _protocolTracer.get();
+            if(tracer != null)
             {
-                _protocolTracer.sentFrame(frame);
+                tracer.sentFrame(frame);
             }
         }
 
@@ -191,6 +192,11 @@ class FrameWriter
         writeFrame(0, frameBody, null, null);
     }
 
+    boolean isFull() {
+        // XXX: this should probably be tunable
+        return _bbuf.position() > 64*1024;
+    }
+
     int readBytes(ByteBuffer dst)
     {
         ByteBuffer src = _bbuf.duplicate();

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Sat Sep  6 11:23:10 2014
@@ -53,14 +53,17 @@ public abstract class LinkImpl extends E
     private ReceiverSettleMode _remoteReceiverSettleMode;
 
 
-    private final LinkNode<LinkImpl> _node;
+    private LinkNode<LinkImpl> _node;
     private boolean _drain;
 
     LinkImpl(SessionImpl session, String name)
     {
         _session = session;
+        _session.incref();
         _name = name;
-        _node = session.getConnectionImpl().addLinkEndpoint(this);
+        ConnectionImpl conn = session.getConnectionImpl();
+        _node = conn.addLinkEndpoint(this);
+        conn.put(Event.Type.LINK_INIT, this);
     }
 
 
@@ -103,11 +106,21 @@ public abstract class LinkImpl extends E
         }
     }
 
-    public void free()
+    @Override
+    void postFinal() {
+        _session.getConnectionImpl().put(Event.Type.LINK_FINAL, this);
+        _session.decref();
+    }
+
+    @Override
+    void doFree()
     {
-        super.free();
         _session.getConnectionImpl().removeLinkEndpoint(_node);
-        //TODO.
+        _node = null;
+    }
+
+    void modifyEndpoints() {
+        modified();
     }
 
     public void remove(DeliveryImpl delivery)
@@ -375,11 +388,14 @@ public abstract class LinkImpl extends E
     }
 
     @Override
-    protected void localStateChanged()
+    void localOpen()
     {
-        EventImpl ev = getConnectionImpl().put(Event.Type.LINK_LOCAL_STATE);
-        if (ev != null) {
-            ev.init(this);
-        }
+        getConnectionImpl().put(Event.Type.LINK_OPEN, this);
+    }
+
+    @Override
+    void localClose()
+    {
+        getConnectionImpl().put(Event.Type.LINK_CLOSE, this);
     }
 }

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Sat Sep  6 11:23:10 2014
@@ -96,12 +96,11 @@ public class ReceiverImpl extends LinkIm
         return consumed;
     }
 
-    public void free()
+    @Override
+    void doFree()
     {
         getSession().freeReceiver(this);
-
-        super.free();
-        //TODO.
+        super.doFree();
     }
 
     boolean hasIncoming()

Added: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java Sat Sep  6 11:23:10 2014
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+
+/**
+ * Ref
+ *
+ */
+
+class Ref<T>
+{
+
+    T value;
+
+    public Ref(T initial) {
+        value = initial;
+    }
+
+    public T get() {
+        return value;
+    }
+
+    public void set(T value) {
+        this.value = value;
+    }
+
+}

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java Sat Sep  6 11:23:10 2014
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.security.SaslChallenge;
@@ -53,6 +54,8 @@ public class SaslImpl implements Sasl, S
     private final DecoderImpl _decoder = new DecoderImpl();
     private final EncoderImpl _encoder = new EncoderImpl(_decoder);
 
+    private final TransportImpl _transport;
+
     private boolean _tail_closed = false;
     private final ByteBuffer _inputBuffer;
     private boolean _head_closed = false;
@@ -86,14 +89,15 @@ public class SaslImpl implements Sasl, S
      * returned by {@link SaslTransportWrapper#getInputBuffer()} and
      * {@link SaslTransportWrapper#getOutputBuffer()}.
      */
-    SaslImpl(int maxFrameSize)
+    SaslImpl(TransportImpl transport, int maxFrameSize)
     {
+        _transport = transport;
         _inputBuffer = newWriteableBuffer(maxFrameSize);
         _outputBuffer = newWriteableBuffer(maxFrameSize);
 
         AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
         _frameParser = new SaslFrameParser(this, _decoder);
-        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, this);
+        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, _transport);
     }
 
     @Override
@@ -460,6 +464,13 @@ public class SaslImpl implements Sasl, S
         _role = Role.SERVER;
     }
 
+    @Override
+    public void allowSkip(boolean allowSkip)
+    {
+        //TODO: implement
+        throw new ProtonUnsupportedOperationException();
+    }
+
     public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
     {
         return new SaslTransportWrapper(input, output);
@@ -565,6 +576,9 @@ public class SaslImpl implements Sasl, S
             _tail_closed = true;
             if (isInputInSaslMode()) {
                 _head_closed = true;
+                _underlyingInput.close_tail();
+            } else {
+                _underlyingInput.close_tail();
             }
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org