You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by rh...@apache.org on 2014/01/12 20:02:47 UTC

svn commit: r1557599 - /subversion/trunk/subversion/libsvn_ra_serf/update.c

Author: rhuijben
Date: Sun Jan 12 19:02:47 2014
New Revision: 1557599

URL: http://svn.apache.org/r1557599
Log:
Following up on r1555716, r1556343 and the reversion of these in r1556933,
implement yet another create body to memory spool implementation. This time
using serf buckets to allow releasing all ram when we have to switch to a
file.

* subversion/libsvn_ra_serf/update.c
  (body_create_baton_t): New typedef.
  (report_context_t): Update variables.
  (MAX_BODY_IN_RAM): New define.
  (body_allocate_all,
   serf_free_no_error
   body_write_fn,
   body_done_fn): New functions.

  (set_path,
   delete_path,
   link_path): Write to stream. Once we can assume the serf bucket length
     support we can probably switch to writing buckets directly.

  (create_update_report_body): Implement passing body buffer.
  (finish_report): Update for stream. Remove mmap specifics.
  (make_update_reporter): Initialize body collector. Write to stream.

Modified:
    subversion/trunk/subversion/libsvn_ra_serf/update.c

Modified: subversion/trunk/subversion/libsvn_ra_serf/update.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_ra_serf/update.c?rev=1557599&r1=1557598&r2=1557599&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_ra_serf/update.c (original)
+++ subversion/trunk/subversion/libsvn_ra_serf/update.c Sun Jan 12 19:02:47 2014
@@ -241,7 +241,7 @@ static const svn_ra_serf__xml_transition
 
 /* Forward-declare our report context. */
 typedef struct report_context_t report_context_t;
-
+typedef struct body_create_baton_t body_create_baton_t;
 /*
  * This structure represents the information for a directory.
  */
