You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by st...@apache.org on 2016/01/21 22:55:56 UTC

svn commit: r1726113 - /subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c

Author: stefan2
Date: Thu Jan 21 21:55:56 2016
New Revision: 1726113

URL: http://svn.apache.org/viewvc?rev=1726113&view=rev
Log:
On the parallel-put branch:
Provide a test case that actually uses multiple threads to manipulate
text and props in a transaction concurrently.  It had been used to
find all the issues that were fixed recently on this branch.

* subversion/tests/libsvn_fs/fs-test.c
  (parallel_put_baton_t,
   parallel_put_thread,
   compose_apr_result,
   parallel_put_contents,
   verify_contents): New helper code for the new test.
  (test_parallel_put): The new test case.
  (test_funcs): Register the new test case.

Modified:
    subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c

Modified: subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c
URL: http://svn.apache.org/viewvc/subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c?rev=1726113&r1=1726112&r2=1726113&view=diff
==============================================================================
--- subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c (original)
+++ subversion/branches/parallel-put/subversion/tests/libsvn_fs/fs-test.c Thu Jan 21 21:55:56 2016
@@ -7141,6 +7141,282 @@ test_concurrent_txn_write(const svn_test
   return SVN_NO_ERROR;
 }
 
+#if APR_HAS_THREADS
+
+/* Parameter set passed to the thread function parallel_put_thread. */
+typedef struct parallel_put_baton_t
+{
+  /* Pool to use (APR does not provide one). */
+  apr_pool_t *pool;
+
+  /* Thread-specific FS instance (used for bookeeping in the main thread). */
+  svn_fs_t *fs;
+
+  /* Thread-specific TXN root. */
+  svn_fs_root_t *root;
+
+  /* Directories to process (all files have the name /$dirNum/FILE_NAME/ */
+  int count;
+
+  /* Name of the file to find (one per directory). */
+  char file_name;
+
+  /* The thread object as used by the main thread for bookeeping. */
+  apr_thread_t *thread;
+
+  /* The error object produced by the thread (or SVN_NO_ERROR). */
+  svn_error_t *err;
+
+  /* Return values of apr_thread_join. */
+  apr_status_t retval1;
+  apr_status_t retval2;
+} parallel_put_baton_t;
+
+/* APR thread taking a parallel_put_baton_t to write the contents of all
+ * files that have a certain name throughout the repo. */
+static void * APR_THREAD_FUNC
+parallel_put_thread(apr_thread_t *tid, void *data)
+{
+  parallel_put_baton_t *baton = data;
+  apr_pool_t *iterpool = svn_pool_create(baton->pool);
+  svn_revnum_t base = svn_fs_txn_root_base_revision(baton->root);
+  int i;
+
+  for (i = 0; i < baton->count && !baton->err; ++i)
+    {
+      const char *path;
+      const char *text;
+
+      svn_pool_clear(iterpool);
+      path = apr_psprintf(iterpool, "%d/%c", i, baton->file_name);
+      text = apr_psprintf(iterpool, "text %ld %s", base, path);
+
+      baton->err = svn_error_trace(svn_test__set_file_contents(baton->root,
+                                                               path, text,
+                                                               iterpool));
+    }
+
+  svn_pool_destroy(iterpool);
+  apr_thread_exit(tid, 0);
+
+  return NULL;
+}
+
+/* Utility functionn adding an error to CHAIN if RETVAL is not APR_SUCCESS. */
+static svn_error_t *
+compose_apr_result(svn_error_t *chain,
+                   apr_status_t retval)
+{
+  if (retval == APR_SUCCESS)
+    return chain;
+
+  return svn_error_compose_create(chain, svn_error_wrap_apr(retval, " "));
+}
+
+/* Concurrently set new contents and props for all files under TXN_ROOT.
+ * There are COUNT directories with PROC_COUNT files each.
+ * Use POOL for allocations. */
+static svn_error_t *
+parallel_put_contents(svn_fs_root_t *txn_root,
+                      int count,
+                      int proc_count,
+                      apr_pool_t *pool)
+{
+  svn_error_t *err = NULL;
+  int i;
+  char c;
+  svn_fs_t *fs = svn_fs_root_fs(txn_root);
+  apr_hash_t *config = svn_fs_config(fs, pool);
+  const char *txn_name = svn_fs_txn_root_name(txn_root, pool);
+  const char *repo_name = svn_fs_path(fs, pool);
+  svn_revnum_t base = svn_fs_txn_root_base_revision(txn_root);
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  parallel_put_baton_t *params = apr_pcalloc(pool,
+                                             sizeof(*params) * proc_count);
+
+  /* Prepare the parameters for PROC_COUNT threads. */
+  for (i = 0; i < proc_count; ++i)
+    {
+      svn_fs_txn_t *txn;
+
+      SVN_ERR(svn_fs_open(&params[i].fs, repo_name, config, pool));
+      SVN_ERR(svn_fs_open_txn(&txn, params[i].fs, txn_name, pool));
+      SVN_ERR(svn_fs_txn_root(&params[i].root, txn, pool));
+
+      params[i].pool = pool;
+      params[i].count = count;
+      params[i].file_name = 'A' + i;
+      params[i].err = NULL;
+    }
+
+  /* Start the threads. They will set the text contents of every file
+   * in every directory.  One thread per file name, covering all directories
+   * having 1 file of that name in it. */
+  for (i = 0; i < proc_count; ++i)
+    {
+      apr_status_t retval = apr_thread_create(&params[i].thread, NULL,
+                                              parallel_put_thread,
+                                              &params[i], pool);
+      err = compose_apr_result(err, retval);
+    }
+
+  /* Main thread: modify properties of all files (concurrently to the text
+   * modifications going on in the background threads). */
+  for (i = 0; i < count; ++i)
+    for (c = 'A'; c < 'A' + proc_count && !err; ++c)
+      {
+        svn_error_t *local_err;
+        const char *path;
+        svn_string_t *value;
+        svn_pool_clear(iterpool);
+
+        path = apr_psprintf(iterpool, "%d/%c", i, c);
+        value = svn_string_createf(iterpool, "prop %ld %s", base, path);
+        local_err = svn_error_trace(svn_fs_change_node_prop(txn_root, path,
+                                                            "userprop",
+                                                            value, iterpool));
+
+        err = svn_error_compose_create(err, local_err);
+      }
+
+  /* Wait for all threads to end.  If we were to SVN_ERR out here, we might
+   * see pool destruction racing with thread completion etc. */
+  for (i = 0; i < proc_count; ++i)
+    params[i].retval1 = apr_thread_join(&params[i].retval2, params[i].thread);
+
+  svn_pool_destroy(iterpool);
+
+  /* Collect all errors. */
+  for (i = 0; i < proc_count; ++i)
+    {
+      err = compose_apr_result(err, params[i].retval1);
+      err = compose_apr_result(err, params[i].retval2);
+      err = svn_error_compose_create(err, params[i].err);
+    }
+
+  /* Show them. */
+  SVN_ERR(err);
+
+  return SVN_NO_ERROR;
+}
+
+/* Verify that all data in FS's HEAD is correct.  HEAD has been created
+ * based on BASE with COUNT directories, PROC_COUNT files each.
+ * Use POOL for temporary allocations. */
+static svn_error_t *
+verify_contents(svn_fs_t *fs,
+                svn_revnum_t base,
+                int count,
+                int proc_count,
+                apr_pool_t *pool)
+{
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  int i;
+  char c;
+  svn_fs_root_t *root;
+
+  SVN_ERR(svn_fs_revision_root(&root, fs, base + 1, pool));
+  for (i = 0; i < count; ++i)
+    for (c = 'A'; c < 'A' + proc_count; ++c)
+      {
+        svn_string_t *prop;
+        svn_stringbuf_t *text;
+        const char *expected_prop;
+        const char *expected_text;
+        const char *path;
+        svn_pool_clear(iterpool);
+
+        path = apr_psprintf(iterpool, "%d/%c", i, c);
+        expected_prop = apr_psprintf(iterpool, "prop %ld %s", base, path);
+        expected_text = apr_psprintf(iterpool, "text %ld %s", base, path);
+
+        SVN_ERR(svn_fs_node_prop(&prop, root, path, "userprop", iterpool));
+        SVN_TEST_ASSERT(prop);
+        SVN_TEST_STRING_ASSERT(prop->data, expected_prop);
+
+        SVN_ERR(svn_test__get_file_contents(root, path, &text, iterpool));
+        SVN_TEST_STRING_ASSERT(text->data, expected_text);
+      }
+
+  return SVN_NO_ERROR;
+}
+
+static svn_error_t *
+test_parallel_put(const svn_test_opts_t *opts,
+                  apr_pool_t *pool)
+{
+  apr_hash_t *config;
+  svn_fs_t *fs;
+  svn_fs_txn_t *txn;
+  svn_fs_root_t *txn_root;
+  svn_revnum_t new_rev = 0;
+  apr_pool_t *iterpool = svn_pool_create(pool);
+  const char *repo_name = "test-repo-parallel-put";
+  int i;
+  const int COUNT = 100, PROC_COUNT = 4;
+  char c;
+
+  /* Create a new repo. */
+  config = apr_hash_make(pool);
+  svn_hash_sets(config, SVN_FS_CONFIG_CONCURRENT_WRITES, "1");
+  SVN_ERR(svn_test__create_fs2(&fs, repo_name, opts, config, pool));
+
+  /* Bail (with success) on known-untestable scenarios */
+  if (!svn_fs_supports_concurrent_writes(fs, pool))
+    return svn_error_create(SVN_ERR_TEST_SKIPPED, NULL,
+                            "concurrent write is not supported by FS");
+
+  /* r1: Create a highly regular tree.  Having the files deeper in the tree
+   * means we can check whether concurrent tree updates lose DAG / directory
+   * updates.
+   *
+   * Set the initial contents concurrently.
+   */
+  SVN_ERR(svn_fs_begin_txn(&txn, fs, new_rev, pool));
+  SVN_ERR(svn_fs_txn_root(&txn_root, txn, pool));
+
+  for (i = 0; i < COUNT; ++i)
+    {
+      svn_pool_clear(iterpool);
+      SVN_ERR(svn_fs_make_dir(txn_root, apr_itoa(iterpool, i), iterpool));
+
+      for (c = 'A'; c < 'A' + PROC_COUNT; ++c)
+        SVN_ERR(svn_fs_make_file(txn_root,
+                                 apr_psprintf(iterpool, "%d/%c", i, c),
+                                 iterpool));
+    }
+
+  SVN_ERR(parallel_put_contents(txn_root, COUNT, PROC_COUNT, pool));
+  SVN_ERR(test_commit_txn(&new_rev, txn, NULL, pool));
+
+  /* All contents as expected? */
+  SVN_ERR(verify_contents(fs, 0, COUNT, PROC_COUNT, pool));
+
+  /* r2: Write new contents. */
+  SVN_ERR(svn_fs_begin_txn(&txn, fs, new_rev, pool));
+  SVN_ERR(svn_fs_txn_root(&txn_root, txn, pool));
+  SVN_ERR(parallel_put_contents(txn_root, COUNT, PROC_COUNT, pool));
+  SVN_ERR(test_commit_txn(&new_rev, txn, NULL, pool));
+
+  /* All contents as expected? */
+  SVN_ERR(verify_contents(fs, 1, COUNT, PROC_COUNT, pool));
+
+  /* Done. */
+  svn_pool_destroy(iterpool);
+  return SVN_NO_ERROR;
+}
+
+#else /* No threading: */
+
+static svn_error_t *
+test_parallel_put(const svn_test_opts_t *opts,
+                  apr_pool_t *pool)
+{
+  return svn_error_create(SVN_ERR_TEST_SKIPPED, NULL, "no thread support");
+}
+
+#endif /* APR_HAS_THREADS */
+
 /* ------------------------------------------------------------------------ */
 
 /* The test table.  */
@@ -7277,6 +7553,8 @@ static struct svn_test_descriptor_t test
                        "freeze and commit"),
     SVN_TEST_OPTS_PASS(test_concurrent_txn_write,
                        "test concurrent write to txn"),
+    SVN_TEST_OPTS_PASS(test_parallel_put,
+                       "parallel put"),
     SVN_TEST_NULL
   };