You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by ko...@apache.org on 2016/01/13 17:22:03 UTC

svn commit: r1724449 - in /subversion/trunk/subversion/libsvn_ra_serf: ra_serf.h request_body.c update.c

Author: kotkov
Date: Wed Jan 13 16:22:02 2016
New Revision: 1724449

URL: http://svn.apache.org/viewvc?rev=1724449&view=rev
Log:
In ra_serf, factor out the request body collecting code used by update
reporter (that keeps request bodies below certain size in memory) for
reuse.

This lays the groundwork required to keep small svndiffs in memory during
commit, instead of always spilling them to temporary files.  Can't use a
spillbuf for that, since a svn_ra_serf__request_body_delegate_t can be
called more than once in case a request needs to be resent (authentication
challenge or hitting a KeepAlive limit), and spillbuf is designed the way
that reading from it drains the contents.

* subversion/libsvn_ra_serf/ra_serf.h
  (SVN_RA_SERF__REQUEST_BODY_IN_MEM_SIZE): New constant, equal
   to the removed MAX_BODY_IN_RAM in update.c.
  (typedef struct svn_ra_serf__request_body_t): New typedef.
  (svn_ra_serf__request_body_create,
   svn_ra_serf__request_body_get_stream,
   svn_ra_serf__request_body_get_delegate): Declare these new functions.

* subversion/libsvn_ra_serf/request_body.c
  New file with svn_ra_serf__request_body_t implementation factored out
  from ...

* subversion/libsvn_ra_serf/update.c
  (struct body_create_baton_t, body_allocate_all, serf_free_no_error,
   body_write_fn, body_done_fn, create_update_report_body): ...this struct
   and corresponding functions.
  (MAX_BODY_IN_RAM): Remove this constant.
  (struct report_context_t.body_template): Adjust the comment.
  (struct report_context_t.body): Now is a svn_ra_serf__request_body_t.
  (finish_report): Ask svn_ra_serf__request_body_t for a request body
   delegate.
  (make_update_reporter): Create a svn_ra_serf__request_body_t, ask it for
   the writable stream.

Added:
    subversion/trunk/subversion/libsvn_ra_serf/request_body.c   (with props)
Modified:
    subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h
    subversion/trunk/subversion/libsvn_ra_serf/update.c

Modified: subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h?rev=1724449&r1=1724448&r2=1724449&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h (original)
+++ subversion/trunk/subversion/libsvn_ra_serf/ra_serf.h Wed Jan 13 16:22:02 2016
@@ -1570,6 +1570,31 @@ svn_ra_serf__uri_parse(apr_uri_t *uri,
                        apr_pool_t *result_pool);
 
 
+/* Default limit for in-memory size of a request body. */
+#define SVN_RA_SERF__REQUEST_BODY_IN_MEM_SIZE 256 * 1024
+
+/* An opaque structure used to prepare a request body. */
+typedef struct svn_ra_serf__request_body_t svn_ra_serf__request_body_t;
+
+/* Constructor for svn_ra_serf__request_body_t.  Creates a new writable
+   buffer for the request body.  Request bodies under IN_MEMORY_SIZE
+   bytes will be held in memory, otherwise, the body content will be
+   spilled to a temporary file. */
+svn_ra_serf__request_body_t *
+svn_ra_serf__request_body_create(apr_size_t in_memory_size,
+                                 apr_pool_t *result_pool);
+
+/* Get the writable stream associated with BODY. */
+svn_stream_t *
+svn_ra_serf__request_body_get_stream(svn_ra_serf__request_body_t *body);
+
+/* Get a svn_ra_serf__request_body_delegate_t and baton for BODY. */
+void
+svn_ra_serf__request_body_get_delegate(svn_ra_serf__request_body_delegate_t *del,
+                                       void **baton,
+                                       svn_ra_serf__request_body_t *body);
+
+
 #if defined(SVN_DEBUG)
 /* Wrapper macros to collect file and line information */
 #define svn_ra_serf__wrap_err \

