You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by st...@apache.org on 2016/01/21 22:55:56 UTC
svn commit: r1726113 -
/subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c
Author: stefan2
Date: Thu Jan 21 21:55:56 2016
New Revision: 1726113
URL: http://svn.apache.org/viewvc?rev=1726113&view=rev
Log:
On the parallel-put branch:
Provide a test case that actually uses multiple threads to manipulate
text and props in a transaction concurrently. It had been used to
find all the issues that were fixed recently on this branch.
* subversion/tests/libsvn_fs/fs-test.c
(parallel_put_baton_t,
parallel_put_thread,
compose_apr_result,
parallel_put_contents,
verify_contents): New helper code for the new test.
(test_parallel_put): The new test case.
(test_funcs): Register the new test case.
Modified:
subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c
Modified: subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c
URL: http://svn.apache.org/viewvc/subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c?rev=1726113&r1=1726112&r2=1726113&view=diff
==============================================================================
--- subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c (original)
+++ subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c Thu Jan 21 21:55:56 2016
@@ -7141,6 +7141,282 @@ test_concurrent_txn_write(const svn_test
return SVN_NO_ERROR;
}
+#if APR_HAS_THREADS
+
+/* Parameter set passed to the thread function parallel_put_thread.
+ * The main thread fills in one instance per background thread and also
+ * uses it to keep the thread handle and the results of that thread. */
+typedef struct parallel_put_baton_t
+{
+ /* Pool the thread allocates from; parallel_put_thread creates its
+ * iteration pool inside it. */
+ apr_pool_t *pool;
+
+ /* Thread-specific FS instance (used for bookkeeping in the main
+ * thread). */
+ svn_fs_t *fs;
+
+ /* Thread-specific TXN root; all writes of the thread go through it. */
+ svn_fs_root_t *root;
+
+ /* Number of directories to process. The file handled in directory
+ * number N has the path "N/FILE_NAME". */
+ int count;
+
+ /* Single-character name of the file to modify (one per directory). */
+ char file_name;
+
+ /* The thread object as used by the main thread for bookkeeping. */
+ apr_thread_t *thread;
+
+ /* The error object produced by the thread (or SVN_NO_ERROR). */
+ svn_error_t *err;
+
+ /* Results of joining the thread: RETVAL1 is the status returned by
+ * apr_thread_join itself, RETVAL2 receives the thread's exit status
+ * through apr_thread_join's output parameter. */
+ apr_status_t retval1;
+ apr_status_t retval2;
+} parallel_put_baton_t;
+
+/* Thread entry point for the background writers. DATA is a
+ * parallel_put_baton_t. For every directory index below COUNT, set the
+ * contents of the file "<index>/<FILE_NAME>" under the baton's ROOT to
+ * "text <base revision> <path>". Stop at the first failure and leave the
+ * error in the baton's ERR member. The thread exits with status 0. */
+static void * APR_THREAD_FUNC
+parallel_put_thread(apr_thread_t *tid, void *data)
+{
+  parallel_put_baton_t *job = data;
+  apr_pool_t *scratch_pool = svn_pool_create(job->pool);
+  svn_revnum_t base_rev = svn_fs_txn_root_base_revision(job->root);
+  int dir = 0;
+
+  while (dir < job->count && !job->err)
+    {
+      const char *path;
+      const char *contents;
+
+      /* Everything allocated for the previous directory is garbage now. */
+      svn_pool_clear(scratch_pool);
+      path = apr_psprintf(scratch_pool, "%d/%c", dir, job->file_name);
+      contents = apr_psprintf(scratch_pool, "text %ld %s", base_rev, path);
+
+      job->err = svn_error_trace(svn_test__set_file_contents(job->root,
+                                                             path, contents,
+                                                             scratch_pool));
+      ++dir;
+    }
+
+  svn_pool_destroy(scratch_pool);
+  apr_thread_exit(tid, 0);
+
+  /* Presumably never reached (apr_thread_exit ends the thread); the
+   * return statement satisfies the function's signature. */
+  return NULL;
+}
+
+/* Utility function: return CHAIN unchanged if RETVAL is APR_SUCCESS.
+ * Otherwise wrap RETVAL into an svn_error_t, compose it with CHAIN and
+ * return the combined error. */
+static svn_error_t *
+compose_apr_result(svn_error_t *chain,
+                   apr_status_t retval)
+{
+  if (retval != APR_SUCCESS)
+    chain = svn_error_compose_create(chain, svn_error_wrap_apr(retval, " "));
+
+  return chain;
+}
+
+/* Concurrently set new contents and props for all files under TXN_ROOT.
+ * There are COUNT directories (named "0", "1", ...) with PROC_COUNT files
+ * (named 'A', 'B', ...) each.
+ *
+ * One background thread per file name writes the text of that file in
+ * every directory. Each thread works through its own FS instance and
+ * TXN root, both opened on the transaction behind TXN_ROOT. Meanwhile,
+ * the calling thread sets the "userprop" property of every file through
+ * TXN_ROOT itself.
+ *
+ * Return the composition of all errors seen: thread creation and join
+ * failures, errors reported by the threads and errors from the property
+ * changes. Every thread that has been started is joined before this
+ * function returns.
+ *
+ * Use POOL for allocations. */
+static svn_error_t *
+parallel_put_contents(svn_fs_root_t *txn_root,
+                      int count,
+                      int proc_count,
+                      apr_pool_t *pool)
+{
+  svn_error_t *err = NULL;
+  int i;
+  char c;
+  svn_fs_t *fs = svn_fs_root_fs(txn_root);
+  apr_hash_t *config = svn_fs_config(fs, pool);
+  const char *txn_name = svn_fs_txn_root_name(txn_root, pool);
+  const char *repo_name = svn_fs_path(fs, pool);
+  svn_revnum_t base = svn_fs_txn_root_base_revision(txn_root);
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  parallel_put_baton_t *params = apr_pcalloc(pool,
+                                             sizeof(*params) * proc_count);
+
+  /* Prepare the parameters for PROC_COUNT threads. No thread is running
+   * yet, so bailing out with SVN_ERR is still safe here. */
+  for (i = 0; i < proc_count; ++i)
+    {
+      svn_fs_txn_t *txn;
+
+      SVN_ERR(svn_fs_open(&params[i].fs, repo_name, config, pool));
+      SVN_ERR(svn_fs_open_txn(&txn, params[i].fs, txn_name, pool));
+      SVN_ERR(svn_fs_txn_root(&params[i].root, txn, pool));
+
+      params[i].pool = pool;
+      params[i].count = count;
+      params[i].file_name = 'A' + i;
+      params[i].err = NULL;
+    }
+
+  /* Start the threads. They will set the text contents of every file
+   * in every directory. One thread per file name, covering all directories
+   * having 1 file of that name in it. A failed start is recorded in ERR;
+   * the THREAD member of that entry then keeps its initial NULL value. */
+  for (i = 0; i < proc_count; ++i)
+    {
+      apr_status_t retval = apr_thread_create(&params[i].thread, NULL,
+                                              parallel_put_thread,
+                                              &params[i], pool);
+      err = compose_apr_result(err, retval);
+    }
+
+  /* Main thread: modify properties of all files (concurrently to the text
+   * modifications going on in the background threads). Stop as soon as
+   * any error has been recorded. */
+  for (i = 0; i < count && !err; ++i)
+    for (c = 'A'; c < 'A' + proc_count && !err; ++c)
+      {
+        svn_error_t *local_err;
+        const char *path;
+        svn_string_t *value;
+        svn_pool_clear(iterpool);
+
+        path = apr_psprintf(iterpool, "%d/%c", i, c);
+        value = svn_string_createf(iterpool, "prop %ld %s", base, path);
+        local_err = svn_error_trace(svn_fs_change_node_prop(txn_root, path,
+                                                            "userprop",
+                                                            value, iterpool));
+
+        err = svn_error_compose_create(err, local_err);
+      }
+
+  /* Wait for all threads to end. If we were to SVN_ERR out here, we might
+   * see pool destruction racing with thread completion etc.
+   * Only join threads that were actually started: a failed
+   * apr_thread_create leaves the handle NULL and that must not be passed
+   * to apr_thread_join. RETVAL1 / RETVAL2 of such an entry stay 0 from
+   * apr_pcalloc; the creation failure itself is already part of ERR. */
+  for (i = 0; i < proc_count; ++i)
+    if (params[i].thread)
+      params[i].retval1 = apr_thread_join(&params[i].retval2,
+                                          params[i].thread);
+
+  svn_pool_destroy(iterpool);
+
+  /* Collect all errors. */
+  for (i = 0; i < proc_count; ++i)
+    {
+      err = compose_apr_result(err, params[i].retval1);
+      err = compose_apr_result(err, params[i].retval2);
+      err = svn_error_compose_create(err, params[i].err);
+    }
+
+  /* Show them (SVN_NO_ERROR if there were none). */
+  return svn_error_trace(err);
+}
+
+/* Verify that all data in revision BASE + 1 of FS is correct, i.e. in the
+ * revision that has been created based on BASE. The tree is expected to
+ * have COUNT directories with PROC_COUNT files each; every file must carry
+ * the property "userprop" with the value "prop <BASE> <path>" and have the
+ * text "text <BASE> <path>".
+ *
+ * Return an error on the first mismatch or FS failure.
+ * Use POOL for temporary allocations. */
+static svn_error_t *
+verify_contents(svn_fs_t *fs,
+                svn_revnum_t base,
+                int count,
+                int proc_count,
+                apr_pool_t *pool)
+{
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  int i;
+  char c;
+  svn_fs_root_t *root;
+
+  SVN_ERR(svn_fs_revision_root(&root, fs, base + 1, pool));
+  for (i = 0; i < count; ++i)
+    for (c = 'A'; c < 'A' + proc_count; ++c)
+      {
+        svn_string_t *prop;
+        svn_stringbuf_t *text;
+        const char *expected_prop;
+        const char *expected_text;
+        const char *path;
+        svn_pool_clear(iterpool);
+
+        path = apr_psprintf(iterpool, "%d/%c", i, c);
+        expected_prop = apr_psprintf(iterpool, "prop %ld %s", base, path);
+        expected_text = apr_psprintf(iterpool, "text %ld %s", base, path);
+
+        /* The property must exist before its value can be compared. */
+        SVN_ERR(svn_fs_node_prop(&prop, root, path, "userprop", iterpool));
+        SVN_TEST_ASSERT(prop);
+        SVN_TEST_STRING_ASSERT(prop->data, expected_prop);
+
+        SVN_ERR(svn_test__get_file_contents(root, path, &text, iterpool));
+        SVN_TEST_STRING_ASSERT(text->data, expected_text);
+      }
+
+  /* Release the iteration pool now instead of keeping it around until
+   * POOL gets cleaned up. */
+  svn_pool_destroy(iterpool);
+
+  return SVN_NO_ERROR;
+}
+
+/* Test case: create a repository with concurrent writes enabled and fill
+ * a transaction with COUNT directories of PROC_COUNT files each. Then
+ * commit two revisions whose file texts and props have been written by
+ * parallel_put_contents, checking the committed data with verify_contents
+ * after each commit. The test reports itself as skipped if the FS does
+ * not support concurrent writes. */
+static svn_error_t *
+test_parallel_put(const svn_test_opts_t *opts,
+                  apr_pool_t *pool)
+{
+  const int COUNT = 100, PROC_COUNT = 4;
+  const char *repo_name = "test-repo-parallel-put";
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  svn_revnum_t new_rev = 0;
+  apr_hash_t *config = apr_hash_make(pool);
+  svn_fs_t *fs;
+  svn_fs_txn_t *txn;
+  svn_fs_root_t *txn_root;
+  int dir;
+
+  /* Create a new repo that has concurrent writes switched on. */
+  svn_hash_sets(config, SVN_FS_CONFIG_CONCURRENT_WRITES, "1");
+  SVN_ERR(svn_test__create_fs2(&fs, repo_name, opts, config, pool));
+
+  /* Nothing to test if the backend cannot do concurrent writes. */
+  if (!svn_fs_supports_concurrent_writes(fs, pool))
+    return svn_error_create(SVN_ERR_TEST_SKIPPED, NULL,
+                            "concurrent write is not supported by FS");
+
+  /* r1: Create a highly regular tree. Having the files deeper in the tree
+   * means we can check whether concurrent tree updates lose DAG / directory
+   * updates.
+   *
+   * Set the initial contents concurrently.
+   */
+  SVN_ERR(svn_fs_begin_txn(&txn, fs, new_rev, pool));
+  SVN_ERR(svn_fs_txn_root(&txn_root, txn, pool));
+
+  for (dir = 0; dir < COUNT; ++dir)
+    {
+      const char *dir_path;
+      int file;
+
+      svn_pool_clear(iterpool);
+      dir_path = apr_itoa(iterpool, dir);
+      SVN_ERR(svn_fs_make_dir(txn_root, dir_path, iterpool));
+
+      /* Files are named 'A', 'B', ... within each directory. */
+      for (file = 0; file < PROC_COUNT; ++file)
+        {
+          const char *file_path = apr_psprintf(iterpool, "%d/%c",
+                                               dir, 'A' + file);
+          SVN_ERR(svn_fs_make_file(txn_root, file_path, iterpool));
+        }
+    }
+
+  SVN_ERR(parallel_put_contents(txn_root, COUNT, PROC_COUNT, pool));
+  SVN_ERR(test_commit_txn(&new_rev, txn, NULL, pool));
+
+  /* Does the committed data match what was written on top of r0? */
+  SVN_ERR(verify_contents(fs, 0, COUNT, PROC_COUNT, pool));
+
+  /* r2: Overwrite texts and props of the existing files. */
+  SVN_ERR(svn_fs_begin_txn(&txn, fs, new_rev, pool));
+  SVN_ERR(svn_fs_txn_root(&txn_root, txn, pool));
+  SVN_ERR(parallel_put_contents(txn_root, COUNT, PROC_COUNT, pool));
+  SVN_ERR(test_commit_txn(&new_rev, txn, NULL, pool));
+
+  /* Does the committed data match what was written on top of r1? */
+  SVN_ERR(verify_contents(fs, 1, COUNT, PROC_COUNT, pool));
+
+  /* Done. */
+  svn_pool_destroy(iterpool);
+  return SVN_NO_ERROR;
+}
+
+#else /* No threading: */
+
+/* Stand-in used when APR has been built without thread support: the
+ * parallel put test cannot run, so report it as skipped. */
+static svn_error_t *
+test_parallel_put(const svn_test_opts_t *opts,
+                  apr_pool_t *pool)
+{
+  svn_error_t *skipped = svn_error_create(SVN_ERR_TEST_SKIPPED, NULL,
+                                          "no thread support");
+
+  return skipped;
+}
+
+#endif /* APR_HAS_THREADS */
+
/* ------------------------------------------------------------------------ */
/* The test table. */
@@ -7277,6 +7553,8 @@ static struct svn_test_descriptor_t test
"freeze and commit"),
SVN_TEST_OPTS_PASS(test_concurrent_txn_write,
"test concurrent write to txn"),
+ SVN_TEST_OPTS_PASS(test_parallel_put,
+ "parallel put"),
SVN_TEST_NULL
};