You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by st...@apache.org on 2015/12/14 14:15:35 UTC

svn commit: r1719905 - /subversion/branches/parallel-put/subversion/libsvn_fs_fs/transaction.c

Author: stefan2
Date: Mon Dec 14 13:15:35 2015
New Revision: 1719905

URL: http://svn.apache.org/viewvc?rev=1719905&view=rev
Log:
On the parallel-put branch:
Actually allow concurrent puts in FSFS.

The idea is to simply stream into a svn_spillbuf_t and to append to the
protorev file only when the stream gets closed, i.e. when it is
completed.  We only need to serialize the last part and can allow any
number of text reps to be built up in parallel.

There is also no need to check for the same text being written twice
concurrently because that is semantically undefined anyway.  Our code
will let the last one win and prevent corruption.

Note that this is not safe yet, as prop sets and making the tree
node mutable have not been protected by a mutex yet.

* subversion/libsvn_fs_fs/transaction.c
  (rep_write_baton): Add spill buffer as an alternative to the file.
  (rep_write_open_buffer): New function similar to rep_write_open_file.
  (rep_write_get_baton): In concurrent mode, use the spill buffer.
  (rep_write_contents_close): Same. Also, in concurrent mode, we finally
                              need to open the protorev file and copy
                              our data into it.  Don't lose updates
                              caused by concurrent prop sets.

Modified:
    subversion/branches/parallel-put/subversion/libsvn_fs_fs/transaction.c

Modified: subversion/branches/parallel-put/subversion/libsvn_fs_fs/transaction.c
URL: http://svn.apache.org/viewvc/subversion/branches/parallel-put/subversion/libsvn_fs_fs/transaction.c?rev=1719905&r1=1719904&r2=1719905&view=diff
==============================================================================
--- subversion/branches/parallel-put/subversion/libsvn_fs_fs/transaction.c (original)
+++ subversion/branches/parallel-put/subversion/libsvn_fs_fs/transaction.c Mon Dec 14 13:15:35 2015
@@ -1996,6 +1996,8 @@ struct rep_write_baton
   /* Lock 'cookie' used to unlock the output file once we've finished
      writing to it. */
   void *lockcookie;
+  /* Underlying data buffer. */
+  svn_spillbuf_t *buffer;
 
   /* Calculate full-text checksums. */
   svn_checksum_ctx_t *md5_checksum_ctx;
@@ -2222,6 +2224,32 @@ rep_write_cleanup(void *data)
   return APR_SUCCESS;
 }
 
+/* Create a spill buffer in B->buffer and set B->rep_stream to a stream
+ * over it, so that we will write to that buffer.  Always succeeds. */
+static svn_error_t *
+rep_write_open_buffer(struct rep_write_baton *b)
+{
+  const char *txn_dir;
+
+  txn_dir = svn_fs_fs__path_txn_dir(b->fs,
+                                    svn_fs_fs__id_txn_id(b->noderev->id),
+                                    b->scratch_pool);
+
+  /* Be careful not to create buffers that are too large; there could be
+   * a lot of them.
+   *
+   * Per delta stream, we already allocate 2 txdelta windows' worth of
+   * memory, so adding at most 4 more of them should be acceptable. */
+  b->buffer = svn_spillbuf__create_extended(SVN_STREAM_CHUNK_SIZE,
+                                            4 * SVN_STREAM_CHUNK_SIZE,
+                                            TRUE, /* delete-on-close */
+                                            FALSE, /* spill-all-contents */
+                                            txn_dir, b->scratch_pool);
+  b->rep_stream = svn_stream__from_spillbuf(b->buffer, b->scratch_pool);
+
+  return SVN_NO_ERROR;
+}
+
 /* Open the proto-rev file and initialize elements of B such that we can
  * append to that file. */
 static svn_error_t *
@@ -2277,6 +2305,9 @@ rep_write_get_baton(struct rep_write_bat
   b->noderev = noderev;
 
   /* Set up the raw target stream. */
+  if (ffd->concurrent_txns)
+    SVN_ERR(rep_write_open_buffer(b));
+  else
     SVN_ERR(rep_write_open_file(b));
 
   /* Get the base for this delta. */
@@ -2300,6 +2331,9 @@ rep_write_get_baton(struct rep_write_bat
                                       b->scratch_pool));
 
   /* Now determine the offset of the actual svndiff data. */