Added: subversion/trunk/subversion/libsvn_ra_serf/request_body.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_ra_serf/request_body.c?rev=1724449&view=auto
==============================================================================
--- subversion/trunk/subversion/libsvn_ra_serf/request_body.c (added)
+++ subversion/trunk/subversion/libsvn_ra_serf/request_body.c Wed Jan 13 16:22:02 2016
@@ -0,0 +1,222 @@
+/*
+ * request_body.c :  svn_ra_serf__request_body_t implementation
+ *
+ * ====================================================================
+ *    Licensed to the Apache Software Foundation (ASF) under one
+ *    or more contributor license agreements.  See the NOTICE file
+ *    distributed with this work for additional information
+ *    regarding copyright ownership.  The ASF licenses this file
+ *    to you under the Apache License, Version 2.0 (the
+ *    "License"); you may not use this file except in compliance
+ *    with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing,
+ *    software distributed under the License is distributed on an
+ *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *    KIND, either express or implied.  See the License for the
+ *    specific language governing permissions and limitations
+ *    under the License.
+ * ====================================================================
+ */
+
+#include <serf.h>
+
+#include "ra_serf.h"
+
+typedef struct svn_ra_serf__request_body_t
+{
+  svn_stream_t *stream;
+  apr_size_t in_memory_size;
+  apr_size_t total_bytes;
+  serf_bucket_alloc_t *alloc;
+  serf_bucket_t *collect_bucket;
+  const void *all_data;
+  apr_file_t *file;
+  apr_pool_t *result_pool;
+  apr_pool_t *scratch_pool;
+} svn_ra_serf__request_body_t;
+
+/* Fold all previously collected data in a single buffer allocated in
+   RESULT_POOL and clear all intermediate state. */
+static const char *
+allocate_all(svn_ra_serf__request_body_t *body,
+             apr_pool_t *result_pool)
+{
+  char *buffer = apr_pcalloc(result_pool, body->total_bytes);
+  const char *data;
+  apr_size_t sz;
+  apr_status_t s;
+  apr_size_t remaining = body->total_bytes;
+  char *next = buffer;
+
+  while (!(s = serf_bucket_read(body->collect_bucket, remaining, &data, &sz)))
+    {
+      memcpy(next, data, sz);
+      remaining -= sz;
+      next += sz;
+
+      if (! remaining)
+        break;
+    }
+
+  if (!SERF_BUCKET_READ_ERROR(s))
+    {
+      memcpy(next, data, sz);
+    }
+
+  serf_bucket_destroy(body->collect_bucket);
+  body->collect_bucket = NULL;
+
+  return (s != APR_EOF) ? NULL : buffer;
+}
+
+/* Noop function.  Make serf take care of freeing in error situations. */
+static void serf_free_no_error(void *unfreed_baton, void *block) {}
+
+/* Stream write function for body creation. */
+static svn_error_t *
+request_body_stream_write(void *baton,
+                          const char *data,
+                          apr_size_t *len)
+{
+  svn_ra_serf__request_body_t *b = baton;
+
+  if (!b->scratch_pool)
+    b->scratch_pool = svn_pool_create(b->result_pool);
+
+  if (b->file)
+    {
+      SVN_ERR(svn_io_file_write_full(b->file, data, *len, NULL,
+                                     b->scratch_pool));
+      svn_pool_clear(b->scratch_pool);
+
+      b->total_bytes += *len;
+    }
+  else if (*len + b->total_bytes > b->in_memory_size)
+    {
+      SVN_ERR(svn_io_open_unique_file3(&b->file, NULL, NULL,
+                                       svn_io_file_del_on_pool_cleanup,
+                                       b->result_pool, b->scratch_pool));
+
+      if (b->total_bytes)
+        {
+          const char *all = allocate_all(b, b->scratch_pool);
+
+          SVN_ERR(svn_io_file_write_full(b->file, all, b->total_bytes,
+                                         NULL, b->scratch_pool));
+        }
+
+      SVN_ERR(svn_io_file_write_full(b->file, data, *len, NULL,
+                                     b->scratch_pool));
+      b->total_bytes += *len;
+    }
+  else
+    {
+      if (!b->alloc)
+        b->alloc = serf_bucket_allocator_create(b->scratch_pool,
+                                                serf_free_no_error, NULL);
+
+      if (!b->collect_bucket)
+        b->collect_bucket = serf_bucket_aggregate_create(b->alloc);
+
+      serf_bucket_aggregate_append(b->collect_bucket,
+                                   serf_bucket_simple_copy_create(data, *len,
+                                                                  b->alloc));
+
+      b->total_bytes += *len;
+    }
+
+  return SVN_NO_ERROR;
+}
+
+/* Stream close function for collecting body. */
+static svn_error_t *
+request_body_stream_close(void *baton)
+{
+  svn_ra_serf__request_body_t *b = baton;
+
+  if (b->file)
+    {
+      /* We need to flush the file, make it unbuffered (so that it can be
+       * zero-copied via mmap), and reset the position before attempting
+       * to deliver the file.
+       *
+       * N.B. If we have APR 1.3+, we can unbuffer the file to let us use
+       * mmap and zero-copy the PUT body.  However, on older APR versions,
+       * we can't check the buffer status; but serf will fall through and
+       * create a file bucket for us on the buffered handle.
+       */
+
+      SVN_ERR(svn_io_file_flush(b->file, b->scratch_pool));
+      apr_file_buffer_set(b->file, NULL, 0);
+    }
+  else if (b->collect_bucket)
+    b->all_data = allocate_all(b, b->result_pool);
+
+  if (b->scratch_pool)
+    svn_pool_destroy(b->scratch_pool);
+
+  return SVN_NO_ERROR;
+}
+
+/* Implements svn_ra_serf__request_body_delegate_t. */
+static svn_error_t *
+request_body_delegate(serf_bucket_t **body_bkt,
+                      void *baton,
+                      serf_bucket_alloc_t *alloc,
+                      apr_pool_t *request_pool,
+                      apr_pool_t *scratch_pool)
+{
+  svn_ra_serf__request_body_t *b = baton;
+
+  if (b->file)
+    {
+      apr_off_t offset;
+
+      offset = 0;
+      SVN_ERR(svn_io_file_seek(b->file, APR_SET, &offset, scratch_pool));
+
+      *body_bkt = serf_bucket_file_create(b->file, alloc);
+    }
+  else
+    {
+      *body_bkt = serf_bucket_simple_create(b->all_data,
+                                            b->total_bytes,
+                                            NULL, NULL, alloc);
+    }
+
+  return SVN_NO_ERROR;
+}
+
+svn_ra_serf__request_body_t *
+svn_ra_serf__request_body_create(apr_size_t in_memory_size,
+                                 apr_pool_t *result_pool)
+{
+  svn_ra_serf__request_body_t *body = apr_pcalloc(result_pool, sizeof(*body));
+
+  body->in_memory_size = in_memory_size;
+  body->result_pool = result_pool;
+  body->stream = svn_stream_create(body, result_pool);
+
+  svn_stream_set_write(body->stream, request_body_stream_write);
+  svn_stream_set_close(body->stream, request_body_stream_close);
+
+  return body;
+}
+
+svn_stream_t *
+svn_ra_serf__request_body_get_stream(svn_ra_serf__request_body_t *body)
+{
+  return body->stream;
+}
+
+void
+svn_ra_serf__request_body_get_delegate(svn_ra_serf__request_body_delegate_t *del,
+                                       void **baton,
+                                       svn_ra_serf__request_body_t *body)
+{
+  *del = request_body_delegate;
+  *baton = body;
+}

