You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by fa...@apache.org on 2014/09/06 13:23:14 UTC
svn commit: r1622849 [7/9] - in
/qpid/proton/branches/fadams-javascript-binding: ./ contrib/
contrib/proton-hawtdispatch/ contrib/proton-hawtdispatch/src/
contrib/proton-hawtdispatch/src/main/
contrib/proton-hawtdispatch/src/main/java/ contrib/proton-h...
Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/selector.c Sat Sep 6 11:23:10 2014
@@ -19,21 +19,6 @@
*
*/
-/*
- * Copy of posix poll-based selector with minimal changes to use
- * select(). TODO: fully native implementaton with I/O completion
- * ports.
- *
- * This implementation comments out the posix max_fds arg to select
- * which has no meaning on windows. The number of fd_set slots are
- * configured at compile time via FD_SETSIZE, chosen "large enough"
- * for the limited scalability of select() at the expense of
- * 3*N*sizeof(unsigned int) bytes per driver instance. select (and
- * associated macros like FD_ZERO) are otherwise unaffected
- * performance-wise by increasing FD_SETSIZE.
- */
-
-#define FD_SETSIZE 2048
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
@@ -44,37 +29,53 @@
#include <Ws2tcpip.h>
#define PN_WINAPI
-#include "../platform.h"
+#include "platform.h"
+#include <proton/object.h>
#include <proton/io.h>
#include <proton/selector.h>
#include <proton/error.h>
#include <assert.h>
-#include "../selectable.h"
-#include "../util.h"
+#include "selectable.h"
+#include "util.h"
+#include "iocp.h"
+
+static void interests_update(iocpdesc_t *iocpd, int interests);
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t);
struct pn_selector_t {
- fd_set readfds;
- fd_set writefds;
- fd_set exceptfds;
+ iocp_t *iocp;
pn_timestamp_t *deadlines;
size_t capacity;
pn_list_t *selectables;
+ pn_list_t *iocp_descriptors;
pn_timestamp_t deadline;
size_t current;
+ iocpdesc_t *current_triggered;
pn_timestamp_t awoken;
pn_error_t *error;
+ iocpdesc_t *triggered_list_head;
+ iocpdesc_t *triggered_list_tail;
+ iocpdesc_t *deadlines_head;
+ iocpdesc_t *deadlines_tail;
};
void pn_selector_initialize(void *obj)
{
pn_selector_t *selector = (pn_selector_t *) obj;
+ selector->iocp = NULL;
selector->deadlines = NULL;
selector->capacity = 0;
selector->selectables = pn_list(0, 0);
+ selector->iocp_descriptors = pn_list(0, PN_REFCOUNT);
selector->deadline = 0;
selector->current = 0;
+ selector->current_triggered = NULL;
selector->awoken = 0;
selector->error = pn_error();
+ selector->triggered_list_head = NULL;
+ selector->triggered_list_tail = NULL;
+ selector->deadlines_head = NULL;
+ selector->deadlines_tail = NULL;
}
void pn_selector_finalize(void *obj)
@@ -82,28 +83,51 @@ void pn_selector_finalize(void *obj)
pn_selector_t *selector = (pn_selector_t *) obj;
free(selector->deadlines);
pn_free(selector->selectables);
+ pn_free(selector->iocp_descriptors);
pn_error_free(selector->error);
+ selector->iocp->selector = NULL;
}
#define pn_selector_hashcode NULL
#define pn_selector_compare NULL
#define pn_selector_inspect NULL
-pn_selector_t *pn_selector(void)
+pn_selector_t *pni_selector()
{
- static pn_class_t clazz = PN_CLASS(pn_selector);
+ static const pn_class_t clazz = PN_CLASS(pn_selector);
pn_selector_t *selector = (pn_selector_t *) pn_new(sizeof(pn_selector_t), &clazz);
return selector;
}
+pn_selector_t *pni_selector_create(iocp_t *iocp)
+{
+ pn_selector_t *selector = pni_selector();
+ selector->iocp = iocp;
+ return selector;
+}
+
void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
{
assert(selector);
assert(selectable);
assert(pni_selectable_get_index(selectable) < 0);
+ pn_socket_t sock = pn_selectable_fd(selectable);
+
+ iocpdesc_t *iocpd = NULL;
+ if (sock != INVALID_SOCKET) {
+ iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
+ if (!iocpd) {
+ // Socket created outside proton. Hook it up to iocp.
+ iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
+ pni_iocpdesc_start(iocpd);
+ } else {
+ assert(iocpd->iocp == selector->iocp);
+ }
+ }
if (pni_selectable_get_index(selectable) < 0) {
pn_list_add(selector->selectables, selectable);
+ pn_list_add(selector->iocp_descriptors, iocpd);
size_t size = pn_list_size(selector->selectables);
if (selector->capacity < size) {
@@ -112,6 +136,10 @@ void pn_selector_add(pn_selector_t *sele
}
pni_selectable_set_index(selectable, size - 1);
+ if (iocpd) {
+ iocpd->selector = selector;
+ iocpd->selectable = selectable;
+ }
}
pn_selector_update(selector, selectable);
@@ -121,18 +149,22 @@ void pn_selector_update(pn_selector_t *s
{
int idx = pni_selectable_get_index(selectable);
assert(idx >= 0);
- /*
- selector->fds[idx].fd = pn_selectable_fd(selectable);
- selector->fds[idx].events = 0;
- selector->fds[idx].revents = 0;
- if (pn_selectable_capacity(selectable) > 0) {
- selector->fds[idx].events |= POLLIN;
- }
- if (pn_selectable_pending(selectable) > 0) {
- selector->fds[idx].events |= POLLOUT;
- }
- */
selector->deadlines[idx] = pn_selectable_deadline(selectable);
+
+ pn_socket_t sock = pn_selectable_fd(selectable);
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+ if (iocpd) {
+ assert(sock == iocpd->socket || iocpd->closing);
+ int interests = 0;
+ if (pn_selectable_capacity(selectable) > 0) {
+ interests |= PN_READABLE;
+ }
+ if (pn_selectable_pending(selectable) > 0) {
+ interests |= PN_WRITABLE;
+ }
+ interests_update(iocpd, interests);
+ deadlines_update(iocpd, selector->deadlines[idx]);
+ }
}
void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
@@ -142,107 +174,94 @@ void pn_selector_remove(pn_selector_t *s
int idx = pni_selectable_get_index(selectable);
assert(idx >= 0);
+ iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
+ if (iocpd) {
+ if (selector->current_triggered == iocpd)
+ selector->current_triggered = iocpd->triggered_list_next;
+ interests_update(iocpd, 0);
+ deadlines_update(iocpd, 0);
+ assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev);
+ assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev);
+ iocpd->selector = NULL;
+ iocpd->selectable = NULL;
+ }
pn_list_del(selector->selectables, idx, 1);
+ pn_list_del(selector->iocp_descriptors, idx, 1);
size_t size = pn_list_size(selector->selectables);
for (size_t i = idx; i < size; i++) {
pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
pni_selectable_set_index(sel, i);
}
-
pni_selectable_set_index(selectable, -1);
}
int pn_selector_select(pn_selector_t *selector, int timeout)
{
assert(selector);
-
- FD_ZERO(&selector->readfds);
- FD_ZERO(&selector->writefds);
- FD_ZERO(&selector->exceptfds);
-
- size_t size = pn_list_size(selector->selectables);
- if (size > FD_SETSIZE) {
- // This Windows limitation will go away when switching to completion ports
- pn_error_set(selector->error, PN_ERR, "maximum sockets exceeded for Windows selector");
- return PN_ERR;
- }
+ pn_error_clear(selector->error);
+ pn_timestamp_t deadline = 0;
+ pn_timestamp_t now = pn_i_now();
if (timeout) {
- pn_timestamp_t deadline = 0;
- for (size_t i = 0; i < size; i++) {
- pn_timestamp_t d = selector->deadlines[i];
- if (d)
- deadline = (deadline == 0) ? d : pn_min(deadline, d);
- }
-
- if (deadline) {
- pn_timestamp_t now = pn_i_now();
- int delta = selector->deadline - now;
- if (delta < 0) {
- timeout = 0;
- } else if (delta < timeout) {
- timeout = delta;
- }
- }
- }
-
- struct timeval to = {0};
- struct timeval *to_arg = &to;
- // block only if (timeout == 0) and (closed_count == 0)
- if (timeout > 0) {
- // convert millisecs to sec and usec:
- to.tv_sec = timeout/1000;
- to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
- }
- else if (timeout < 0) {
- to_arg = NULL;
+ if (selector->deadlines_head)
+ deadline = selector->deadlines_head->deadline;
}
-
- for (size_t i = 0; i < size; i++) {
- pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
- pn_socket_t fd = pn_selectable_fd(sel);
- if (pn_selectable_capacity(sel) > 0) {
- FD_SET(fd, &selector->readfds);
- }
- if (pn_selectable_pending(sel) > 0) {
- FD_SET(fd, &selector->writefds);
+ if (deadline) {
+ int delta = deadline - now;
+ if (delta < 0) {
+ delta = 0;
+ }
+ if (timeout < 0)
+ timeout = delta;
+ else if (timeout > delta)
+ timeout = delta;
+ }
+ deadline = (timeout >= 0) ? now + timeout : 0;
+
+ // Process all currently available completions, even if matched events available
+ pni_iocp_drain_completions(selector->iocp);
+ pni_zombie_check(selector->iocp, now);
+ // Loop until an interested event is matched, or until deadline
+ while (true) {
+ if (selector->triggered_list_head)
+ break;
+ if (deadline && deadline <= now)
+ break;
+ pn_timestamp_t completion_deadline = deadline;
+ pn_timestamp_t zd = pni_zombie_deadline(selector->iocp);
+ if (zd)
+ completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd;
+
+ int completion_timeout = (!completion_deadline) ? -1 : completion_deadline - now;
+ int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error);
+ if (rv < 0)
+ return pn_error_code(selector->error);
+
+ now = pn_i_now();
+ if (zd && zd <= now) {
+ pni_zombie_check(selector->iocp, now);
}
}
- int result = select(0 /* ignored in win32 */, &selector->readfds, &selector->writefds, &selector->exceptfds, to_arg);
- if (result == -1) {
- pn_i_error_from_errno(selector->error, "select");
- } else {
- selector->current = 0;
- selector->awoken = pn_i_now();
+ selector->current = 0;
+ selector->awoken = now;
+ selector->current_triggered = selector->triggered_list_head;
+ for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) {
+ if (iocpd->deadline <= now)
+ pni_events_update(iocpd, iocpd->events | PN_EXPIRED);
+ else
+ break;
}
-
return pn_error_code(selector->error);
}
pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events)
{
- pn_list_t *l = selector->selectables;
- size_t size = pn_list_size(l);
- while (selector->current < size) {
- pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(l, selector->current);
- pn_timestamp_t deadline = selector->deadlines[selector->current];
- int ev = 0;
- pn_socket_t fd = pn_selectable_fd(sel);
- if (FD_ISSET(fd, &selector->readfds)) {
- ev |= PN_READABLE;
- }
- if (FD_ISSET(fd, &selector->writefds)) {
- ev |= PN_WRITABLE;
- }
- if (deadline && selector->awoken >= deadline) {
- ev |= PN_EXPIRED;
- }
- selector->current++;
- if (ev) {
- *events = ev;
- return sel;
- }
+ if (selector->current_triggered) {
+ iocpdesc_t *iocpd = selector->current_triggered;
+ *events = iocpd->interests & iocpd->events;
+ selector->current_triggered = iocpd->triggered_list_next;
+ return iocpd->selectable;
}
return NULL;
}
@@ -252,3 +271,91 @@ void pn_selector_free(pn_selector_t *sel
assert(selector);
pn_free(selector);
}
+
+
+static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+ if (iocpd->triggered_list_prev || selector->triggered_list_head == iocpd)
+ return; // already in list
+ LL_ADD(selector, triggered_list, iocpd);
+}
+
+static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+ if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd)
+ return; // not in list
+ LL_REMOVE(selector, triggered_list, iocpd);
+ iocpd->triggered_list_prev = NULL;
+ iocpd->triggered_list_next = NULL;
+}
+
+
+void pni_events_update(iocpdesc_t *iocpd, int events)
+{
+ int old_events = iocpd->events;
+ if (old_events == events)
+ return;
+ iocpd->events = events;
+ if (iocpd->selector) {
+ if (iocpd->events & iocpd->interests)
+ triggered_list_add(iocpd->selector, iocpd);
+ else
+ triggered_list_remove(iocpd->selector, iocpd);
+ }
+}
+
+static void interests_update(iocpdesc_t *iocpd, int interests)
+{
+ int old_interests = iocpd->interests;
+ if (old_interests == interests)
+ return;
+ iocpd->interests = interests;
+ if (iocpd->selector) {
+ if (iocpd->events & iocpd->interests)
+ triggered_list_add(iocpd->selector, iocpd);
+ else
+ triggered_list_remove(iocpd->selector, iocpd);
+ }
+}
+
+static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
+{
+ if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd)
+ return; // not in list
+ LL_REMOVE(selector, deadlines, iocpd);
+ iocpd->deadlines_prev = NULL;
+ iocpd->deadlines_next = NULL;
+}
+
+
+static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline)
+{
+ if (deadline == iocpd->deadline)
+ return;
+ iocpd->deadline = deadline;
+ pn_selector_t *selector = iocpd->selector;
+ if (!deadline) {
+ deadlines_remove(selector, iocpd);
+ pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+ interests_update(iocpd, iocpd->interests & ~PN_EXPIRED);
+ } else {
+ if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) {
+ deadlines_remove(selector, iocpd);
+ pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
+ }
+ interests_update(iocpd, iocpd->interests | PN_EXPIRED);
+ iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines);
+ while (dl_iocpd && dl_iocpd->deadline <= deadline)
+ dl_iocpd = dl_iocpd->deadlines_next;
+ if (dl_iocpd) {
+ // insert
+ iocpd->deadlines_prev = dl_iocpd->deadlines_prev;
+ iocpd->deadlines_next = dl_iocpd;
+ dl_iocpd->deadlines_prev = iocpd;
+ if (selector->deadlines_head == dl_iocpd)
+ selector->deadlines_head = iocpd;
+ } else {
+ LL_ADD(selector, deadlines, iocpd); // append
+ }
+ }
+}
Added: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c Sat Sep 6 11:23:10 2014
@@ -0,0 +1,312 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * A simple write buffer pool. Each socket has a dedicated "primary"
+ * buffer and can borrow from a shared pool with limited size tuning.
+ * Could enhance e.g. with separate pools per network interface and fancier
+ * memory tuning based on interface speed, system resources, and
+ * number of connections, etc.
+ */
+
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <Ws2tcpip.h>
+#define PN_WINAPI
+
+#include "platform.h"
+#include <proton/object.h>
+#include <proton/io.h>
+#include <proton/selector.h>
+#include <proton/error.h>
+#include <assert.h>
+#include "selectable.h"
+#include "util.h"
+#include "iocp.h"
+
+// Max overlapped writes per socket
+#define IOCP_MAX_OWRITES 16
+// Write buffer size
+#define IOCP_WBUFSIZE 16384
+
+static void pipeline_log(const char *fmt, ...)
+{
+ va_list ap;
+ va_start(ap, fmt);
+ vfprintf(stderr, fmt, ap);
+ va_end(ap);
+ fflush(stderr);
+}
+
+void pni_shared_pool_create(iocp_t *iocp)
+{
+ // TODO: more pools (or larger one) when using multiple non-loopback interfaces
+ iocp->shared_pool_size = 16;
+ char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
+ if (env) {
+ int sz = atoi(env);
+ if (sz >= 0 && sz < 256) {
+ iocp->shared_pool_size = sz;
+ }
+ }
+ iocp->loopback_bufsize = 0;
+ env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
+ if (env) {
+ int sz = atoi(env);
+ if (sz >= 0 && sz <= 128 * 1024) {
+ iocp->loopback_bufsize = sz;
+ }
+ }
+
+ if (iocp->shared_pool_size) {
+ iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+ HRESULT status = GetLastError();
+ if (!iocp->shared_pool_memory) {
+ perror("Proton write buffer pool allocation failure\n");
+ iocp->shared_pool_size = 0;
+ iocp->shared_available_count = 0;
+ return;
+ }
+
+ iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+ iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
+ iocp->shared_available_count = iocp->shared_pool_size;
+ char *mem = iocp->shared_pool_memory;
+ for (int i = 0; i < iocp->shared_pool_size; i++) {
+ iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
+ mem += IOCP_WBUFSIZE;
+ }
+ }
+}
+
+void pni_shared_pool_free(iocp_t *iocp)
+{
+ for (int i = 0; i < iocp->shared_pool_size; i++) {
+ write_result_t *result = iocp->shared_results[i];
+ if (result->in_use)
+ pipeline_log("Proton buffer pool leak\n");
+ else
+ free(result);
+ }
+ if (iocp->shared_pool_size) {
+ free(iocp->shared_results);
+ free(iocp->available_results);
+ if (iocp->shared_pool_memory) {
+ if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) {
+ perror("write buffers release failed");
+ }
+ iocp->shared_pool_memory = NULL;
+ }
+ }
+}
+
+static void shared_pool_push(write_result_t *result)
+{
+ iocp_t *iocp = result->base.iocpd->iocp;
+ assert(iocp->shared_available_count < iocp->shared_pool_size);
+ iocp->available_results[iocp->shared_available_count++] = result;
+}
+
+static write_result_t *shared_pool_pop(iocp_t *iocp)
+{
+ return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL;
+}
+
+struct write_pipeline_t {
+ iocpdesc_t *iocpd;
+ size_t pending_count;
+ write_result_t *primary;
+ size_t reserved_count;
+ size_t next_primary_index;
+ size_t depth;
+ bool is_writer;
+};
+
+#define write_pipeline_compare NULL
+#define write_pipeline_inspect NULL
+#define write_pipeline_hashcode NULL
+
+static void write_pipeline_initialize(void *object)
+{
+ write_pipeline_t *pl = (write_pipeline_t *) object;
+ pl->pending_count = 0;
+ const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
+ pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
+ pl->depth = 0;
+ pl->is_writer = false;
+}
+
+static void write_pipeline_finalize(void *object)
+{
+ write_pipeline_t *pl = (write_pipeline_t *) object;
+ free((void *)pl->primary->buffer.start);
+ free(pl->primary);
+}
+
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
+{
+ static const pn_class_t clazz = PN_CLASS(write_pipeline);
+ write_pipeline_t *pipeline = (write_pipeline_t *) pn_new(sizeof(write_pipeline_t), &clazz);
+ pipeline->iocpd = iocpd;
+ pipeline->primary->base.iocpd = iocpd;
+ return pipeline;
+}
+
+static void confirm_as_writer(write_pipeline_t *pl)
+{
+ if (!pl->is_writer) {
+ iocp_t *iocp = pl->iocpd->iocp;
+ iocp->writer_count++;
+ pl->is_writer = true;
+ }
+}
+
+static void remove_as_writer(write_pipeline_t *pl)
+{
+ if (!pl->is_writer)
+ return;
+ iocp_t *iocp = pl->iocpd->iocp;
+ assert(iocp->writer_count);
+ pl->is_writer = false;
+ iocp->writer_count--;
+}
+
+/*
+ * Optimal depth will depend on properties of the NIC, server, and driver. For now,
+ * just distinguish between loopback interfaces and the rest. Optimizations in the
+ * loopback stack allow decent performance with depth 1 and actually cause major
+ * performance hiccups if set to large values.
+ */
+static void set_depth(write_pipeline_t *pl)
+{
+ pl->depth = 1;
+ sockaddr_storage sa;
+ socklen_t salen = sizeof(sa);
+ char buf[INET6_ADDRSTRLEN];
+ DWORD buflen = sizeof(buf);
+
+ if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
+ getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
+ if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
+ (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
+ // not loopback
+ pl->depth = IOCP_MAX_OWRITES;
+ } else {
+ iocp_t *iocp = pl->iocpd->iocp;
+ if (iocp->loopback_bufsize) {
+ const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
+ if (p) {
+ pl->primary->buffer.start = p;
+ pl->primary->buffer.size = iocp->loopback_bufsize;
+ }
+ }
+ }
+ }
+}
+
+// Reserve as many buffers as possible for count bytes.
+size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
+{
+ if (pl->primary->in_use)
+ return 0; // I.e. io->wouldblock
+ if (!pl->depth)
+ set_depth(pl);
+ if (pl->depth == 1) {
+ // always use the primary
+ pl->reserved_count = 1;
+ pl->next_primary_index = 0;
+ return 1;
+ }
+
+ iocp_t *iocp = pl->iocpd->iocp;
+ confirm_as_writer(pl);
+ int wanted = (count / IOCP_WBUFSIZE);
+ if (count % IOCP_WBUFSIZE)
+ wanted++;
+ size_t pending = pl->pending_count;
+ assert(pending < pl->depth);
+ int bufs = pn_min(wanted, pl->depth - pending);
+ // Can draw from shared pool or the primary... but share with others.
+ size_t writers = iocp->writer_count;
+ int shared_count = (iocp->shared_available_count + writers - 1) / writers;
+ bufs = pn_min(bufs, shared_count + 1);
+ pl->reserved_count = pending + bufs;
+
+ if (bufs == wanted &&
+ pl->reserved_count < (pl->depth / 2) &&
+ iocp->shared_available_count > (2 * writers + bufs)) {
+ // No shortage: keep the primary as spare for future use
+ pl->next_primary_index = pl->reserved_count;
+ } else if (bufs == 1) {
+ pl->next_primary_index = pending;
+ } else {
+ // let approx 1/3 drain before replenishing
+ pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
+ if (pl->next_primary_index < pending)
+ pl->next_primary_index = pending;
+ }
+ return bufs;
+}
+
+write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
+{
+ size_t sz = pl->pending_count;
+ if (sz >= pl->reserved_count)
+ return NULL;
+ write_result_t *result;
+ if (sz == pl->next_primary_index) {
+ result = pl->primary;
+ } else {
+ assert(pl->iocpd->iocp->shared_available_count > 0);
+ result = shared_pool_pop(pl->iocpd->iocp);
+ }
+
+ result->in_use = true;
+ pl->pending_count++;
+ return result;
+}
+
+void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
+{
+ result->in_use = false;
+ pl->pending_count--;
+ pl->reserved_count = 0;
+ if (result != pl->primary)
+ shared_pool_push(result);
+ if (pl->pending_count == 0)
+ remove_as_writer(pl);
+}
+
+bool pni_write_pipeline_writable(write_pipeline_t *pl)
+{
+ // Only writable if not full and we can guarantee a buffer:
+ return pl->pending_count < pl->depth && !pl->primary->in_use;
+}
+
+size_t pni_write_pipeline_size(write_pipeline_t *pl)
+{
+ return pl->pending_count;
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/write_pipeline.c
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/Proton.java Sat Sep 6 11:23:10 2014
@@ -30,87 +30,58 @@ import org.apache.qpid.proton.amqp.messa
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.Codec;
import org.apache.qpid.proton.codec.Data;
-import org.apache.qpid.proton.codec.DataFactory;
import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.DriverFactory;
+import org.apache.qpid.proton.engine.Engine;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.EngineFactory;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFactory;
import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.MessengerFactory;
-
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
public final class Proton
{
- public static ProtonFactory.ImplementationType ANY = ProtonFactory.ImplementationType.ANY;
- public static ProtonFactory.ImplementationType PROTON_C = ProtonFactory.ImplementationType.PROTON_C;
- public static ProtonFactory.ImplementationType PROTON_J = ProtonFactory.ImplementationType.PROTON_J;
-
-
- private static final MessengerFactory MESSENGER_FACTORY =
- (new ProtonFactoryLoader<MessengerFactory>(MessengerFactory.class)).loadFactory();
- private static final DriverFactory DRIVER_FACTORY =
- (new ProtonFactoryLoader<DriverFactory>(DriverFactory.class)).loadFactory();
- private static final MessageFactory MESSAGE_FACTORY =
- (new ProtonFactoryLoader<MessageFactory>(MessageFactory.class)).loadFactory();
- private static final DataFactory DATA_FACTORY =
- (new ProtonFactoryLoader<DataFactory>(DataFactory.class)).loadFactory();
- private static final EngineFactory ENGINE_FACTORY =
- (new ProtonFactoryLoader<EngineFactory>(EngineFactory.class)).loadFactory();
-
- private static final ProtonFactory.ImplementationType DEFAULT_IMPLEMENTATION =
- ProtonFactoryLoader.getImpliedImplementationType();
-
private Proton()
{
}
- public static ProtonFactory.ImplementationType getDefaultImplementationType()
- {
- return DEFAULT_IMPLEMENTATION;
- }
-
public static Collector collector()
{
- return new CollectorImpl();
+ return Engine.collector();
}
public static Connection connection()
{
- return ENGINE_FACTORY.createConnection();
+ return Engine.connection();
}
public static Transport transport()
{
- return ENGINE_FACTORY.createTransport();
+ return Engine.transport();
}
public static SslDomain sslDomain()
{
- return ENGINE_FACTORY.createSslDomain();
+ return Engine.sslDomain();
}
public static SslPeerDetails sslPeerDetails(String hostname, int port)
{
- return ENGINE_FACTORY.createSslPeerDetails(hostname, port);
+ return Engine.sslPeerDetails(hostname, port);
}
public static Data data(long capacity)
{
- return DATA_FACTORY.createData(capacity);
+ return Codec.data(capacity);
}
public static Message message()
{
- return MESSAGE_FACTORY.createMessage();
+ return Message.Factory.create();
}
public static Message message(Header header,
@@ -118,134 +89,25 @@ public final class Proton
Properties properties, ApplicationProperties applicationProperties,
Section body, Footer footer)
{
- return MESSAGE_FACTORY.createMessage(header, deliveryAnnotations,
- messageAnnotations, properties,
- applicationProperties, body, footer);
+ return Message.Factory.create(header, deliveryAnnotations,
+ messageAnnotations, properties,
+ applicationProperties, body, footer);
}
public static Messenger messenger()
{
- return MESSENGER_FACTORY.createMessenger();
+ return Messenger.Factory.create();
}
public static Messenger messenger(String name)
{
- return MESSENGER_FACTORY.createMessenger(name);
+ return Messenger.Factory.create(name);
}
public static Driver driver() throws IOException
{
- return DRIVER_FACTORY.createDriver();
- }
-
-
-
- public static Connection connection(ProtonFactory.ImplementationType implementation)
- {
- return getEngineFactory(implementation).createConnection();
- }
-
- public static Transport transport(ProtonFactory.ImplementationType implementation)
- {
- return getEngineFactory(implementation).createTransport();
- }
-
- public static SslDomain sslDomain(ProtonFactory.ImplementationType implementation)
- {
- return getEngineFactory(implementation).createSslDomain();
- }
-
- public static SslPeerDetails sslPeerDetails(ProtonFactory.ImplementationType implementation, String hostname, int port)
- {
- return getEngineFactory(implementation).createSslPeerDetails(hostname, port);
- }
-
- public static Data data(ProtonFactory.ImplementationType implementation, long capacity)
- {
- return getDataFactory(implementation).createData(capacity);
- }
-
- public static Message message(ProtonFactory.ImplementationType implementation)
- {
- return getMessageFactory(implementation).createMessage();
- }
-
- public static Message message(ProtonFactory.ImplementationType implementation, Header header,
- DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations,
- Properties properties, ApplicationProperties applicationProperties,
- Section body, Footer footer)
- {
- return getMessageFactory(implementation).createMessage(header, deliveryAnnotations,
- messageAnnotations, properties,
- applicationProperties, body, footer);
+ return Driver.Factory.create();
}
-
- public static Messenger messenger(ProtonFactory.ImplementationType implementation)
- {
- return getMessengerFactory(implementation).createMessenger();
- }
-
- public static Messenger messenger(ProtonFactory.ImplementationType implementation, String name)
- {
- return getMessengerFactory(implementation).createMessenger(name);
- }
-
- public static Driver driver(ProtonFactory.ImplementationType implementation) throws IOException
- {
- return getDriverFactory(implementation).createDriver();
- }
-
-
- private static final ConcurrentMap<ProtonFactory.ImplementationType, EngineFactory> _engineFactories =
- new ConcurrentHashMap<ProtonFactory.ImplementationType, EngineFactory>();
- private static final ConcurrentMap<ProtonFactory.ImplementationType, MessageFactory> _messageFactories =
- new ConcurrentHashMap<ProtonFactory.ImplementationType, MessageFactory>();
- private static final ConcurrentMap<ProtonFactory.ImplementationType, MessengerFactory> _messengerFactories =
- new ConcurrentHashMap<ProtonFactory.ImplementationType, MessengerFactory>();
- private static final ConcurrentMap<ProtonFactory.ImplementationType, DataFactory> _dataFactories =
- new ConcurrentHashMap<ProtonFactory.ImplementationType, DataFactory>();
- private static final ConcurrentMap<ProtonFactory.ImplementationType, DriverFactory> _driverFactories =
- new ConcurrentHashMap<ProtonFactory.ImplementationType, DriverFactory>();
-
- private static EngineFactory getEngineFactory(ProtonFactory.ImplementationType implementation)
- {
- return getFactory(EngineFactory.class, implementation, _engineFactories);
- }
-
- private static MessageFactory getMessageFactory(ProtonFactory.ImplementationType implementation)
- {
- return getFactory(MessageFactory.class, implementation, _messageFactories);
- }
-
- private static MessengerFactory getMessengerFactory(ProtonFactory.ImplementationType implementation)
- {
- return getFactory(MessengerFactory.class, implementation, _messengerFactories);
- }
-
- private static DriverFactory getDriverFactory(ProtonFactory.ImplementationType implementation)
- {
- return getFactory(DriverFactory.class, implementation, _driverFactories);
- }
-
- private static DataFactory getDataFactory(ProtonFactory.ImplementationType implementation)
- {
- return getFactory(DataFactory.class, implementation, _dataFactories);
- }
-
- private static <T extends ProtonFactory> T getFactory(Class<T> factoryClass, ProtonFactory.ImplementationType implementation,
- ConcurrentMap<ProtonFactory.ImplementationType, T> factories)
- {
- T factory = factories.get(implementation);
- if(factory == null)
- {
- factories.putIfAbsent(implementation, (new ProtonFactoryLoader<T>(factoryClass,implementation)).loadFactory());
- factory = factories.get(implementation);
-
- }
- return factory;
- }
-
-
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/DeliveryAnnotations.java Sat Sep 6 11:23:10 2014
@@ -25,17 +25,18 @@ package org.apache.qpid.proton.amqp.mess
import java.util.Map;
-public final class DeliveryAnnotations
- implements Section
+import org.apache.qpid.proton.amqp.Symbol;
+
+public final class DeliveryAnnotations implements Section
{
- private final Map _value;
+ private final Map<Symbol, Object> _value;
- public DeliveryAnnotations(Map value)
+ public DeliveryAnnotations(Map<Symbol, Object> value)
{
_value = value;
}
- public Map getValue()
+ public Map<Symbol, Object> getValue()
{
return _value;
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/MessageAnnotations.java Sat Sep 6 11:23:10 2014
@@ -23,20 +23,22 @@
package org.apache.qpid.proton.amqp.messaging;
+import org.apache.qpid.proton.amqp.Symbol;
+
import java.util.Map;
public final class MessageAnnotations implements Section
{
- private final Map _value;
+ private final Map<Symbol, Object> _value;
- public MessageAnnotations(Map value)
+ public MessageAnnotations(Map<Symbol, Object> value)
{
_value = value;
}
- public Map getValue()
+ public Map<Symbol, Object> getValue()
{
return _value;
}
Added: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.codec;
+
+/**
+ * Codec
+ *
+ */
+
+public final class Codec
+{
+
+ private Codec()
+ {
+ }
+
+ public static Data data(long capacity)
+ {
+ return Data.Factory.create();
+ }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Codec.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/codec/Data.java Sat Sep 6 11:23:10 2014
@@ -37,9 +37,19 @@ import org.apache.qpid.proton.amqp.Unsig
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.codec.impl.DataImpl;
+
public interface Data
{
+ public static final class Factory {
+
+ public static Data create() {
+ return new DataImpl();
+ }
+
+ }
+
enum DataType
{
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/Driver.java Sat Sep 6 11:23:10 2014
@@ -21,8 +21,10 @@
package org.apache.qpid.proton.driver;
+import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
+import org.apache.qpid.proton.driver.impl.DriverImpl;
/**
* A driver for the proton engine.
@@ -40,6 +42,14 @@ import java.nio.channels.ServerSocketCha
*/
public interface Driver
{
+
+ public static final class Factory
+ {
+ public static Driver create() throws IOException {
+ return new DriverImpl();
+ }
+ }
+
/**
* Force {@link #doWait(long)} to return.
*
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Sat Sep 6 11:23:10 2014
@@ -32,7 +32,6 @@ import org.apache.qpid.proton.engine.Con
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
-import org.apache.qpid.proton.engine.impl.TransportFactory;
class ConnectorImpl<C> implements Connector<C>
{
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Sat Sep 6 11:23:10 2014
@@ -53,7 +53,7 @@ public class DriverImpl implements Drive
private Queue<ConnectorImpl> _selectedConnectors = new ArrayDeque<ConnectorImpl>();
private Queue<ListenerImpl> _selectedListeners = new ArrayDeque<ListenerImpl>();
- DriverImpl() throws IOException
+ public DriverImpl() throws IOException
{
_selector = Selector.open();
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java Sat Sep 6 11:23:10 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.proton.engine;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
/**
* Collector
@@ -29,6 +30,13 @@ package org.apache.qpid.proton.engine;
public interface Collector
{
+ public static final class Factory
+ {
+ public static Collector create() {
+ return new CollectorImpl();
+ }
+ }
+
Event peek();
void pop();
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java Sat Sep 6 11:23:10 2014
@@ -24,6 +24,8 @@ import java.util.EnumSet;
import java.util.Map;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+
/**
* Maintains lists of sessions, links and deliveries in a state
@@ -36,6 +38,13 @@ import org.apache.qpid.proton.amqp.Symbo
public interface Connection extends Endpoint
{
+ public static final class Factory
+ {
+ public static Connection create() {
+ return new ConnectionImpl();
+ }
+ }
+
/**
* Returns a newly created session
*
@@ -81,6 +90,8 @@ public interface Connection extends Endp
public void setHostname(String hostname);
+ public String getHostname();
+
public String getRemoteContainer();
public String getRemoteHostname();
Added: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine;
+
+/**
+ * Engine
+ *
+ */
+
+public final class Engine
+{
+
+ private Engine()
+ {
+ }
+
+ public static Collector collector()
+ {
+ return Collector.Factory.create();
+ }
+
+ public static Connection connection()
+ {
+ return Connection.Factory.create();
+ }
+
+ public static Transport transport()
+ {
+ return Transport.Factory.create();
+ }
+
+ public static SslDomain sslDomain()
+ {
+ return SslDomain.Factory.create();
+ }
+
+ public static SslPeerDetails sslPeerDetails(String hostname, int port)
+ {
+ return SslPeerDetails.Factory.create(hostname, port);
+ }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Engine.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java Sat Sep 6 11:23:10 2014
@@ -29,19 +29,38 @@ package org.apache.qpid.proton.engine;
public interface Event
{
public enum Category {
- PROTOCOL;
+ CONNECTION,
+ SESSION,
+ LINK,
+ DELIVERY,
+ TRANSPORT;
}
public enum Type {
- CONNECTION_REMOTE_STATE(Category.PROTOCOL, 1),
- CONNECTION_LOCAL_STATE(Category.PROTOCOL, 2),
- SESSION_REMOTE_STATE(Category.PROTOCOL, 3),
- SESSION_LOCAL_STATE(Category.PROTOCOL, 4),
- LINK_REMOTE_STATE(Category.PROTOCOL, 5),
- LINK_LOCAL_STATE(Category.PROTOCOL, 6),
- LINK_FLOW(Category.PROTOCOL, 7),
- DELIVERY(Category.PROTOCOL, 8),
- TRANSPORT(Category.PROTOCOL, 9);
+ CONNECTION_INIT(Category.CONNECTION, 1),
+ CONNECTION_OPEN(Category.CONNECTION, 2),
+ CONNECTION_REMOTE_OPEN(Category.CONNECTION, 3),
+ CONNECTION_CLOSE(Category.CONNECTION, 4),
+ CONNECTION_REMOTE_CLOSE(Category.CONNECTION, 5),
+ CONNECTION_FINAL(Category.CONNECTION, 6),
+
+ SESSION_INIT(Category.SESSION, 1),
+ SESSION_OPEN(Category.SESSION, 2),
+ SESSION_REMOTE_OPEN(Category.SESSION, 3),
+ SESSION_CLOSE(Category.SESSION, 4),
+ SESSION_REMOTE_CLOSE(Category.SESSION, 5),
+ SESSION_FINAL(Category.SESSION, 6),
+
+ LINK_INIT(Category.LINK, 1),
+ LINK_OPEN(Category.LINK, 2),
+ LINK_REMOTE_OPEN(Category.LINK, 3),
+ LINK_CLOSE(Category.LINK, 4),
+ LINK_REMOTE_CLOSE(Category.LINK, 5),
+ LINK_FLOW(Category.LINK, 6),
+ LINK_FINAL(Category.LINK, 7),
+
+ DELIVERY(Category.DELIVERY, 1),
+ TRANSPORT(Category.TRANSPORT, 1);
private int _opcode;
private Category _category;
@@ -72,4 +91,6 @@ public interface Event
Transport getTransport();
+ Event copy();
+
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Sasl.java Sat Sep 6 11:23:10 2014
@@ -49,7 +49,8 @@ public interface Sasl
PN_SASL_SYS((byte)2),
/** failed due to unrecoverable error */
PN_SASL_PERM((byte)3),
- PN_SASL_TEMP((byte)4);
+ PN_SASL_TEMP((byte)4),
+ PN_SASL_SKIPPED((byte)5);
private final byte _code;
@@ -72,6 +73,7 @@ public interface Sasl
public static SaslOutcome PN_SASL_SYS = SaslOutcome.PN_SASL_SYS;
public static SaslOutcome PN_SASL_PERM = SaslOutcome.PN_SASL_PERM;
public static SaslOutcome PN_SASL_TEMP = SaslOutcome.PN_SASL_TEMP;
+ public static SaslOutcome PN_SASL_SKIPPED = SaslOutcome.PN_SASL_SKIPPED;
/**
* Access the current state of the layer.
@@ -156,4 +158,10 @@ public interface Sasl
void client();
void server();
+
+ /**
+ * Set whether servers may accept incoming connections
+ * that skip the SASL layer negotiation.
+ */
+ void allowSkip(boolean allowSkip);
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslDomain.java Sat Sep 6 11:23:10 2014
@@ -18,11 +18,21 @@
*/
package org.apache.qpid.proton.engine;
+import org.apache.qpid.proton.engine.impl.ssl.SslDomainImpl;
+
/**
* I store the details used to create SSL sessions.
*/
public interface SslDomain
{
+
+ public static final class Factory
+ {
+ public static SslDomain create() {
+ return new SslDomainImpl();
+ }
+ }
+
/**
* Determines whether the endpoint acts as a client or server.
*/
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/SslPeerDetails.java Sat Sep 6 11:23:10 2014
@@ -18,6 +18,8 @@
*/
package org.apache.qpid.proton.engine;
+import org.apache.qpid.proton.engine.impl.ssl.SslPeerDetailsImpl;
+
/**
* The details of the remote peer involved in an SSL session.
*
@@ -28,6 +30,14 @@ package org.apache.qpid.proton.engine;
*/
public interface SslPeerDetails
{
+
+ public static final class Factory
+ {
+ public static SslPeerDetails create(String hostname, int port) {
+ return new SslPeerDetailsImpl(hostname, port);
+ }
+ }
+
String getHostname();
int getPort();
-}
\ No newline at end of file
+}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java Sat Sep 6 11:23:10 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.proton.engine;
import java.nio.ByteBuffer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+
/**
* <p>
@@ -63,6 +65,19 @@ import java.nio.ByteBuffer;
*/
public interface Transport extends Endpoint
{
+
+ public static final class Factory
+ {
+ public static Transport create() {
+ return new TransportImpl();
+ }
+ }
+
+ public static final int TRACE_OFF = 0;
+ public static final int TRACE_RAW = 1;
+ public static final int TRACE_FRM = 2;
+ public static final int TRACE_DRV = 4;
+
public static final int DEFAULT_MAX_FRAME_SIZE = -1;
/** the lower bound for the agreed maximum frame size (in bytes). */
@@ -70,7 +85,10 @@ public interface Transport extends Endpo
public int SESSION_WINDOW = 16*1024;
public int END_OF_STREAM = -1;
+ public void trace(int levels);
+
public void bind(Connection connection);
+ public void unbind();
public int capacity();
public ByteBuffer tail();
@@ -83,6 +101,8 @@ public interface Transport extends Endpo
public void pop(int bytes);
public void close_head();
+ public boolean isClosed();
+
/**
* Processes the provided input.
*
@@ -156,7 +176,15 @@ public interface Transport extends Endpo
*/
void outputConsumed();
- Sasl sasl();
+ /**
+ * Signal the transport to expect SASL frames used to establish a SASL layer prior to
+ * performing the AMQP protocol version negotiation. This must first be performed before
+ * the transport is used for processing. Subsequent invocations will return the same
+ * {@link Sasl} object.
+ *
+ * @throws IllegalStateException if transport processing has already begun prior to initial invocation
+ */
+ Sasl sasl() throws IllegalStateException;
/**
* Wrap this transport's output and input to apply SSL encryption and decryption respectively.
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java Sat Sep 6 11:23:10 2014
@@ -35,25 +35,55 @@ import java.util.Queue;
public class CollectorImpl implements Collector
{
- private Queue<Event> events = new LinkedList<Event>();
+ private EventImpl head;
+ private EventImpl tail;
+ private EventImpl free;
public CollectorImpl()
{}
public Event peek()
{
- return events.peek();
+ return head;
}
public void pop()
{
- events.poll();
+ if (head != null) {
+ EventImpl next = head.next;
+ head.next = free;
+ free = head;
+ head.clear();
+ head = next;
+ }
}
- public EventImpl put(Event.Type type)
+ public EventImpl put(Event.Type type, Object context)
{
- EventImpl event = new EventImpl(type);
- events.add(event);
+ if (tail != null && tail.getType() == type &&
+ tail.getContext() == context) {
+ return null;
+ }
+
+ EventImpl event;
+ if (free == null) {
+ event = new EventImpl();
+ } else {
+ event = free;
+ free = free.next;
+ event.next = null;
+ }
+
+ event.init(type, context);
+
+ if (head == null) {
+ head = event;
+ tail = event;
+ } else {
+ tail.next = event;
+ tail = event;
+ }
+
return event;
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java Sat Sep 6 11:23:10 2014
@@ -52,6 +52,7 @@ public class ConnectionImpl extends Endp
private TransportImpl _transport;
private DeliveryImpl _transportWorkHead;
private DeliveryImpl _transportWorkTail;
+ private int _transportWorkSize = 0;
private String _localContainerId = "";
private String _localHostname = "";
private String _remoteContainer;
@@ -182,16 +183,30 @@ public class ConnectionImpl extends Endp
return this;
}
- public void free()
- {
- super.free();
- for(Session session : _sessions)
- {
+ @Override
+ void postFinal() {
+ put(Event.Type.CONNECTION_FINAL, this);
+ }
+
+ @Override
+ void doFree() {
+ for(Session session : _sessions) {
session.free();
}
_sessions = null;
}
+ void modifyEndpoints() {
+ if (_sessions != null) {
+ for (SessionImpl ssn: _sessions) {
+ ssn.modifyEndpoints();
+ }
+ }
+ if (!freed) {
+ modified();
+ }
+ }
+
void handleOpen(Open open)
{
// TODO - store state
@@ -201,10 +216,7 @@ public class ConnectionImpl extends Endp
setRemoteDesiredCapabilities(open.getDesiredCapabilities());
setRemoteOfferedCapabilities(open.getOfferedCapabilities());
setRemoteProperties(open.getProperties());
- EventImpl ev = put(Event.Type.CONNECTION_REMOTE_STATE);
- if (ev != null) {
- ev.init(this);
- }
+ put(Event.Type.CONNECTION_REMOTE_OPEN, this);
}
@@ -489,6 +501,10 @@ public class ConnectionImpl extends Endp
return _transportWorkHead;
}
+ int getTransportWorkSize() {
+ return _transportWorkSize;
+ }
+
public void removeTransportWork(DeliveryImpl delivery)
{
if (!delivery._transportWork) return;
@@ -517,6 +533,7 @@ public class ConnectionImpl extends Endp
}
delivery._transportWork = false;
+ _transportWorkSize--;
}
void addTransportWork(DeliveryImpl delivery)
@@ -538,6 +555,7 @@ public class ConnectionImpl extends Endp
}
delivery._transportWork = true;
+ _transportWorkSize++;
}
void workUpdate(DeliveryImpl delivery)
@@ -571,23 +589,40 @@ public class ConnectionImpl extends Endp
public void collect(Collector collector)
{
_collector = (CollectorImpl) collector;
+
+ put(Event.Type.CONNECTION_INIT, this);
+
+ LinkNode<SessionImpl> ssn = _sessionHead;
+ while (ssn != null) {
+ put(Event.Type.SESSION_INIT, ssn.getValue());
+ ssn = ssn.getNext();
+ }
+
+ LinkNode<LinkImpl> lnk = _linkHead;
+ while (lnk != null) {
+ put(Event.Type.LINK_INIT, lnk.getValue());
+ lnk = lnk.getNext();
+ }
}
- EventImpl put(Event.Type type)
+ EventImpl put(Event.Type type, Object context)
{
if (_collector != null) {
- return _collector.put(type);
+ return _collector.put(type, context);
} else {
return null;
}
}
@Override
- protected void localStateChanged()
+ void localOpen()
{
- EventImpl ev = put(Event.Type.CONNECTION_LOCAL_STATE);
- if (ev != null) {
- ev.init(this);
- }
+ put(Event.Type.CONNECTION_OPEN, this);
+ }
+
+ @Override
+ void localClose()
+ {
+ put(Event.Type.CONNECTION_CLOSE, this);
}
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java Sat Sep 6 11:23:10 2014
@@ -37,7 +37,27 @@ public abstract class EndpointImpl imple
private EndpointImpl _transportPrev;
private Object _context;
- protected abstract void localStateChanged();
+ private int refcount = 1;
+ boolean freed = false;
+
+ void incref() {
+ refcount++;
+ }
+
+ void decref() {
+ refcount--;
+ if (refcount == 0) {
+ postFinal();
+ } else if (refcount < 0) {
+ throw new IllegalStateException();
+ }
+ }
+
+ abstract void postFinal();
+
+ abstract void localOpen();
+
+ abstract void localClose();
public void open()
{
@@ -49,7 +69,7 @@ public abstract class EndpointImpl imple
// TODO
case UNINITIALIZED:
_localState = EndpointState.ACTIVE;
- localStateChanged();
+ localOpen();
}
modified();
}
@@ -65,7 +85,7 @@ public abstract class EndpointImpl imple
// TODO
case ACTIVE:
_localState = EndpointState.CLOSED;
- localStateChanged();
+ localClose();
}
modified();
}
@@ -129,9 +149,9 @@ public abstract class EndpointImpl imple
if (emit) {
ConnectionImpl conn = getConnectionImpl();
- EventImpl ev = conn.put(Event.Type.TRANSPORT);
- if (ev != null) {
- ev.init(conn);
+ TransportImpl trans = conn.getTransport();
+ if (trans != null) {
+ conn.put(Event.Type.TRANSPORT, trans);
}
}
}
@@ -162,16 +182,15 @@ public abstract class EndpointImpl imple
return _transportPrev;
}
- public void free()
+ abstract void doFree();
+
+ final public void free()
{
- if(_transportNext != null)
- {
- _transportNext.setTransportPrev(_transportPrev);
- }
- if(_transportPrev != null)
- {
- _transportPrev.setTransportNext(_transportNext);
- }
+ if (freed) return;
+ freed = true;
+
+ doFree();
+ decref();
}
void setTransportNext(EndpointImpl transportNext)
@@ -194,9 +213,4 @@ public abstract class EndpointImpl imple
_context = context;
}
- @Override
- public String toString()
- {
- return "EndpointImpl(" + System.identityHashCode(this) + ") [_localState=" + _localState + ", _remoteState=" + _remoteState + ", _localError=" + _localError + ", _remoteError=" + _remoteError + "]";
- }
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java Sat Sep 6 11:23:10 2014
@@ -36,15 +36,24 @@ class EventImpl implements Event
{
Type type;
- Connection connection;
- Session session;
- Link link;
- Delivery delivery;
- Transport transport;
+ Object context;
+ EventImpl next;
- EventImpl(Type type)
+ EventImpl()
+ {
+ this.type = null;
+ }
+
+ void init(Event.Type type, Object context)
{
this.type = type;
+ this.context = context;
+ }
+
+ void clear()
+ {
+ type = null;
+ context = null;
}
public Category getCategory()
@@ -57,58 +66,88 @@ class EventImpl implements Event
return type;
}
+ public Object getContext()
+ {
+ return context;
+ }
+
public Connection getConnection()
{
- return connection;
+ switch (type.getCategory()) {
+ case CONNECTION:
+ return (Connection) context;
+ case TRANSPORT:
+ Transport transport = getTransport();
+ if (transport == null) {
+ return null;
+ }
+ return ((TransportImpl) transport).getConnectionImpl();
+ default:
+ Session ssn = getSession();
+ if (ssn == null) {
+ return null;
+ }
+ return ssn.getConnection();
+ }
}
public Session getSession()
{
- return session;
+ switch (type.getCategory()) {
+ case SESSION:
+ return (Session) context;
+ default:
+ Link link = getLink();
+ if (link == null) {
+ return null;
+ }
+ return link.getSession();
+ }
}
public Link getLink()
{
- return link;
+ switch (type.getCategory()) {
+ case LINK:
+ return (Link) context;
+ default:
+ Delivery dlv = getDelivery();
+ if (dlv == null) {
+ return null;
+ }
+ return dlv.getLink();
+ }
}
public Delivery getDelivery()
{
- return delivery;
+ switch (type.getCategory()) {
+ case DELIVERY:
+ return (Delivery) context;
+ default:
+ return null;
+ }
}
public Transport getTransport()
{
- return transport;
- }
-
- void init(Transport transport)
- {
- this.transport = transport;
- }
-
- void init(Connection connection)
- {
- this.connection = connection;
- init(((ConnectionImpl) connection).getTransport());
+ switch (type.getCategory()) {
+ case TRANSPORT:
+ return (Transport) context;
+ default:
+ return null;
+ }
}
-
- void init(Session session)
+ public Event copy()
{
- this.session = session;
- init(session.getConnection());
+ EventImpl newEvent = new EventImpl();
+ newEvent.init(type, context);
+ return newEvent;
}
- void init(Link link)
+ @Override
+ public String toString()
{
- this.link = link;
- init(link.getSession());
+ return "EventImpl{" + "type=" + type + ", context=" + context + '}';
}
-
- void init(Delivery delivery)
- {
- this.delivery = delivery;
- init(delivery.getLink());
- }
-
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java Sat Sep 6 11:23:10 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.proton.engine.impl;
+import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.framing.TransportFrame;
public interface FrameHandler
@@ -31,7 +32,7 @@ public interface FrameHandler
*/
boolean handleFrame(TransportFrame frame);
- void closed();
+ void closed(TransportException error);
/**
* Returns whether I am currently able to handle frames.
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java Sat Sep 6 11:23:10 2014
@@ -40,6 +40,8 @@ class FrameParser implements TransportIn
{
private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace");
+ private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
+
private enum State
{
HEADER0,
@@ -62,8 +64,9 @@ class FrameParser implements TransportIn
private final FrameHandler _frameHandler;
private final ByteBufferDecoder _decoder;
+ private final int _maxFrameSize;
- private final ByteBuffer _inputBuffer;
+ private ByteBuffer _inputBuffer = null;
private boolean _tail_closed = false;
private State _state = State.HEADER0;
@@ -87,11 +90,7 @@ class FrameParser implements TransportIn
{
_frameHandler = frameHandler;
_decoder = decoder;
- if (maxFrameSize > 0) {
- _inputBuffer = newWriteableBuffer(maxFrameSize);
- } else {
- _inputBuffer = newWriteableBuffer(4*1024);
- }
+ _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
}
private void input(ByteBuffer in) throws TransportException
@@ -372,6 +371,7 @@ class FrameParser implements TransportIn
_decoder.setByteBuffer(in);
Object val = _decoder.readObject();
+ _decoder.setByteBuffer(null);
Binary payload;
@@ -447,7 +447,7 @@ class FrameParser implements TransportIn
state = State.ERROR;
frameParsingError = new TransportException("connection aborted");
} else {
- _frameHandler.closed();
+ _frameHandler.closed(null);
}
}
@@ -460,7 +460,7 @@ class FrameParser implements TransportIn
if(frameParsingError != null)
{
_parsingError = frameParsingError;
- throw frameParsingError;
+ _frameHandler.closed(frameParsingError);
}
else
{
@@ -475,7 +475,11 @@ class FrameParser implements TransportIn
if (_tail_closed) {
return Transport.END_OF_STREAM;
} else {
- return _inputBuffer.remaining();
+ if (_inputBuffer != null) {
+ return _inputBuffer.remaining();
+ } else {
+ return _maxFrameSize;
+ }
}
}
@@ -483,27 +487,41 @@ class FrameParser implements TransportIn
public ByteBuffer tail()
{
if (_tail_closed) {
- if (_parsingError != null) {
- throw new TransportException(_parsingError.getMessage());
- } else {
- throw new TransportException("tail closed");
- }
+ throw new TransportException("tail closed");
+ }
+
+ if (_inputBuffer == null) {
+ _inputBuffer = newWriteableBuffer(_maxFrameSize);
}
+
return _inputBuffer;
}
@Override
public void process() throws TransportException
{
- _inputBuffer.flip();
-
- try
+ if (_inputBuffer != null)
{
- input(_inputBuffer);
+ _inputBuffer.flip();
+
+ try
+ {
+ input(_inputBuffer);
+ }
+ finally
+ {
+ if (_inputBuffer.hasRemaining()) {
+ _inputBuffer.compact();
+ } else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
+ _inputBuffer = null;
+ } else {
+ _inputBuffer.clear();
+ }
+ }
}
- finally
+ else
{
- _inputBuffer.compact();
+ input(_emptyInputBuffer);
}
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java Sat Sep 6 11:23:10 2014
@@ -45,15 +45,15 @@ class FrameWriter
private WritableBuffer _buffer;
private int _maxFrameSize;
private byte _frameType;
- private ProtocolTracer _protocolTracer;
- private Object _logCtx;
+ final private Ref<ProtocolTracer> _protocolTracer;
+ private TransportImpl _transport;
private int _frameStart = 0;
private int _payloadStart;
private int _performativeSize;
FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType,
- ProtocolTracer protocolTracer, Object logCtx)
+ Ref<ProtocolTracer> protocolTracer, TransportImpl transport)
{
_encoder = encoder;
_bbuf = ByteBuffer.allocate(1024);
@@ -62,7 +62,7 @@ class FrameWriter
_maxFrameSize = maxFrameSize;
_frameType = frameType;
_protocolTracer = protocolTracer;
- _logCtx = logCtx;
+ _transport = transport;
}
void setMaxFrameSize(int maxFrameSize)
@@ -155,11 +155,12 @@ class FrameWriter
// code, further refactor will fix this
if (_frameType == AMQP_FRAME_TYPE) {
TransportFrame frame = new TransportFrame(channel, (FrameBody) frameBody, Binary.create(originalPayload));
- TransportImpl.log(_logCtx, TransportImpl.OUTGOING, frame);
+ _transport.log(TransportImpl.OUTGOING, frame);
- if( _protocolTracer!=null )
+ ProtocolTracer tracer = _protocolTracer.get();
+ if(tracer != null)
{
- _protocolTracer.sentFrame(frame);
+ tracer.sentFrame(frame);
}
}
@@ -191,6 +192,11 @@ class FrameWriter
writeFrame(0, frameBody, null, null);
}
+ boolean isFull() {
+ // XXX: this should probably be tunable
+ return _bbuf.position() > 64*1024;
+ }
+
int readBytes(ByteBuffer dst)
{
ByteBuffer src = _bbuf.duplicate();
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Sat Sep 6 11:23:10 2014
@@ -53,14 +53,17 @@ public abstract class LinkImpl extends E
private ReceiverSettleMode _remoteReceiverSettleMode;
- private final LinkNode<LinkImpl> _node;
+ private LinkNode<LinkImpl> _node;
private boolean _drain;
LinkImpl(SessionImpl session, String name)
{
_session = session;
+ _session.incref();
_name = name;
- _node = session.getConnectionImpl().addLinkEndpoint(this);
+ ConnectionImpl conn = session.getConnectionImpl();
+ _node = conn.addLinkEndpoint(this);
+ conn.put(Event.Type.LINK_INIT, this);
}
@@ -103,11 +106,21 @@ public abstract class LinkImpl extends E
}
}
- public void free()
+ @Override
+ void postFinal() {
+ _session.getConnectionImpl().put(Event.Type.LINK_FINAL, this);
+ _session.decref();
+ }
+
+ @Override
+ void doFree()
{
- super.free();
_session.getConnectionImpl().removeLinkEndpoint(_node);
- //TODO.
+ _node = null;
+ }
+
+ void modifyEndpoints() {
+ modified();
}
public void remove(DeliveryImpl delivery)
@@ -375,11 +388,14 @@ public abstract class LinkImpl extends E
}
@Override
- protected void localStateChanged()
+ void localOpen()
{
- EventImpl ev = getConnectionImpl().put(Event.Type.LINK_LOCAL_STATE);
- if (ev != null) {
- ev.init(this);
- }
+ getConnectionImpl().put(Event.Type.LINK_OPEN, this);
+ }
+
+ @Override
+ void localClose()
+ {
+ getConnectionImpl().put(Event.Type.LINK_CLOSE, this);
}
}
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Sat Sep 6 11:23:10 2014
@@ -96,12 +96,11 @@ public class ReceiverImpl extends LinkIm
return consumed;
}
- public void free()
+ @Override
+ void doFree()
{
getSession().freeReceiver(this);
-
- super.free();
- //TODO.
+ super.doFree();
}
boolean hasIncoming()
Added: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java?rev=1622849&view=auto
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java (added)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java Sat Sep 6 11:23:10 2014
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.engine.impl;
+
+
+/**
+ * Ref
+ *
+ */
+
+class Ref<T>
+{
+
+ T value;
+
+ public Ref(T initial) {
+ value = initial;
+ }
+
+ public T get() {
+ return value;
+ }
+
+ public void set(T value) {
+ this.value = value;
+ }
+
+}
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java?rev=1622849&r1=1622848&r2=1622849&view=diff
==============================================================================
--- qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java (original)
+++ qpid/proton/branches/fadams-javascript-binding/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java Sat Sep 6 11:23:10 2014
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.qpid.proton.ProtonUnsupportedOperationException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.security.SaslChallenge;
@@ -53,6 +54,8 @@ public class SaslImpl implements Sasl, S
private final DecoderImpl _decoder = new DecoderImpl();
private final EncoderImpl _encoder = new EncoderImpl(_decoder);
+ private final TransportImpl _transport;
+
private boolean _tail_closed = false;
private final ByteBuffer _inputBuffer;
private boolean _head_closed = false;
@@ -86,14 +89,15 @@ public class SaslImpl implements Sasl, S
* returned by {@link SaslTransportWrapper#getInputBuffer()} and
* {@link SaslTransportWrapper#getOutputBuffer()}.
*/
- SaslImpl(int maxFrameSize)
+ SaslImpl(TransportImpl transport, int maxFrameSize)
{
+ _transport = transport;
_inputBuffer = newWriteableBuffer(maxFrameSize);
_outputBuffer = newWriteableBuffer(maxFrameSize);
AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
_frameParser = new SaslFrameParser(this, _decoder);
- _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, this);
+ _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, _transport);
}
@Override
@@ -460,6 +464,13 @@ public class SaslImpl implements Sasl, S
_role = Role.SERVER;
}
+ @Override
+ public void allowSkip(boolean allowSkip)
+ {
+ //TODO: implement
+ throw new ProtonUnsupportedOperationException();
+ }
+
public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
{
return new SaslTransportWrapper(input, output);
@@ -565,6 +576,9 @@ public class SaslImpl implements Sasl, S
_tail_closed = true;
if (isInputInSaslMode()) {
_head_closed = true;
+ _underlyingInput.close_tail();
+ } else {
+ _underlyingInput.close_tail();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org