+  if (ffd->concurrent_txns)
+    b->delta_start = svn_spillbuf__get_size(b->buffer);
+  else
     SVN_ERR(svn_io_file_get_offset(&b->delta_start, b->file,
                                    b->scratch_pool));
 
@@ -2494,9 +2528,11 @@ static svn_error_t *
 rep_write_contents_close(void *baton)
 {
   struct rep_write_baton *b = baton;
+  fs_fs_data_t *ffd = b->fs->fsap_data;
   representation_t *rep;
   representation_t *old_rep;
   apr_off_t offset;
+  svn_stream_t *source = b->rep_stream;
   const svn_fs_fs__id_part_t *txn_id = svn_fs_fs__id_txn_id(b->noderev->id);
 
   rep = apr_pcalloc(b->result_pool, sizeof(*rep));
@@ -2507,7 +2543,11 @@ rep_write_contents_close(void *baton)
     SVN_ERR(svn_stream_close(b->delta_stream));
 
   /* Determine the length of the svndiff data. */
-  SVN_ERR(svn_io_file_get_offset(&offset, b->file, b->scratch_pool));
+  if (ffd->concurrent_txns)
+    offset = svn_spillbuf__get_size(b->buffer);
+  else
+    SVN_ERR(svn_io_file_get_offset(&offset, b->file, b->scratch_pool));
+
   rep->size = offset - b->delta_start;
 
   /* Fill in the rest of the representation field. */
@@ -2520,6 +2560,10 @@ rep_write_contents_close(void *baton)
   SVN_ERR(digests_final(rep, b->md5_checksum_ctx, b->sha1_checksum_ctx,
                         b->result_pool));
 
+  /* If not already done, open the prototype rev file and lock this txn. */
+  if (ffd->concurrent_txns)
+    SVN_ERR(rep_write_open_file(b));
+
   /* Check and see if we already have a representation somewhere that's
      identical to the one we just wrote out. */
   SVN_ERR(get_shared_rep(&old_rep, b->fs, rep, NULL, b->result_pool,
@@ -2531,7 +2575,8 @@ rep_write_contents_close(void *baton)
   if (old_rep)
     {
       /* We need to erase from the protorev the data we just wrote. */
-      SVN_ERR(svn_io_file_trunc(b->file, b->rep_offset, b->scratch_pool));
+      if (!ffd->concurrent_txns)
+        SVN_ERR(svn_io_file_trunc(b->file, b->rep_offset, b->scratch_pool));
 
       /* Use the old rep for this content. */
       rep = old_rep;
@@ -2541,6 +2586,11 @@ rep_write_contents_close(void *baton)
     {
       svn_fs_fs__p2l_entry_t entry;
 
+      /* We want to add this new representation to the protorev file. */
+      if (ffd->concurrent_txns)
+        SVN_ERR(svn_stream_copy3(source, b->rep_stream, NULL, NULL,
+                                 b->scratch_pool));
+
       /* Write out our cosmetic end marker. */
       SVN_ERR(svn_stream_puts(b->rep_stream, "ENDREP\n"));
       SVN_ERR(allocate_item_index(&rep->item_index, b->fs, txn_id,
@@ -2562,7 +2612,17 @@ rep_write_contents_close(void *baton)
       SVN_ERR(store_p2l_index_entry(b->fs, txn_id, &entry, b->scratch_pool));
     }
 
-  /* Write out the new node-rev information. */
+  /* Write out the new node-rev information.
+   *
+   * If concurrent writes are allowed, the noderev may have changed on disk.
+   * Re-read it to prevent lost updates. */
+  if (ffd->concurrent_txns)
+    {
+      SVN_ERR(svn_fs_fs__get_node_revision(&b->noderev, b->fs, b->noderev->id,
+                                           b->result_pool, b->scratch_pool));
+      b->noderev->data_rep = rep;
+    }
+
   SVN_ERR(svn_fs_fs__put_node_revision(b->fs, b->noderev->id, b->noderev,
                                        FALSE, b->scratch_pool));