@@ -420,7 +420,8 @@ struct report_context_t {
    * ### todo: It will be better for performance to store small
    * request bodies (like 4k) in memory and bigger bodies on disk.
    */
-  apr_file_t *body_file;
+  svn_stream_t *body_template;
+  body_create_baton_t *body;
 
   /* number of pending GET requests */
   unsigned int num_active_fetches;
@@ -438,6 +439,150 @@ struct report_context_t {
   svn_boolean_t closed_root;
 };
 
+/* Baton for collecting REPORT body. Depending on the size this
+   work is backed by a memory buffer (via serf buckets) or by
+   a file */
+typedef struct body_create_baton_t
+{
+  apr_pool_t *result_pool;
+  apr_size_t total_bytes;
+
+  apr_pool_t *scratch_pool;
+
+  serf_bucket_alloc_t *alloc;
+  serf_bucket_t *collect_bucket;
+
+  const void *all_data;
+  apr_file_t *file;
+} body_create_baton_t;
+
+
+#define MAX_BODY_IN_RAM (256*1024)
+
+/* Fold all previously collected data in a single buffer allocated in
+   RESULT_POOL and clear all intermediate state */
+static const char *
+body_allocate_all(body_create_baton_t *body,
+                  apr_pool_t *result_pool)
+{
+  char *buffer = apr_pcalloc(result_pool, body->total_bytes);
+  const char *data;
+  apr_size_t sz;
+  apr_status_t s;
+  apr_size_t remaining = body->total_bytes;
+  char *next = buffer;
+
+  while (!(s = serf_bucket_read(body->collect_bucket, remaining, &data, &sz)))
+    {
+      memcpy(next, data, sz);
+      remaining -= sz;
+      next += sz;
+
+      if (! remaining)
+        break;
+    }
+
+  if (!SERF_BUCKET_READ_ERROR(s) && sz >= 0)
+    {
+      memcpy(next, data, sz);
+    }
+
+  serf_bucket_destroy(body->collect_bucket);
+  body->collect_bucket = NULL;
+
+  return (s != APR_EOF) ? NULL : buffer;
+}
+
+/* Noop function. Make serf take care of freeing in error situations */
+static void serf_free_no_error(void *unfreed_baton, void *block) {}
+
+/* Stream write function for body creation */
+static svn_error_t *
+body_write_fn(void *baton,
+              const char *data,
+              apr_size_t *len)
+{
+  body_create_baton_t *bcb = baton;
+
+  if (!bcb->scratch_pool)
+    bcb->scratch_pool = svn_pool_create(bcb->result_pool);
+
+  if (bcb->file)
+    {
+      SVN_ERR(svn_io_file_write_full(bcb->file, data, *len, NULL,
+                                     bcb->scratch_pool));
+      svn_pool_clear(bcb->scratch_pool);
+
+      bcb->total_bytes += *len;
+    }
+  else if (*len + bcb->total_bytes > MAX_BODY_IN_RAM)
+    {
+      SVN_ERR(svn_io_open_unique_file3(&bcb->file, NULL, NULL,
+                                       svn_io_file_del_on_pool_cleanup,
+                                       bcb->result_pool, bcb->scratch_pool));
+
+      if (bcb->total_bytes)
+        {
+          const char *all = body_allocate_all(bcb, bcb->scratch_pool);
+
+          SVN_ERR(svn_io_file_write_full(bcb->file, all, bcb->total_bytes,
+                                         NULL, bcb->scratch_pool));
+        }
+
+      SVN_ERR(svn_io_file_write_full(bcb->file, data, *len, NULL,
+                                     bcb->scratch_pool));
+      bcb->total_bytes += *len;
+    }
+  else
+    {
+      if (!bcb->alloc)
+        bcb->alloc = serf_bucket_allocator_create(bcb->scratch_pool,
+                                                  serf_free_no_error, NULL);
+
+      if (!bcb->collect_bucket)
+        bcb->collect_bucket = serf_bucket_aggregate_create(bcb->alloc);
+
+      serf_bucket_aggregate_append(bcb->collect_bucket,
+                                   serf_bucket_simple_copy_create(data, *len,
+                                                                  bcb->alloc));
+
+      bcb->total_bytes += *len;
+    }
+
+  return SVN_NO_ERROR;
+}
+
+/* Stream close function for collecting body */
+static svn_error_t *
+body_done_fn(void *baton)
+{
+  body_create_baton_t *bcb = baton;
+  if (bcb->file)
+    {
+      /* We need to flush the file, make it unbuffered (so that it can be
+        * zero-copied via mmap), and reset the position before attempting
+        * to deliver the file.
+        *
+        * N.B. If we have APR 1.3+, we can unbuffer the file to let us use
+        * mmap and zero-copy the PUT body.  However, on older APR versions,
+        * we can't check the buffer status; but serf will fall through and
+        * create a file bucket for us on the buffered handle.
+        */
+
+      SVN_ERR(svn_io_file_flush(bcb->file, bcb->scratch_pool));
+#if APR_VERSION_AT_LEAST(1, 3, 0)
+      apr_file_buffer_set(bcb->file, NULL, 0);
+#endif
+    }
+  else if (bcb->collect_bucket)
+    bcb->all_data = body_allocate_all(bcb, bcb->result_pool);
+
+  if (bcb->scratch_pool)
+    svn_pool_destroy(bcb->scratch_pool);
+
+  return SVN_NO_ERROR;
+}
+
 static svn_error_t *
 create_dir_baton(dir_baton_t **new_dir,
                  report_context_t *ctx,
@@ -1958,8 +2103,7 @@ set_path(void *report_baton,
   svn_xml_escape_cdata_cstring(&buf, path, pool);
   svn_xml_make_close_tag(&buf, pool, "S:entry");
 
-  SVN_ERR(svn_io_file_write_full(report->body_file, buf->data, buf->len,
-                                 NULL, pool));
+  SVN_ERR(svn_stream_write(report->body_template, buf->data, &buf->len));
 
   if (lock_token)
     {
@@ -1981,8 +2125,7 @@ delete_path(void *report_baton,
 
   make_simple_xml_tag(&buf, "S:missing", path, pool);
 
-  SVN_ERR(svn_io_file_write_full(report->body_file, buf->data, buf->len,
-                                 NULL, pool));
+  SVN_ERR(svn_stream_write(report->body_template, buf->data, &buf->len));
 
   return SVN_NO_ERROR;
 }
@@ -2031,8 +2174,7 @@ link_path(void *report_baton,
   svn_xml_escape_cdata_cstring(&buf, path, pool);
   svn_xml_make_close_tag(&buf, pool, "S:entry");
 
-  SVN_ERR(svn_io_file_write_full(report->body_file, buf->data, buf->len,
-                                 NULL, pool));
+  SVN_ERR(svn_stream_write(report->body_template, buf->data, &buf->len));
 
   /* Store the switch roots to allow generating repos_relpaths from just
      the working copy paths. (Needed for HTTPv2) */
@@ -2065,12 +2207,23 @@ create_update_report_body(serf_bucket_t 
                           apr_pool_t *pool)
 {
   report_context_t *report = baton;
-  apr_off_t offset;
+  body_create_baton_t *body = report->body;
 
-  offset = 0;
-  SVN_ERR(svn_io_file_seek(report->body_file, APR_SET, &offset, pool));
+  if (body->file)
+    {
+      apr_off_t offset;
 
-  *body_bkt = serf_bucket_file_create(report->body_file, alloc);
+      offset = 0;
+      SVN_ERR(svn_io_file_seek(body->file, APR_SET, &offset, pool));
+
+      *body_bkt = serf_bucket_file_create(report->body->file, alloc);
+    }
+  else
+    {
+      *body_bkt = serf_bucket_simple_create(body->all_data,
+                                            body->total_bytes,
+                                            NULL, NULL, alloc);
+    }
 
   return SVN_NO_ERROR;
 }
@@ -2315,22 +2468,8 @@ finish_report(void *report_baton,
   update_delay_baton_t *ud;
 
   svn_xml_make_close_tag(&buf, iterpool, "S:update-report");
-  SVN_ERR(svn_io_file_write_full(report->body_file, buf->data, buf->len,
-                                 NULL, iterpool));
-
-  /* We need to flush the file, make it unbuffered (so that it can be
-   * zero-copied via mmap), and reset the position before attempting to
-   * deliver the file.
-   *
-   * N.B. If we have APR 1.3+, we can unbuffer the file to let us use mmap
-   * and zero-copy the PUT body.  However, on older APR versions, we can't
-   * check the buffer status; but serf will fall through and create a file
-   * bucket for us on the buffered svndiff handle.
-   */
-  SVN_ERR(svn_io_file_flush(report->body_file, iterpool));
-#if APR_VERSION_AT_LEAST(1, 3, 0)
-  apr_file_buffer_set(report->body_file, NULL, 0);
-#endif
+  SVN_ERR(svn_stream_write(report->body_template, buf->data, &buf->len));
+  SVN_ERR(svn_stream_close(report->body_template));
 
   SVN_ERR(svn_ra_serf__report_resource(&report_target, sess, NULL, pool));
 
@@ -2518,9 +2657,11 @@ make_update_reporter(svn_ra_session_t *r
   *reporter = &ra_serf_reporter;
   *report_baton = report;
 
-  SVN_ERR(svn_io_open_unique_file3(&report->body_file, NULL, NULL,
-                                   svn_io_file_del_on_pool_cleanup,
-                                   report->pool, scratch_pool));
+  report->body = apr_pcalloc(report->pool, sizeof(*report->body));
+  report->body->result_pool = report->pool;
+  report->body_template = svn_stream_create(report->body, report->pool);
+  svn_stream_set_write(report->body_template, body_write_fn);
+  svn_stream_set_close(report->body_template, body_done_fn);
 
   if (sess->bulk_updates == svn_tristate_true)
     {
@@ -2653,8 +2794,7 @@ make_update_reporter(svn_ra_session_t *r
 
   make_simple_xml_tag(&buf, "S:depth", svn_depth_to_word(depth), scratch_pool);
 
-  SVN_ERR(svn_io_file_write_full(report->body_file, buf->data, buf->len,
-                                 NULL, scratch_pool));
+  SVN_ERR(svn_stream_write(report->body_template, buf->data, &buf->len));
 
   return SVN_NO_ERROR;
 }