You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/25 21:00:59 UTC

[03/48] qpid-proton git commit: PROTON-1350 PROTON-1351: Introduce proton-c core library - Created new core proton library qpid-proton-core which only contains protocol processsing and no IO. - Rearranged source tree to separate core protocol code and

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/selector.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/selector.c b/proton-c/src/windows/selector.c
deleted file mode 100644
index f139aec..0000000
--- a/proton-c/src/windows/selector.c
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <Ws2tcpip.h>
-
-#include "platform.h"
-#include <proton/object.h>
-#include <proton/io.h>
-#include <proton/selector.h>
-#include <proton/error.h>
-#include <assert.h>
-#include "selectable.h"
-#include "util.h"
-#include "iocp.h"
-
-static void interests_update(iocpdesc_t *iocpd, int interests);
-static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t);
-
-struct pn_selector_t {
-  iocp_t *iocp;
-  pn_list_t *selectables;
-  pn_list_t *iocp_descriptors;
-  size_t current;
-  iocpdesc_t *current_triggered;
-  pn_timestamp_t awoken;
-  pn_error_t *error;
-  iocpdesc_t *triggered_list_head;
-  iocpdesc_t *triggered_list_tail;
-  iocpdesc_t *deadlines_head;
-  iocpdesc_t *deadlines_tail;
-};
-
-void pn_selector_initialize(void *obj)
-{
-  pn_selector_t *selector = (pn_selector_t *) obj;
-  selector->iocp = NULL;
-  selector->selectables = pn_list(PN_WEAKREF, 0);
-  selector->iocp_descriptors = pn_list(PN_OBJECT, 0);
-  selector->current = 0;
-  selector->current_triggered = NULL;
-  selector->awoken = 0;
-  selector->error = pn_error();
-  selector->triggered_list_head = NULL;
-  selector->triggered_list_tail = NULL;
-  selector->deadlines_head = NULL;
-  selector->deadlines_tail = NULL;
-}
-
-void pn_selector_finalize(void *obj)
-{
-  pn_selector_t *selector = (pn_selector_t *) obj;
-  pn_free(selector->selectables);
-  pn_free(selector->iocp_descriptors);
-  pn_error_free(selector->error);
-  selector->iocp->selector = NULL;
-}
-
-#define pn_selector_hashcode NULL
-#define pn_selector_compare NULL
-#define pn_selector_inspect NULL
-
-pn_selector_t *pni_selector()
-{
-  static const pn_class_t clazz = PN_CLASS(pn_selector);
-  pn_selector_t *selector = (pn_selector_t *) pn_class_new(&clazz, sizeof(pn_selector_t));
-  return selector;
-}
-
-pn_selector_t *pni_selector_create(iocp_t *iocp)
-{
-  pn_selector_t *selector = pni_selector();
-  selector->iocp = iocp;
-  return selector;
-}
-
-void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable)
-{
-  assert(selector);
-  assert(selectable);
-  assert(pni_selectable_get_index(selectable) < 0);
-  pn_socket_t sock = pn_selectable_get_fd(selectable);
-  iocpdesc_t *iocpd = NULL;
-
-  if (pni_selectable_get_index(selectable) < 0) {
-    pn_list_add(selector->selectables, selectable);
-    pn_list_add(selector->iocp_descriptors, NULL);
-    size_t size = pn_list_size(selector->selectables);
-    pni_selectable_set_index(selectable, size - 1);
-  }
-
-  pn_selector_update(selector, selectable);
-}
-
-void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable)
-{
-  // A selectable's fd may switch from PN_INVALID_SCOKET to a working socket between
-  // update calls.  If a selectable without a valid socket has a deadline, we need
-  // a dummy iocpdesc_t to participate in the deadlines list.
-  int idx = pni_selectable_get_index(selectable);
-  assert(idx >= 0);
-  pn_timestamp_t deadline = pn_selectable_get_deadline(selectable);
-  pn_socket_t sock = pn_selectable_get_fd(selectable);
-  iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
-
-  if (!iocpd && deadline && sock == PN_INVALID_SOCKET) {
-    iocpd = pni_deadline_desc(selector->iocp);
-    assert(iocpd);
-    pn_list_set(selector->iocp_descriptors, idx, iocpd);
-    pn_decref(iocpd);  // life is solely tied to iocp_descriptors list
-    iocpd->selector = selector;
-    iocpd->selectable = selectable;
-  }
-  else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) {
-    // Switching to a real socket.  Stop using a deadline descriptor.
-    deadlines_update(iocpd, 0);
-    // decref descriptor in list and pick up a real iocpd below
-    pn_list_set(selector->iocp_descriptors, idx, NULL);
-    iocpd = NULL;
-  }
-
-  // The selectables socket may be set long after it has been added
-  if (!iocpd && sock != PN_INVALID_SOCKET) {
-    iocpd = pni_iocpdesc_map_get(selector->iocp, sock);
-    if (!iocpd) {
-      // Socket created outside proton.  Hook it up to iocp.
-      iocpd = pni_iocpdesc_create(selector->iocp, sock, true);
-      assert(iocpd);
-      if (iocpd)
-        pni_iocpdesc_start(iocpd);
-    }
-    if (iocpd) {
-      pn_list_set(selector->iocp_descriptors, idx, iocpd);
-      iocpd->selector = selector;
-      iocpd->selectable = selectable;
-    }
-  }
-
-  if (iocpd) {
-    assert(sock == iocpd->socket || iocpd->closing);
-    int interests = PN_ERROR; // Always
-    if (pn_selectable_is_reading(selectable)) {
-      interests |= PN_READABLE;
-    }
-    if (pn_selectable_is_writing(selectable)) {
-      interests |= PN_WRITABLE;
-    }
-    if (deadline) {
-      interests |= PN_EXPIRED;
-    }
-    interests_update(iocpd, interests);
-    deadlines_update(iocpd, deadline);
-  }
-}
-
-void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable)
-{
-  assert(selector);
-  assert(selectable);
-
-  int idx = pni_selectable_get_index(selectable);
-  assert(idx >= 0);
-  iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx);
-  if (iocpd) {
-    if (selector->current_triggered == iocpd)
-      selector->current_triggered = iocpd->triggered_list_next;
-    interests_update(iocpd, 0);
-    deadlines_update(iocpd, 0);
-    assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev);
-    assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev);
-    iocpd->selector = NULL;
-    iocpd->selectable = NULL;
-  }
-  pn_list_del(selector->selectables, idx, 1);
-  pn_list_del(selector->iocp_descriptors, idx, 1);
-  size_t size = pn_list_size(selector->selectables);
-  for (size_t i = idx; i < size; i++) {
-    pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i);
-    pni_selectable_set_index(sel, i);
-  }
-
-  pni_selectable_set_index(selectable, -1);
-
-  if (selector->current >= (size_t) idx) {
-    selector->current--;
-  }
-}
-
-size_t pn_selector_size(pn_selector_t *selector) {
-  assert(selector);
-  return pn_list_size(selector->selectables);
-}
-
-int pn_selector_select(pn_selector_t *selector, int timeout)
-{
-  assert(selector);
-  pn_error_clear(selector->error);
-  pn_timestamp_t deadline = 0;
-  pn_timestamp_t now = pn_i_now();
-
-  if (timeout) {
-    if (selector->deadlines_head)
-      deadline = selector->deadlines_head->deadline;
-  }
-  if (deadline) {
-    int64_t delta = deadline - now;
-    if (delta < 0) {
-      delta = 0;
-    }
-    if (timeout < 0)
-      timeout = delta;
-    else if (timeout > delta)
-      timeout = delta;
-  }
-  deadline = (timeout >= 0) ? now + timeout : 0;
-
-  // Process all currently available completions, even if matched events available
-  pni_iocp_drain_completions(selector->iocp);
-  pni_zombie_check(selector->iocp, now);
-  // Loop until an interested event is matched, or until deadline
-  while (true) {
-    if (selector->triggered_list_head)
-      break;
-    if (deadline && deadline <= now)
-      break;
-    pn_timestamp_t completion_deadline = deadline;
-    pn_timestamp_t zd = pni_zombie_deadline(selector->iocp);
-    if (zd)
-      completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd;
-
-    int completion_timeout = (!completion_deadline) ? -1 : completion_deadline - now;
-    int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error);
-    if (rv < 0)
-      return pn_error_code(selector->error);
-
-    now = pn_i_now();
-    if (zd && zd <= now) {
-      pni_zombie_check(selector->iocp, now);
-    }
-  }
-
-  selector->current = 0;
-  selector->awoken = now;
-  for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) {
-    if (iocpd->deadline <= now)
-      pni_events_update(iocpd, iocpd->events | PN_EXPIRED);
-    else
-      break;
-  }
-  selector->current_triggered = selector->triggered_list_head;
-  return pn_error_code(selector->error);
-}
-
-pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events)
-{
-  if (selector->current_triggered) {
-    iocpdesc_t *iocpd = selector->current_triggered;
-    *events = iocpd->interests & iocpd->events;
-    selector->current_triggered = iocpd->triggered_list_next;
-    return iocpd->selectable;
-  }
-  return NULL;
-}
-
-void pn_selector_free(pn_selector_t *selector)
-{
-  assert(selector);
-  pn_free(selector);
-}
-
-
-static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd)
-{
-  if (iocpd->triggered_list_prev || selector->triggered_list_head == iocpd)
-    return; // already in list
-  LL_ADD(selector, triggered_list, iocpd);
-}
-
-static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
-{
-  if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd)
-    return; // not in list
-  LL_REMOVE(selector, triggered_list, iocpd);
-  iocpd->triggered_list_prev = NULL;
-  iocpd->triggered_list_next = NULL;
-}
-
-
-void pni_events_update(iocpdesc_t *iocpd, int events)
-{
-  // If set, a poll error is permanent
-  if (iocpd->poll_error)
-    events |= PN_ERROR;
-  if (iocpd->events == events)
-    return;
-  iocpd->events = events;
-  if (iocpd->selector) {
-    if (iocpd->events & iocpd->interests)
-      triggered_list_add(iocpd->selector, iocpd);
-    else
-      triggered_list_remove(iocpd->selector, iocpd);
-  }
-}
-
-static void interests_update(iocpdesc_t *iocpd, int interests)
-{
-  int old_interests = iocpd->interests;
-  if (old_interests == interests)
-    return;
-  iocpd->interests = interests;
-  if (iocpd->selector) {
-    if (iocpd->events & iocpd->interests)
-      triggered_list_add(iocpd->selector, iocpd);
-    else
-      triggered_list_remove(iocpd->selector, iocpd);
-  }
-}
-
-static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd)
-{
-  if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd)
-    return; // not in list
-  LL_REMOVE(selector, deadlines, iocpd);
-  iocpd->deadlines_prev = NULL;
-  iocpd->deadlines_next = NULL;
-}
-
-
-static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline)
-{
-  if (deadline == iocpd->deadline)
-    return;
-
-  iocpd->deadline = deadline;
-  pn_selector_t *selector = iocpd->selector;
-  if (!deadline) {
-    deadlines_remove(selector, iocpd);
-    pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
-  } else {
-    if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) {
-      deadlines_remove(selector, iocpd);
-      pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED);
-    }
-    iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines);
-    while (dl_iocpd && dl_iocpd->deadline <= deadline)
-      dl_iocpd = dl_iocpd->deadlines_next;
-    if (dl_iocpd) {
-      // insert
-      iocpd->deadlines_prev = dl_iocpd->deadlines_prev;
-      iocpd->deadlines_next = dl_iocpd;
-      dl_iocpd->deadlines_prev = iocpd;
-      if (selector->deadlines_head == dl_iocpd)
-        selector->deadlines_head = iocpd;
-    } else {
-      LL_ADD(selector, deadlines, iocpd);  // append
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/write_pipeline.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/write_pipeline.c b/proton-c/src/windows/write_pipeline.c
deleted file mode 100644
index e14e714..0000000
--- a/proton-c/src/windows/write_pipeline.c
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * A simple write buffer pool.  Each socket has a dedicated "primary"
- * buffer and can borrow from a shared pool with limited size tuning.
- * Could enhance e.g. with separate pools per network interface and fancier
- * memory tuning based on interface speed, system resources, and
- * number of connections, etc.
- */
-
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <Ws2tcpip.h>
-
-#include "platform.h"
-#include <proton/object.h>
-#include <proton/io.h>
-#include <proton/selector.h>
-#include <proton/error.h>
-#include <assert.h>
-#include "selectable.h"
-#include "util.h"
-#include "iocp.h"
-
-// Max overlapped writes per socket
-#define IOCP_MAX_OWRITES 16
-// Write buffer size
-#define IOCP_WBUFSIZE 16384
-
-static void pipeline_log(const char *fmt, ...)
-{
-    va_list ap;
-    va_start(ap, fmt);
-    vfprintf(stderr, fmt, ap);
-    va_end(ap);
-    fflush(stderr);
-}
-
-void pni_shared_pool_create(iocp_t *iocp)
-{
-  // TODO: more pools (or larger one) when using multiple non-loopback interfaces
-  iocp->shared_pool_size = 16;
-  char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
-  if (env) {
-    int sz = atoi(env);
-    if (sz >= 0 && sz < 256) {
-      iocp->shared_pool_size = sz;
-    }
-  }
-  iocp->loopback_bufsize = 0;
-  env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
-  if (env) {
-    int sz = atoi(env);
-    if (sz >= 0 && sz <= 128 * 1024) {
-      iocp->loopback_bufsize = sz;
-    }
-  }
-
-  if (iocp->shared_pool_size) {
-    iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
-    HRESULT status = GetLastError();
-    if (!iocp->shared_pool_memory) {
-      perror("Proton write buffer pool allocation failure\n");
-      iocp->shared_pool_size = 0;
-      iocp->shared_available_count = 0;
-      return;
-    }
-
-    iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
-    iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *));
-    iocp->shared_available_count = iocp->shared_pool_size;
-    char *mem = iocp->shared_pool_memory;
-    for (int i = 0; i < iocp->shared_pool_size; i++) {
-      iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
-      mem += IOCP_WBUFSIZE;
-    }
-  }
-}
-
-void pni_shared_pool_free(iocp_t *iocp)
-{
-  for (int i = 0; i < iocp->shared_pool_size; i++) {
-    write_result_t *result = iocp->shared_results[i];
-    if (result->in_use)
-      pipeline_log("Proton buffer pool leak\n");
-    else
-      free(result);
-  }
-  if (iocp->shared_pool_size) {
-    free(iocp->shared_results);
-    free(iocp->available_results);
-    if (iocp->shared_pool_memory) {
-      if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) {
-        perror("write buffers release failed");
-      }
-      iocp->shared_pool_memory = NULL;
-    }
-  }
-}
-
-static void shared_pool_push(write_result_t *result)
-{
-  iocp_t *iocp = result->base.iocpd->iocp;
-  assert(iocp->shared_available_count < iocp->shared_pool_size);
-  iocp->available_results[iocp->shared_available_count++] = result;
-}
-
-static write_result_t *shared_pool_pop(iocp_t *iocp)
-{
-  return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL;
-}
-
-struct write_pipeline_t {
-  iocpdesc_t *iocpd;
-  size_t pending_count;
-  write_result_t *primary;
-  size_t reserved_count;
-  size_t next_primary_index;
-  size_t depth;
-  bool is_writer;
-};
-
-#define write_pipeline_compare NULL
-#define write_pipeline_inspect NULL
-#define write_pipeline_hashcode NULL
-
-static void write_pipeline_initialize(void *object)
-{
-  write_pipeline_t *pl = (write_pipeline_t *) object;
-  pl->pending_count = 0;
-  const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE);
-  pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
-  pl->depth = 0;
-  pl->is_writer = false;
-}
-
-static void write_pipeline_finalize(void *object)
-{
-  write_pipeline_t *pl = (write_pipeline_t *) object;
-  free((void *)pl->primary->buffer.start);
-  free(pl->primary);
-}
-
-write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
-{
-  static const pn_cid_t CID_write_pipeline = CID_pn_void;
-  static const pn_class_t clazz = PN_CLASS(write_pipeline);
-  write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t));
-  pipeline->iocpd = iocpd;
-  pipeline->primary->base.iocpd = iocpd;
-  return pipeline;
-}
-
-static void confirm_as_writer(write_pipeline_t *pl)
-{
-  if (!pl->is_writer) {
-    iocp_t *iocp = pl->iocpd->iocp;
-    iocp->writer_count++;
-    pl->is_writer = true;
-  }
-}
-
-static void remove_as_writer(write_pipeline_t *pl)
-{
-  if (!pl->is_writer)
-    return;
-  iocp_t *iocp = pl->iocpd->iocp;
-  assert(iocp->writer_count);
-  pl->is_writer = false;
-  iocp->writer_count--;
-}
-
-/*
- * Optimal depth will depend on properties of the NIC, server, and driver.  For now,
- * just distinguish between loopback interfaces and the rest.  Optimizations in the
- * loopback stack allow decent performance with depth 1 and actually cause major
- * performance hiccups if set to large values.
- */
-static void set_depth(write_pipeline_t *pl)
-{
-  pl->depth = 1;
-  sockaddr_storage sa;
-  socklen_t salen = sizeof(sa);
-  char buf[INET6_ADDRSTRLEN];
-  DWORD buflen = sizeof(buf);
-
-  if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
-      getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
-    if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
-        (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
-      // not loopback
-      pl->depth = IOCP_MAX_OWRITES;
-    } else {
-      iocp_t *iocp = pl->iocpd->iocp;
-      if (iocp->loopback_bufsize) {
-        const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
-        if (p) {
-          pl->primary->buffer.start = p;
-          pl->primary->buffer.size = iocp->loopback_bufsize;
-        }
-      }
-    }
-  }
-}
-
-// Reserve as many buffers as possible for count bytes.
-size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
-{
-  if (pl->primary->in_use)
-    return 0;  // I.e. io->wouldblock
-  if (!pl->depth)
-    set_depth(pl);
-  if (pl->depth == 1) {
-    // always use the primary
-    pl->reserved_count = 1;
-    pl->next_primary_index = 0;
-    return 1;
-  }
-
-  iocp_t *iocp = pl->iocpd->iocp;
-  confirm_as_writer(pl);
-  size_t wanted = (count / IOCP_WBUFSIZE);
-  if (count % IOCP_WBUFSIZE)
-    wanted++;
-  size_t pending = pl->pending_count;
-  assert(pending < pl->depth);
-  size_t bufs = pn_min(wanted, pl->depth - pending);
-  // Can draw from shared pool or the primary... but share with others.
-  size_t writers = iocp->writer_count;
-  size_t shared_count = (iocp->shared_available_count + writers - 1) / writers;
-  bufs = pn_min(bufs, shared_count + 1);
-  pl->reserved_count = pending + bufs;
-
-  if (bufs == wanted &&
-      pl->reserved_count < (pl->depth / 2) &&
-      iocp->shared_available_count > (2 * writers + bufs)) {
-    // No shortage: keep the primary as spare for future use
-    pl->next_primary_index = pl->reserved_count;
-  } else if (bufs == 1) {
-    pl->next_primary_index = pending;
-  } else {
-    // let approx 1/3 drain before replenishing
-    pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
-    if (pl->next_primary_index < pending)
-      pl->next_primary_index = pending;
-  }
-  return bufs;
-}
-
-write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
-{
-  size_t sz = pl->pending_count;
-  if (sz >= pl->reserved_count)
-    return NULL;
-  write_result_t *result;
-  if (sz == pl->next_primary_index) {
-    result = pl->primary;
-  } else {
-    assert(pl->iocpd->iocp->shared_available_count > 0);
-    result = shared_pool_pop(pl->iocpd->iocp);
-  }
-
-  result->in_use = true;
-  pl->pending_count++;
-  return result;
-}
-
-void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
-{
-  result->in_use = false;
-  pl->pending_count--;
-  pl->reserved_count = 0;
-  if (result != pl->primary)
-    shared_pool_push(result);
-  if (pl->pending_count == 0)
-    remove_as_writer(pl);
-}
-
-bool pni_write_pipeline_writable(write_pipeline_t *pl)
-{
-  // Only writable if not full and we can guarantee a buffer:
-  return pl->pending_count < pl->depth && !pl->primary->in_use;
-}
-
-size_t pni_write_pipeline_size(write_pipeline_t *pl)
-{
-  return pl->pending_count;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/tools/cmake/Modules/WindowsC99SymbolCheck.py
----------------------------------------------------------------------
diff --git a/tools/cmake/Modules/WindowsC99SymbolCheck.py b/tools/cmake/Modules/WindowsC99SymbolCheck.py
index 8e81ad9..7c2c9f2 100644
--- a/tools/cmake/Modules/WindowsC99SymbolCheck.py
+++ b/tools/cmake/Modules/WindowsC99SymbolCheck.py
@@ -53,7 +53,7 @@ def symcheck(objfile):
         m = re.search(r'UNDEF.*\b([a-zA-Z_]*snprintf)\b', line)
         if m :
             sym = m.group(1)
-            if re.match(r'_*pn_i_v?snprintf', sym) is None :
+            if re.match(r'_*pni_v?snprintf', sym) is None :
                 raise Exception('Unsafe use of C99 violating function in  ' + objfile + ' : ' + sym)
 
 def main():


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org