Propchange: subversion/trunk/subversion/libsvn_ra_serf/request_body.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: subversion/trunk/subversion/libsvn_ra_serf/update.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_ra_serf/update.c?rev=1724449&r1=1724448&r2=1724449&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_ra_serf/update.c (original)
+++ subversion/trunk/subversion/libsvn_ra_serf/update.c Wed Jan 13 16:22:02 2016
@@ -433,13 +433,11 @@ struct report_context_t {
   const svn_delta_editor_t *editor;
   void *editor_baton;
 
-  /* The file holding request body for the REPORT.
-   *
-   * ### todo: It will be better for performance to store small
-   * request bodies (like 4k) in memory and bigger bodies on disk.
-   */
+  /* Stream for collecting the request body. */
   svn_stream_t *body_template;
-  body_create_baton_t *body;
+
+  /* Buffer holding request body for the REPORT (can spill to disk). */
+  svn_ra_serf__request_body_t *body;
 
   /* number of pending GET requests */
   unsigned int num_active_fetches;
@@ -457,148 +455,6 @@ struct report_context_t {
   svn_boolean_t closed_root;
 };
 
-/* Baton for collecting REPORT body. Depending on the size this
-   work is backed by a memory buffer (via serf buckets) or by
-   a file */
-struct body_create_baton_t
-{
-  apr_pool_t *result_pool;
-  apr_size_t total_bytes;
-
-  apr_pool_t *scratch_pool;
-
-  serf_bucket_alloc_t *alloc;
-  serf_bucket_t *collect_bucket;
-
-  const void *all_data;
-  apr_file_t *file;
-};
-
-
-#define MAX_BODY_IN_RAM (256*1024)
-
-/* Fold all previously collected data in a single buffer allocated in
-   RESULT_POOL and clear all intermediate state */
-static const char *
-body_allocate_all(body_create_baton_t *body,
-                  apr_pool_t *result_pool)
-{
-  char *buffer = apr_pcalloc(result_pool, body->total_bytes);
-  const char *data;
-  apr_size_t sz;
-  apr_status_t s;
-  apr_size_t remaining = body->total_bytes;
-  char *next = buffer;
-
-  while (!(s = serf_bucket_read(body->collect_bucket, remaining, &data, &sz)))
-    {
-      memcpy(next, data, sz);
-      remaining -= sz;
-      next += sz;
-
-      if (! remaining)
-        break;
-    }
-
-  if (!SERF_BUCKET_READ_ERROR(s))
-    {
-      memcpy(next, data, sz);
-    }
-
-  serf_bucket_destroy(body->collect_bucket);
-  body->collect_bucket = NULL;
-
-  return (s != APR_EOF) ? NULL : buffer;
-}
-
-/* Noop function. Make serf take care of freeing in error situations */
-static void serf_free_no_error(void *unfreed_baton, void *block) {}
-
-/* Stream write function for body creation */
-static svn_error_t *
-body_write_fn(void *baton,
-              const char *data,
-              apr_size_t *len)
-{
-  body_create_baton_t *bcb = baton;
-
-  if (!bcb->scratch_pool)
-    bcb->scratch_pool = svn_pool_create(bcb->result_pool);
-
-  if (bcb->file)
-    {
-      SVN_ERR(svn_io_file_write_full(bcb->file, data, *len, NULL,
-                                     bcb->scratch_pool));
-      svn_pool_clear(bcb->scratch_pool);
-
-      bcb->total_bytes += *len;
-    }
-  else if (*len + bcb->total_bytes > MAX_BODY_IN_RAM)
-    {
-      SVN_ERR(svn_io_open_unique_file3(&bcb->file, NULL, NULL,
-                                       svn_io_file_del_on_pool_cleanup,
-                                       bcb->result_pool, bcb->scratch_pool));
-
-      if (bcb->total_bytes)
-        {
-          const char *all = body_allocate_all(bcb, bcb->scratch_pool);
-
-          SVN_ERR(svn_io_file_write_full(bcb->file, all, bcb->total_bytes,
-                                         NULL, bcb->scratch_pool));
-        }
-
-      SVN_ERR(svn_io_file_write_full(bcb->file, data, *len, NULL,
-                                     bcb->scratch_pool));
-      bcb->total_bytes += *len;
-    }
-  else
-    {
-      if (!bcb->alloc)
-        bcb->alloc = serf_bucket_allocator_create(bcb->scratch_pool,
-                                                  serf_free_no_error, NULL);
-
-      if (!bcb->collect_bucket)
-        bcb->collect_bucket = serf_bucket_aggregate_create(bcb->alloc);
-
-      serf_bucket_aggregate_append(bcb->collect_bucket,
-                                   serf_bucket_simple_copy_create(data, *len,
-                                                                  bcb->alloc));
-
-      bcb->total_bytes += *len;
-    }
-
-  return SVN_NO_ERROR;
-}
-
-/* Stream close function for collecting body */
-static svn_error_t *
-body_done_fn(void *baton)
-{
-  body_create_baton_t *bcb = baton;
-  if (bcb->file)
-    {
-      /* We need to flush the file, make it unbuffered (so that it can be
-        * zero-copied via mmap), and reset the position before attempting
-        * to deliver the file.
-        *
-        * N.B. If we have APR 1.3+, we can unbuffer the file to let us use
-        * mmap and zero-copy the PUT body.  However, on older APR versions,
-        * we can't check the buffer status; but serf will fall through and
-        * create a file bucket for us on the buffered handle.
-        */
-
-      SVN_ERR(svn_io_file_flush(bcb->file, bcb->scratch_pool));
-      apr_file_buffer_set(bcb->file, NULL, 0);
-    }
-  else if (bcb->collect_bucket)
-    bcb->all_data = body_allocate_all(bcb, bcb->result_pool);
-
-  if (bcb->scratch_pool)
-    svn_pool_destroy(bcb->scratch_pool);
-
-  return SVN_NO_ERROR;
-}
-
 static svn_error_t *
 create_dir_baton(dir_baton_t **new_dir,
                  report_context_t *ctx,
@@ -2313,37 +2169,6 @@ link_path(void *report_baton,
   return APR_SUCCESS;
 }
 
-/* Serf callback to create update request body bucket.
-   Implements svn_ra_serf__request_body_delegate_t */
-static svn_error_t *
-create_update_report_body(serf_bucket_t **body_bkt,
-                          void *baton,
-                          serf_bucket_alloc_t *alloc,
-                          apr_pool_t *pool /* request pool */,
-                          apr_pool_t *scratch_pool)
-{
-  report_context_t *report = baton;
-  body_create_baton_t *body = report->body;
-
-  if (body->file)
-    {
-      apr_off_t offset;
-
-      offset = 0;
-      SVN_ERR(svn_io_file_seek(body->file, APR_SET, &offset, pool));
-
-      *body_bkt = serf_bucket_file_create(report->body->file, alloc);
-    }
-  else
-    {
-      *body_bkt = serf_bucket_simple_create(body->all_data,
-                                            body->total_bytes,
-                                            NULL, NULL, alloc);
-    }
-
-  return SVN_NO_ERROR;
-}
-
 /* Serf callback to setup update request headers. */
 static svn_error_t *
 setup_update_report_headers(serf_bucket_t *headers,
@@ -2684,10 +2509,11 @@ finish_report(void *report_baton,
   handler = svn_ra_serf__create_expat_handler(sess, xmlctx, NULL,
                                               scratch_pool);
 
+  svn_ra_serf__request_body_get_delegate(&handler->body_delegate,
+                                         &handler->body_delegate_baton,
+                                         report->body);
   handler->method = "REPORT";
   handler->path = report_target;
-  handler->body_delegate = create_update_report_body;
-  handler->body_delegate_baton = report;
   handler->body_type = "text/xml";
   handler->custom_accept_encoding = TRUE;
   handler->header_delegate = setup_update_report_headers;
@@ -2802,11 +2628,10 @@ make_update_reporter(svn_ra_session_t *r
   *reporter = &ra_serf_reporter;
   *report_baton = report;
 
-  report->body = apr_pcalloc(report->pool, sizeof(*report->body));
-  report->body->result_pool = report->pool;
-  report->body_template = svn_stream_create(report->body, report->pool);
-  svn_stream_set_write(report->body_template, body_write_fn);
-  svn_stream_set_close(report->body_template, body_done_fn);
+  report->body =
+    svn_ra_serf__request_body_create(SVN_RA_SERF__REQUEST_BODY_IN_MEM_SIZE,
+                                     report->pool);
+  report->body_template = svn_ra_serf__request_body_get_stream(report->body);
 
   if (sess->bulk_updates == svn_tristate_true)
     {