You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by st...@apache.org on 2016/01/22 00:34:55 UTC
svn commit: r1726130 - in /subversion/branches/parallel-put/subversion/libsvn_fs_x: ./ dag_cache.c fs.h transaction.c
Author: stefan2
Date: Thu Jan 21 23:34:55 2016
New Revision: 1726130
URL: http://svn.apache.org/viewvc?rev=1726130&view=rev
Log:
On the parallel-put branch:
Begin porting the new feature to FSX. Merge the prep work
(revisions 1719883, 1719884, 1719892 and 1719893) from FSFS
to FSX and resolve lots of text conflicts.
This makes a handful of svnadmin tests fail for FSX.
Modified:
subversion/branches/parallel-put/subversion/libsvn_fs_x/ (props changed)
subversion/branches/parallel-put/subversion/libsvn_fs_x/dag_cache.c
subversion/branches/parallel-put/subversion/libsvn_fs_x/fs.h
subversion/branches/parallel-put/subversion/libsvn_fs_x/transaction.c
Propchange: subversion/branches/parallel-put/subversion/libsvn_fs_x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 21 23:34:55 2016
@@ -62,6 +62,7 @@
/subversion/branches/multi-layer-moves/subversion/libsvn_fs_x:1239019-1300930
/subversion/branches/nfc-nfd-aware-client/subversion/libsvn_fs_x:870276,870376
/subversion/branches/node_pool/subversion/libsvn_fs_x:1304828-1305388
+/subversion/branches/parallel-put/subversion/libsvn_fs_fs:1719883-1719884,1719892-1719893
/subversion/branches/patch-exec/subversion/libsvn_fs_x:1692717-1705390
/subversion/branches/performance/subversion/libsvn_fs_x:979193,980118,981087,981090,981189,981194,981287,981684,981827,982043,982355,983398,983406,983430,983474,983488,983490,983760,983764,983766,983770,984927,984973,984984,985014,985037,985046,985472,985477,985482,985487-985488,985493,985497,985500,985514,985601,985603,985606,985669,985673,985695,985697,986453,986465,986485,986491-986492,986517,986521,986605,986608,986817,986832,987865,987868-987869,987872,987886-987888,987893,988319,988898,990330,990533,990535-990537,990541,990568,990572,990574-990575,990600,990759,992899,992904,992911,993127,993141,994956,995478,995507,995603,998012,998858,999098,1001413,1001417,1004291,1022668,1022670,1022676,1022715,1022719,1025660,1025672,1027193,1027203,1027206,1027214,1027227,1028077,1028092,1028094,1028104,1028107,1028111,1028354,1029038,1029042-1029043,1029054-1029055,1029062-1029063,1029078,1029080,1029090,1029092-1029093,1029111,1029151,1029158,1029229-1029230,1029232,1029335-1029336,1029339-1029340,1029342,1029344,1030763,1030827,1031203,1031235,1032285,1032333,1033040,1033057,1033294,1035869,1035882,1039511,1043705,1053735,1056015,1066452,1067683,1067697-1078365
/subversion/branches/pin-externals/subversion/libsvn_fs_x:1643757-1659392
Modified: subversion/branches/parallel-put/subversion/libsvn_fs_x/dag_cache.c
URL: http://svn.apache.org/viewvc/subversion/branches/parallel-put/subversion/libsvn_fs_x/dag_cache.c?rev=1726130&r1=1726129&r2=1726130&view=diff
==============================================================================
--- subversion/branches/parallel-put/subversion/libsvn_fs_x/dag_cache.c (original)
+++ subversion/branches/parallel-put/subversion/libsvn_fs_x/dag_cache.c Thu Jan 21 23:34:55 2016
@@ -433,11 +433,15 @@ svn_fs_x__update_dag_cache(dag_node_t *n
cache_entry_t *bucket;
svn_string_t normalized;
+ svn_fs_x__change_set_t change_set = svn_fs_x__dag_get_id(node)->change_set;
- auto_clear_dag_cache(cache);
- bucket = cache_lookup(cache, svn_fs_x__dag_get_id(node)->change_set,
- normalize_path(&normalized, path));
- bucket->node = svn_fs_x__dag_dup(node, cache->pool);
+ if (!svn_fs_x__is_txn(change_set) || !ffd->concurrent_txns)
+ {
+ auto_clear_dag_cache(cache);
+ bucket = cache_lookup(cache, change_set,
+ normalize_path(&normalized, path));
+ bucket->node = svn_fs_x__dag_dup(node, cache->pool);
+ }
}
void
@@ -449,6 +453,11 @@ svn_fs_x__invalidate_dag_cache(svn_fs_ro
svn_fs_x__change_set_t change_set = svn_fs_x__root_change_set(root);
apr_size_t i;
+
+ /* The cache is not used for items in concurrent txns. */
+ if (ffd->concurrent_txns)
+ return;
+
for (i = 0; i < BUCKET_COUNT; ++i)
{
cache_entry_t *bucket = &cache->buckets[i];
Modified: subversion/branches/parallel-put/subversion/libsvn_fs_x/fs.h
URL: http://svn.apache.org/viewvc/subversion/branches/parallel-put/subversion/libsvn_fs_x/fs.h?rev=1726130&r1=1726129&r2=1726130&view=diff
==============================================================================
--- subversion/branches/parallel-put/subversion/libsvn_fs_x/fs.h (original)
+++ subversion/branches/parallel-put/subversion/libsvn_fs_x/fs.h Thu Jan 21 23:34:55 2016
@@ -159,6 +159,14 @@ typedef struct svn_fs_x__shared_txn_data
/* The pool in which this object has been allocated; a subpool of the
common pool. */
apr_pool_t *pool;
+
+ /* TRUE, if more than one thread or process may write to this transaction
+ at the same time. */
+ svn_boolean_t is_concurrent;
+
+ /* Transaction lock to be used when SUPPORTS_CONCURRENT_WRITE is set.
+ Otherwise, this becomes a dummy lock. */
+ svn_mutex__t *lock;
} svn_fs_x__shared_txn_data_t;
/* Private FSX-specific data shared between all svn_fs_t objects that
@@ -355,6 +363,10 @@ typedef struct svn_fs_x__data_t
/* TRUE while we hold a lock on the write lock file. */
svn_boolean_t has_write_lock;
+ /* TRUE, if more than one thread or process may write to a transaction
+ at the same time. */
+ svn_boolean_t concurrent_txns;
+
/* Data shared between all svn_fs_t objects for a given filesystem. */
svn_fs_x__shared_data_t *shared;
Modified: subversion/branches/parallel-put/subversion/libsvn_fs_x/transaction.c
URL: http://svn.apache.org/viewvc/subversion/branches/parallel-put/subversion/libsvn_fs_x/transaction.c?rev=1726130&r1=1726129&r2=1726130&view=diff
==============================================================================
--- subversion/branches/parallel-put/subversion/libsvn_fs_x/transaction.c (original)
+++ subversion/branches/parallel-put/subversion/libsvn_fs_x/transaction.c Thu Jan 21 23:34:55 2016
@@ -83,13 +83,14 @@ svn_fs_x__txn_get_id(svn_fs_txn_t *txn)
/* Functions for working with shared transaction data. */
-/* Return the transaction object for transaction TXN_ID from the
+/* Set *TXN_P to the transaction object for transaction TXN_ID from the
transaction list of filesystem FS (which must already be locked via the
txn_list_lock mutex). If the transaction does not exist in the list,
then create a new transaction object and return it (if CREATE_NEW is
true) or return NULL (otherwise). */
-static svn_fs_x__shared_txn_data_t *
-get_shared_txn(svn_fs_t *fs,
+static svn_error_t *
+get_shared_txn(svn_fs_x__shared_txn_data_t **txn_p,
+ svn_fs_t *fs,
svn_fs_x__txn_id_t txn_id,
svn_boolean_t create_new)
{
@@ -101,8 +102,57 @@ get_shared_txn(svn_fs_t *fs,
if (txn->txn_id == txn_id)
break;
- if (txn || !create_new)
- return txn;
+ if (txn)
+ {
+ /* All users must adhere to the same locking scheme.
+ *
+ * Note that this check is "best effort" because, depending on the
+ * locking scheme, we will not always call this function for all
+ * possible conflicts. Modifying the same txn with e.g. different
+ * clients _at_the_same_time_ has always been illegal for the non-
+ * concurrent setup. So, this is just being nice to tool developers ...
+ *
+ * This will also create "false positives" in case where a txn is
+ * being modified through two sessions with different concurrency
+ * setting where some external scheme prevents actual races between
+ * them. However, we consider such a setup illegal (and theoretical).
+ */
+ if (txn->is_concurrent != ffd->concurrent_txns)
+ {
+ if (txn->is_concurrent)
+ return svn_error_createf(SVN_ERR_FS_TXN_CONCURRENCY_MISMATCH,
+ NULL,
+ _("Cannot reopen transaction '%s' "
+ "without concurrent write support"),
+ svn_fs_x__txn_name(txn_id, fs->pool));
+ else
+ return svn_error_createf(SVN_ERR_FS_TXN_CONCURRENCY_MISMATCH,
+ NULL,
+ _("Cannot reopen transaction '%s' "
+ "with concurrent write support"),
+ svn_fs_x__txn_name(txn_id, fs->pool));
+ }
+
+ *txn_p = txn;
+ return SVN_NO_ERROR;
+ }
+
+ if (!create_new)
+ {
+ *txn_p = NULL;
+ return SVN_NO_ERROR;
+ }
+
+ /* We can only reuse txns with the same concurrency settings.
+ It is not much of a loss anyway because creating a new pool etc. is
+ quite cheap. */
+ while ( ffsd->free_txn
+ && (ffsd->free_txn->is_concurrent != ffd->concurrent_txns))
+ {
+ txn = ffsd->free_txn;
+ ffsd->free_txn = txn->next;
+ svn_pool_destroy(txn->pool);
+ }
/* Use the transaction object from the (single-object) freelist,
if one is available, or otherwise create a new object. */
@@ -116,6 +166,8 @@ get_shared_txn(svn_fs_t *fs,
apr_pool_t *subpool = svn_pool_create(ffsd->common_pool);
txn = apr_palloc(subpool, sizeof(*txn));
txn->pool = subpool;
+ txn->is_concurrent = ffd->concurrent_txns;
+ SVN_ERR(svn_mutex__init(&txn->lock, txn->is_concurrent, txn->pool));
}
txn->txn_id = txn_id;
@@ -128,7 +180,10 @@ get_shared_txn(svn_fs_t *fs,
txn->next = ffsd->txns;
ffsd->txns = txn;
- return txn;
+ /* Done. */
+ *txn_p = txn;
+
+ return SVN_NO_ERROR;
}
/* Free the transaction object for transaction TXN_ID, and remove it
@@ -521,8 +576,10 @@ unlock_proto_rev_body(svn_fs_t *fs,
{
const unlock_proto_rev_baton_t *b = baton;
apr_file_t *lockfile = b->lockcookie;
- svn_fs_x__shared_txn_data_t *txn = get_shared_txn(fs, b->txn_id, FALSE);
apr_status_t apr_err;
+ svn_fs_x__shared_txn_data_t *txn;
+
+ SVN_ERR(get_shared_txn(&txn, fs, b->txn_id, FALSE));
if (!txn)
return svn_error_createf(SVN_ERR_FS_CORRUPT, NULL,
@@ -548,6 +605,8 @@ unlock_proto_rev_body(svn_fs_t *fs,
txn->being_written = FALSE;
+ SVN_ERR(svn_mutex__unlock(txn->lock, NULL));
+
return SVN_NO_ERROR;
}
@@ -577,16 +636,14 @@ typedef struct get_writable_proto_rev_ba
svn_fs_x__txn_id_t txn_id;
} get_writable_proto_rev_baton_t;
-/* Callback used in the implementation of get_writable_proto_rev(). */
+/* Acquire the proto-rev lock for TXN in FS and set *LOCKCOOKIE to cookie.
+ * Use POOL for all allocations. */
static svn_error_t *
-get_writable_proto_rev_body(svn_fs_t *fs,
- const void *baton,
- apr_pool_t *scratch_pool)
-{
- const get_writable_proto_rev_baton_t *b = baton;
- void **lockcookie = b->lockcookie;
- svn_fs_x__shared_txn_data_t *txn = get_shared_txn(fs, b->txn_id, TRUE);
-
+lock_proto_rev_body(void **lockcookie,
+ svn_fs_t *fs,
+ svn_fs_x__shared_txn_data_t *txn,
+ apr_pool_t *scratch_pool)
+{
/* First, ensure that no thread in this process (including this one)
is currently writing to this transaction's proto-rev file. */
if (txn->being_written)
@@ -595,7 +652,7 @@ get_writable_proto_rev_body(svn_fs_t *fs
"of transaction '%s' because a previous "
"representation is currently being written by "
"this process"),
- svn_fs_x__txn_name(b->txn_id, scratch_pool));
+ svn_fs_x__txn_name(txn->txn_id, scratch_pool));
/* We know that no thread in this process is writing to the proto-rev
@@ -608,7 +665,7 @@ get_writable_proto_rev_body(svn_fs_t *fs
apr_file_t *lockfile;
apr_status_t apr_err;
const char *lockfile_path
- = svn_fs_x__path_txn_proto_rev_lock(fs, b->txn_id, scratch_pool);
+ = svn_fs_x__path_txn_proto_rev_lock(fs, txn->txn_id, scratch_pool);
/* Open the proto-rev lockfile, creating it if necessary, as it may
not exist if the transaction dates from before the lockfiles were
@@ -621,8 +678,12 @@ get_writable_proto_rev_body(svn_fs_t *fs
APR_WRITE | APR_CREATE, APR_OS_DEFAULT,
scratch_pool));
+ /* In concurrent mode, we will wait for our turn to write the file.
+ * Otherwise, an already locked file indicates an error. */
apr_err = apr_file_lock(lockfile,
- APR_FLOCK_EXCLUSIVE | APR_FLOCK_NONBLOCK);
+ txn->is_concurrent
+ ? APR_FLOCK_EXCLUSIVE
+ : APR_FLOCK_EXCLUSIVE | APR_FLOCK_NONBLOCK);
if (apr_err)
{
svn_error_clear(svn_io_file_close(lockfile, scratch_pool));
@@ -633,7 +694,7 @@ get_writable_proto_rev_body(svn_fs_t *fs
"file of transaction '%s' because a "
"previous representation is currently "
"being written by another process"),
- svn_fs_x__txn_name(b->txn_id,
+ svn_fs_x__txn_name(txn->txn_id,
scratch_pool));
return svn_error_wrap_apr(apr_err,
@@ -651,6 +712,25 @@ get_writable_proto_rev_body(svn_fs_t *fs
return SVN_NO_ERROR;
}
+/* Callback used in the implementation of get_writable_proto_rev(). */
+static svn_error_t *
+get_writable_proto_rev_body(svn_fs_t *fs, const void *baton, apr_pool_t *pool)
+{
+ const get_writable_proto_rev_baton_t *b = baton;
+ svn_fs_x__shared_txn_data_t *txn;
+ svn_error_t *err;
+
+ SVN_ERR(get_shared_txn(&txn, fs, b->txn_id, TRUE));
+
+ /* Lock mutex (if even enabled) and file. */
+ SVN_ERR(svn_mutex__lock(txn->lock));
+ err = lock_proto_rev_body(b->lockcookie, fs, txn, pool);
+ if (err)
+ return svn_mutex__unlock(txn->lock, err);
+
+ return SVN_NO_ERROR;
+}
+
/* Make sure the length ACTUAL_LENGTH of the proto-revision file PROTO_REV
of transaction TXN_ID in filesystem FS matches the proto-index file.
Trim any crash / failure related extra data from the proto-rev file.
@@ -1993,7 +2073,7 @@ typedef struct rep_write_baton_t
/* The FS we are writing to. */
svn_fs_t *fs;
- /* Actual file to which we are writing. */
+ /* Final representation stream that receives the deltified data. */
svn_stream_t *rep_stream;
/* A stream from the delta combiner. Data written here gets
@@ -2006,8 +2086,8 @@ typedef struct rep_write_baton_t
/* Start of the actual data. */
apr_off_t delta_start;
- /* How many bytes have been written to this rep already. */
- svn_filesize_t rep_size;
+ /* How many full-text bytes have been written to this rep already. */
+ svn_filesize_t expanded_size;
/* The node revision for which we're writing out info. */
svn_fs_x__noderev_t *noderev;
@@ -2018,6 +2098,7 @@ typedef struct rep_write_baton_t
writing to it. */
void *lockcookie;
+ /* Calculate full-text checksums. */
svn_checksum_ctx_t *md5_checksum_ctx;
svn_checksum_ctx_t *sha1_checksum_ctx;
@@ -2044,7 +2125,7 @@ rep_write_contents(void *baton,
SVN_ERR(svn_checksum_update(b->md5_checksum_ctx, data, *len));
SVN_ERR(svn_checksum_update(b->sha1_checksum_ctx, data, *len));
- b->rep_size += *len;
+ b->expanded_size += *len;
return svn_stream_write(b->delta_stream, data, len);
}
@@ -2244,6 +2325,32 @@ rep_write_cleanup(void *data)
return APR_SUCCESS;
}
+/* Open the proto-rev file and initialize elements of B such that we can
+ * append to that file. */
+static svn_error_t *
+rep_write_open_file(rep_write_baton_t *b)
+{
+ svn_fs_x__txn_id_t txn_id
+ = svn_fs_x__get_txn_id(b->noderev->noderev_id.change_set);
+
+ /* Open the prototype rev file and seek to its end. */
+ SVN_ERR(get_writable_proto_rev(&b->file, &b->lockcookie, b->fs, txn_id,
+ b->local_pool));
+ b->rep_stream = svn_checksum__wrap_write_stream_fnv1a_32x4(
+ &b->fnv1a_checksum,
+ svn_stream_from_aprfile2(b->file, TRUE,
+ b->local_pool),
+ b->local_pool);
+
+ SVN_ERR(svn_io_file_get_offset(&b->rep_offset, b->file, b->local_pool));
+
+ /* Cleanup in case something goes wrong. */
+ apr_pool_cleanup_register(b->local_pool, b, rep_write_cleanup,
+ apr_pool_cleanup_null);
+
+ return SVN_NO_ERROR;
+}
+
/* Get a rep_write_baton_t, allocated from RESULT_POOL, and store it in
WB_P for the representation indicated by NODEREV in filesystem FS.
Only appropriate for file contents, not for props or directory contents.
@@ -2256,15 +2363,12 @@ rep_write_get_baton(rep_write_baton_t **
{
svn_fs_x__data_t *ffd = fs->fsap_data;
rep_write_baton_t *b;
- apr_file_t *file;
svn_fs_x__representation_t *base_rep;
svn_stream_t *source;
svn_txdelta_window_handler_t wh;
void *whb;
int diff_version = 1;
svn_fs_x__rep_header_t header = { 0 };
- svn_fs_x__txn_id_t txn_id
- = svn_fs_x__get_txn_id(noderev->noderev_id.change_set);
b = apr_pcalloc(result_pool, sizeof(*b));
@@ -2276,21 +2380,11 @@ rep_write_get_baton(rep_write_baton_t **
b->fs = fs;
b->result_pool = result_pool;
b->local_pool = svn_pool_create(result_pool);
- b->rep_size = 0;
+ b->expanded_size = 0;
b->noderev = noderev;
- /* Open the prototype rev file and seek to its end. */
- SVN_ERR(get_writable_proto_rev(&file, &b->lockcookie, fs, txn_id,
- b->local_pool));
-
- b->file = file;
- b->rep_stream = svn_checksum__wrap_write_stream_fnv1a_32x4(
- &b->fnv1a_checksum,
- svn_stream_from_aprfile2(file, TRUE,
- b->local_pool),
- b->local_pool);
-
- SVN_ERR(svn_io_file_get_offset(&b->rep_offset, file, b->local_pool));
+ /* Set up the raw target stream. */
+ SVN_ERR(rep_write_open_file(b));
/* Get the base for this delta. */
SVN_ERR(choose_delta_base(&base_rep, fs, noderev, FALSE, b->local_pool));
@@ -2313,11 +2407,8 @@ rep_write_get_baton(rep_write_baton_t **
b->local_pool));
/* Now determine the offset of the actual svndiff data. */
- SVN_ERR(svn_io_file_get_offset(&b->delta_start, file, b->local_pool));
-
- /* Cleanup in case something goes wrong. */
- apr_pool_cleanup_register(b->local_pool, b, rep_write_cleanup,
- apr_pool_cleanup_null);
+ SVN_ERR(svn_io_file_get_offset(&b->delta_start, b->file,
+ b->local_pool));
/* Prepare to write the svndiff data. */
svn_txdelta_to_svndiff3(&wh,
@@ -2529,7 +2620,7 @@ rep_write_contents_close(void *baton)
rep->size = offset - b->delta_start;
/* Fill in the rest of the representation field. */
- rep->expanded_size = b->rep_size;
+ rep->expanded_size = b->expanded_size;
txn_id = svn_fs_x__get_txn_id(b->noderev->noderev_id.change_set);
rep->id.change_set = svn_fs_x__change_set_by_txn(txn_id);
@@ -2542,37 +2633,32 @@ rep_write_contents_close(void *baton)
SVN_ERR(get_shared_rep(&old_rep, b->fs, rep, NULL, b->result_pool,
b->local_pool));
+ /* Note: We must set the DATA_REP in the noderev structure passed in to
+ * the baton. It will be used later in upper layers to get the
+ * actual checksums. */
if (old_rep)
{
/* We need to erase from the protorev the data we just wrote. */
SVN_ERR(svn_io_file_trunc(b->file, b->rep_offset, b->local_pool));
/* Use the old rep for this content. */
- b->noderev->data_rep = old_rep;
+ rep = old_rep;
+ b->noderev->data_rep = rep;
}
else
{
+ svn_fs_x__p2l_entry_t entry;
+ svn_fs_x__id_t noderev_id;
+
/* Write out our cosmetic end marker. */
SVN_ERR(svn_stream_puts(b->rep_stream, "ENDREP\n"));
SVN_ERR(allocate_item_index(&rep->id.number, b->fs, txn_id,
b->local_pool));
+
+ /* Write index data. */
SVN_ERR(store_l2p_index_entry(b->fs, txn_id, b->rep_offset,
rep->id.number, b->local_pool));
- b->noderev->data_rep = rep;
- }
-
- SVN_ERR(svn_stream_close(b->rep_stream));
-
- /* Remove cleanup callback. */
- apr_pool_cleanup_kill(b->local_pool, b, rep_write_cleanup);
-
- /* Write out the new node-rev information. */
- SVN_ERR(svn_fs_x__put_node_revision(b->fs, b->noderev, b->local_pool));
- if (!old_rep)
- {
- svn_fs_x__p2l_entry_t entry;
- svn_fs_x__id_t noderev_id;
noderev_id.change_set = SVN_FS_X__INVALID_CHANGE_SET;
noderev_id.number = rep->id.number;
@@ -2584,11 +2670,19 @@ rep_write_contents_close(void *baton)
entry.items = &noderev_id;
entry.fnv1_checksum = b->fnv1a_checksum;
+ b->noderev->data_rep = rep;
SVN_ERR(store_sha1_rep_mapping(b->fs, b->noderev, b->local_pool));
SVN_ERR(store_p2l_index_entry(b->fs, txn_id, &entry, b->local_pool));
}
+ /* Write out the new node-rev information. */
+ SVN_ERR(svn_fs_x__put_node_revision(b->fs, b->noderev, b->local_pool));
+
+ /* The protorev file is now complete. */
+ apr_pool_cleanup_kill(b->local_pool, b, rep_write_cleanup);
SVN_ERR(svn_io_file_close(b->file, b->local_pool));
+
+ /* Unlock the txn. */
SVN_ERR(unlock_proto_rev(b->fs, txn_id, b->lockcookie, b->local_pool));
svn_pool_destroy(b->local_pool);