You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/08/04 19:05:24 UTC

svn commit: r982330 - in /couchdb/trunk: etc/couchdb/ share/ share/www/script/ share/www/script/test/ src/couchdb/

Author: fdmanana
Date: Wed Aug  4 17:05:22 2010
New Revision: 982330

URL: http://svn.apache.org/viewvc?rev=982330&view=rev
Log:
Add replicator DB (_replicator).
Part of ticket COUCHDB-776.


Added:
    couchdb/trunk/share/www/script/test/replicator_db.js
    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
Modified:
    couchdb/trunk/etc/couchdb/default.ini.tpl.in
    couchdb/trunk/share/Makefile.am
    couchdb/trunk/share/www/script/couch_tests.js
    couchdb/trunk/src/couchdb/Makefile.am
    couchdb/trunk/src/couchdb/couch_changes.erl
    couchdb/trunk/src/couchdb/couch_db.hrl
    couchdb/trunk/src/couchdb/couch_js_functions.hrl
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_server.erl

Modified: couchdb/trunk/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/trunk/etc/couchdb/default.ini.tpl.in?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/trunk/etc/couchdb/default.ini.tpl.in Wed Aug  4 17:05:22 2010
@@ -62,6 +62,7 @@ stats_aggregator={couch_stats_aggregator
 stats_collector={couch_stats_collector, start, []}
 uuids={couch_uuids, start, []}
 auth_cache={couch_auth_cache, start_link, []}
+rep_db_changes_listener={couch_rep_db_listener, start_link, []}
 
 [httpd_global_handlers]
 / = {couch_httpd_misc_handlers, handle_welcome_req, <<"Welcome">>}
@@ -123,5 +124,6 @@ compression_level = 8 ; from 1 (lowest, 
 compressible_types = text/*, application/javascript, application/json,  application/xml
 
 [replicator]
+db = _replicator
 max_http_sessions = 10
-max_http_pipeline_size = 10
\ No newline at end of file
+max_http_pipeline_size = 10

Modified: couchdb/trunk/share/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/Makefile.am?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/share/Makefile.am (original)
+++ couchdb/trunk/share/Makefile.am Wed Aug  4 17:05:22 2010
@@ -152,6 +152,7 @@ nobase_dist_localdata_DATA = \
     www/script/test/reduce_false.js \
     www/script/test/reduce_false_temp.js \
     www/script/test/replication.js \
+    www/script/test/replicator_db.js \
     www/script/test/rev_stemming.js \
     www/script/test/rewrite.js \
     www/script/test/security_validation.js \

Modified: couchdb/trunk/share/www/script/couch_tests.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/couch_tests.js?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/couch_tests.js [utf-8] (original)
+++ couchdb/trunk/share/www/script/couch_tests.js [utf-8] Wed Aug  4 17:05:22 2010
@@ -75,6 +75,7 @@ loadTest("reduce_builtin.js");
 loadTest("reduce_false.js");
 loadTest("reduce_false_temp.js");
 loadTest("replication.js");
+loadTest("replicator_db.js");
 loadTest("rev_stemming.js");
 loadTest("rewrite.js");
 loadTest("security_validation.js");

Added: couchdb/trunk/share/www/script/test/replicator_db.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/test/replicator_db.js?rev=982330&view=auto
==============================================================================
--- couchdb/trunk/share/www/script/test/replicator_db.js (added)
+++ couchdb/trunk/share/www/script/test/replicator_db.js Wed Aug  4 17:05:22 2010
@@ -0,0 +1,750 @@
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not
+// use this file except in compliance with the License. You may obtain a copy of
+// the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations under
+// the License.
+
+couchTests.replicator_db = function(debug) {
+
+  if (debug) debugger;
+
+  var wait_rep_doc = 500; // number of millisecs to wait after saving a Rep Doc
+  var host = CouchDB.host;
+  var dbA = new CouchDB("test_suite_rep_db_a", {"X-Couch-Full-Commit":"false"});
+  var dbB = new CouchDB("test_suite_rep_db_b", {"X-Couch-Full-Commit":"false"});
+  var repDb = new CouchDB("test_suite_rep_db", {"X-Couch-Full-Commit":"false"});
+  var usersDb = new CouchDB("test_suite_auth", {"X-Couch-Full-Commit":"false"});
+
+  var docs1 = [
+    {
+      _id: "foo1",
+      value: 11
+    },
+    {
+      _id: "foo2",
+      value: 22
+    },
+    {
+      _id: "foo3",
+      value: 33
+    }
+  ];
+
+  function waitForRep(repDb, repDoc, state) {
+    var newRep,
+        t0 = new Date(),
+        t1,
+        ms = 1000;
+
+    do {
+      newRep = repDb.open(repDoc._id);
+      t1 = new Date();
+    } while (((t1 - t0) <= ms) && newRep.state !== state);
+  }
+
+  function waitForSeq(sourceDb, targetDb) {
+    var targetSeq,
+        sourceSeq = sourceDb.info().update_seq,
+        t0 = new Date(),
+        t1,
+        ms = 1000;
+
+    do {
+      targetSeq = targetDb.info().update_seq;
+      t1 = new Date();
+    } while (((t1 - t0) <= ms) && targetSeq < sourceSeq);
+  }
+
+  function wait(ms) {
+    var t0 = new Date(), t1;
+    do {
+      CouchDB.request("GET", "/");
+      t1 = new Date();
+    } while ((t1 - t0) <= ms);
+  }
+
+
+  function populate_db(db, docs) {
+    db.deleteDb();
+    db.createDb();
+    for (var i = 0; i < docs.length; i++) {
+      var d = docs[i];
+      delete d._rev;
+      T(db.save(d).ok);
+    }
+  }
+
+  function simple_replication() {
+    populate_db(dbA, docs1);
+    populate_db(dbB, []);
+
+    var repDoc = {
+      _id: "foo_simple_rep",
+      source: dbA.name,
+      target: dbB.name
+    };
+    T(repDb.save(repDoc).ok);
+
+    waitForRep(repDb, repDoc, "completed");
+    for (var i = 0; i < docs1.length; i++) {
+      var doc = docs1[i];
+      var copy = dbB.open(doc._id);
+      T(copy !== null);
+      T(copy.value === doc.value);
+    }
+
+    var repDoc1 = repDb.open(repDoc._id);
+    T(repDoc1 !== null);
+    T(repDoc1.source === repDoc.source);
+    T(repDoc1.target === repDoc.target);
+    T(repDoc1.state === "completed", "simple");
+    T(typeof repDoc1.replication_id  === "string");
+  }
+
+
+  function filtered_replication() {
+    var docs2 = docs1.concat([
+      {
+        _id: "_design/mydesign",
+        language : "javascript",
+        filters : {
+          myfilter : (function(doc, req) {
+            return (doc.value % 2) !== Number(req.query.myparam);
+          }).toString()
+        }
+      }
+    ]);
+
+    populate_db(dbA, docs2);
+    populate_db(dbB, []);
+
+    var repDoc = {
+      _id: "foo_filt_rep_doc",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name,
+      filter: "mydesign/myfilter",
+      query_params: {
+        myparam: 1
+      }
+    };
+    T(repDb.save(repDoc).ok);
+
+    waitForRep(repDb, repDoc, "completed");
+    for (var i = 0; i < docs2.length; i++) {
+      var doc = docs2[i];
+      var copy = dbB.open(doc._id);
+
+      if (typeof doc.value === "number") {
+        if ((doc.value % 2) !== 1) {
+          T(copy !== null);
+          T(copy.value === doc.value);
+        } else {
+          T(copy === null);
+        }
+      }
+    }
+
+    var repDoc1 = repDb.open(repDoc._id);
+    T(repDoc1 !== null);
+    T(repDoc1.source === repDoc.source);
+    T(repDoc1.target === repDoc.target);
+    T(repDoc1.state === "completed", "filtered");
+    T(typeof repDoc1.replication_id  === "string");
+  }
+
+
+  function continuous_replication() {
+    populate_db(dbA, docs1);
+    populate_db(dbB, []);
+
+    var repDoc = {
+      _id: "foo_cont_rep_doc",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name,
+      continuous: true
+    };
+
+    T(repDb.save(repDoc).ok);
+
+    waitForSeq(dbA, dbB);
+    for (var i = 0; i < docs1.length; i++) {
+      var doc = docs1[i];
+      var copy = dbB.open(doc._id);
+      T(copy !== null);
+      T(copy.value === doc.value);
+    }
+
+    // add another doc to source, it will be replicated to target
+    var docX = {
+      _id: "foo1000",
+      value: 1001
+    };
+
+    T(dbA.save(docX).ok);
+
+    waitForSeq(dbA, dbB);
+    var copy = dbB.open("foo1000");
+    T(copy !== null);
+    T(copy.value === 1001);
+
+    var repDoc1 = repDb.open(repDoc._id);
+    T(repDoc1 !== null);
+    T(repDoc1.source === repDoc.source);
+    T(repDoc1.target === repDoc.target);
+    T(repDoc1.state === "triggered");
+    T(typeof repDoc1.replication_id  === "string");
+
+    // stop replication by deleting the replication document
+    T(repDb.deleteDoc(repDoc1).ok);
+
+    // add another doc to source, it will NOT be replicated to target
+    var docY = {
+      _id: "foo666",
+      value: 999
+    };
+
+    T(dbA.save(docY).ok);
+
+    wait(200); // is there a way to avoid wait here?
+    var copy = dbB.open("foo666");
+    T(copy === null);
+  }
+
+
+  function by_doc_ids_replication() {
+    // to test that we can replicate docs with slashes in their IDs
+    var docs2 = docs1.concat([
+      {
+        _id: "_design/mydesign",
+        language : "javascript"
+      }
+    ]);
+
+    populate_db(dbA, docs2);
+    populate_db(dbB, []);
+
+    var repDoc = {
+      _id: "foo_cont_rep_doc",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name,
+      doc_ids: ["foo666", "foo3", "_design/mydesign", "foo999", "foo1"]
+    };
+    T(repDb.save(repDoc).ok);
+
+    waitForRep(repDb, repDoc, "completed");
+    var copy = dbB.open("foo1");
+    T(copy !== null);
+    T(copy.value === 11);
+
+    copy = dbB.open("foo2");
+    T(copy === null);
+
+    copy = dbB.open("foo3");
+    T(copy !== null);
+    T(copy.value === 33);
+
+    copy = dbB.open("foo666");
+    T(copy === null);
+
+    copy = dbB.open("foo999");
+    T(copy === null);
+
+    copy = dbB.open("_design/mydesign");
+    T(copy !== null);
+    T(copy.language === "javascript");
+  }
+
+
+  function successive_identical_replications() {
+    populate_db(dbA, docs1);
+    populate_db(dbB, []);
+
+    var repDoc1 = {
+      _id: "foo_ident_rep_1",
+      source: dbA.name,
+      target: dbB.name
+    };
+    T(repDb.save(repDoc1).ok);
+
+    waitForRep(repDb, repDoc1, "completed");
+    for (var i = 0; i < docs1.length; i++) {
+      var doc = docs1[i];
+      var copy = dbB.open(doc._id);
+      T(copy !== null);
+      T(copy.value === doc.value);
+    }
+
+    var repDoc1_copy = repDb.open(repDoc1._id);
+    T(repDoc1_copy !== null);
+    T(repDoc1_copy.source === repDoc1.source);
+    T(repDoc1_copy.target === repDoc1.target);
+    T(repDoc1_copy.state === "completed");
+    T(typeof repDoc1_copy.replication_id  === "string");
+
+    var newDoc = {
+      _id: "doc666",
+      value: 666
+    };
+    T(dbA.save(newDoc).ok);
+
+    wait(200);
+    var newDoc_copy = dbB.open(newDoc._id);
+    // not replicated because first replication is complete (not continuous)
+    T(newDoc_copy === null);
+
+    var repDoc2 = {
+      _id: "foo_ident_rep_2",
+      source: dbA.name,
+      target: dbB.name
+    };
+    T(repDb.save(repDoc2).ok);
+
+    waitForRep(repDb, repDoc2, "completed");
+    var newDoc_copy = dbB.open(newDoc._id);
+    T(newDoc_copy !== null);
+    T(newDoc_copy.value === newDoc.value);
+
+    var repDoc2_copy = repDb.open(repDoc2._id);
+    T(repDoc2_copy !== null);
+    T(repDoc2_copy.source === repDoc1.source);
+    T(repDoc2_copy.target === repDoc1.target);
+    T(repDoc2_copy.state === "completed");
+    T(typeof repDoc2_copy.replication_id  === "string");
+    T(repDoc2_copy.replication_id  === repDoc1_copy.replication_id);
+  }
+
+
+  // test the case where multiple replication docs (different IDs)
+  // describe in fact the same replication (source, target, etc)
+  function identical_rep_docs() {
+    populate_db(dbA, docs1);
+    populate_db(dbB, []);
+
+    var repDoc1 = {
+      _id: "foo_dup_rep_doc_1",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name
+    };
+    var repDoc2 = {
+      _id: "foo_dup_rep_doc_2",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name
+    };
+
+    T(repDb.save(repDoc1).ok);
+    T(repDb.save(repDoc2).ok);
+
+    waitForRep(repDb, repDoc1, "completed");
+    for (var i = 0; i < docs1.length; i++) {
+      var doc = docs1[i];
+      var copy = dbB.open(doc._id);
+      T(copy !== null);
+      T(copy.value === doc.value);
+    }
+
+    repDoc1 = repDb.open("foo_dup_rep_doc_1");
+    T(repDoc1 !== null);
+    T(repDoc1.state === "completed", "identical");
+    T(typeof repDoc1.replication_id  === "string");
+
+    repDoc2 = repDb.open("foo_dup_rep_doc_2");
+    T(repDoc2 !== null);
+    T(typeof repDoc2.state === "undefined");
+    T(repDoc2.replication_id === repDoc1.replication_id);
+  }
+
+
+  // test the case where multiple replication docs (different IDs)
+  // describe in fact the same continuous replication (source, target, etc)
+  function identical_continuous_rep_docs() {
+    populate_db(dbA, docs1);
+    populate_db(dbB, []);
+
+    var repDoc1 = {
+      _id: "foo_dup_cont_rep_doc_1",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name,
+      continuous: true
+    };
+    var repDoc2 = {
+      _id: "foo_dup_cont_rep_doc_2",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name,
+      continuous: true
+    };
+
+    T(repDb.save(repDoc1).ok);
+    T(repDb.save(repDoc2).ok);
+
+    waitForSeq(dbA, dbB);
+    for (var i = 0; i < docs1.length; i++) {
+      var doc = docs1[i];
+      var copy = dbB.open(doc._id);
+      T(copy !== null);
+      T(copy.value === doc.value);
+    }
+
+    repDoc1 = repDb.open("foo_dup_cont_rep_doc_1");
+    T(repDoc1 !== null);
+    T(repDoc1.state === "triggered");
+    T(typeof repDoc1.replication_id  === "string");
+
+    repDoc2 = repDb.open("foo_dup_cont_rep_doc_2");
+    T(repDoc2 !== null);
+    T(typeof repDoc2.state === "undefined");
+    T(repDoc2.replication_id === repDoc1.replication_id);
+
+    var newDoc = {
+      _id: "foo666",
+      value: 999
+    };
+    T(dbA.save(newDoc).ok);
+
+    waitForSeq(dbA, dbB);
+    var copy = dbB.open("foo666");
+    T(copy !== null);
+    T(copy.value === 999);
+
+    // deleting second replication doc, doesn't affect the 1st one and
+    // neither it stops the replication
+    T(repDb.deleteDoc(repDoc2).ok);
+    repDoc1 = repDb.open("foo_dup_cont_rep_doc_1");
+    T(repDoc1 !== null);
+    T(repDoc1.state === "triggered");
+
+    var newDoc2 = {
+        _id: "foo5000",
+        value: 5000
+    };
+    T(dbA.save(newDoc2).ok);
+
+    waitForSeq(dbA, dbB);
+    var copy = dbB.open("foo5000");
+    T(copy !== null);
+    T(copy.value === 5000);
+
+    // deleting the 1st replication document stops the replication
+    T(repDb.deleteDoc(repDoc1).ok);
+    var newDoc3 = {
+        _id: "foo1983",
+        value: 1983
+    };
+    T(dbA.save(newDoc3).ok);
+
+    wait(wait_rep_doc); //how to remove wait?
+    var copy = dbB.open("foo1983");
+    T(copy === null);
+  }
+
+
+  function rep_db_write_authorization() {
+    populate_db(dbA, docs1);
+    populate_db(dbB, []);
+
+    var server_admins_config = [
+      {
+        section: "admins",
+        key: "fdmanana",
+        value: "qwerty"
+      }
+    ];
+
+    run_on_modified_server(server_admins_config, function() {
+      var repDoc = {
+        _id: "foo_rep_doc",
+        source: dbA.name,
+        target: dbB.name
+      };
+
+      try {
+        repDb.save(repDoc);
+        T(false && "Should have thrown an exception");
+      } catch (x) {
+        T(x["error"] === "forbidden");
+      }
+
+      T(CouchDB.login("fdmanana", "qwerty").ok);
+      T(CouchDB.session().userCtx.name === "fdmanana");
+      T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1);
+
+      T(repDb.save(repDoc).ok);
+
+      waitForRep(repDb, repDoc, "completed");
+      for (var i = 0; i < docs1.length; i++) {
+        var doc = docs1[i];
+        var copy = dbB.open(doc._id);
+        T(copy !== null);
+        T(copy.value === doc.value);
+      }
+
+      repDoc = repDb.open("foo_rep_doc");
+      T(repDoc !== null);
+
+      repDoc.target = "test_suite_foo_db";
+      repDoc.create_target = true;
+
+      // Only the replicator can update replication documents.
+      // Admins can only add and delete replication documents.
+      try {
+        repDb.save(repDoc);
+        T(false && "Should have thrown an exception");
+      } catch (x) {
+        T(x["error"] === "forbidden");
+      }
+    });
+  }
+
+
+  function test_replication_credentials_delegation() {
+    populate_db(usersDb, []);
+
+    var joeUserDoc = CouchDB.prepareUserDoc({
+      name: "joe",
+      roles: ["god", "erlanger"]
+    }, "erly");
+    T(usersDb.save(joeUserDoc).ok);
+
+    var ddoc = {
+      _id: "_design/beer",
+      language: "javascript"
+    };
+    populate_db(dbA, docs1.concat([ddoc]));
+    populate_db(dbB, []);
+
+    T(dbB.setSecObj({
+      admins: {
+        names: [],
+        roles: ["god"]
+      }
+    }).ok);
+
+    var server_admins_config = [
+      {
+        section: "admins",
+        key: "fdmanana",
+        value: "qwerty"
+      }
+    ];
+
+    run_on_modified_server(server_admins_config, function() {
+
+      T(CouchDB.login("fdmanana", "qwerty").ok);
+      T(CouchDB.session().userCtx.name === "fdmanana");
+      T(CouchDB.session().userCtx.roles.indexOf("_admin") !== -1);
+
+      var repDoc = {
+        _id: "foo_rep_del_doc_1",
+        source: dbA.name,
+        target: dbB.name,
+        user_ctx: {
+          name: "joe",
+          roles: ["erlanger"]
+        }
+      };
+
+      T(repDb.save(repDoc).ok);
+
+      waitForRep(repDb, repDoc, "completed");
+      for (var i = 0; i < docs1.length; i++) {
+        var doc = docs1[i];
+        var copy = dbB.open(doc._id);
+        T(copy !== null);
+        T(copy.value === doc.value);
+      }
+
+      // design doc was not replicated, because joe is not an admin of db B
+      var doc = dbB.open(ddoc._id);
+      T(doc === null);
+
+      // now test the same replication but putting the role "god" in the
+      // delegation user context property
+      var repDoc2 = {
+        _id: "foo_rep_del_doc_2",
+        source: dbA.name,
+        target: dbB.name,
+        user_ctx: {
+          name: "joe",
+          roles: ["erlanger", "god"]
+        }
+      };
+      T(repDb.save(repDoc2).ok);
+
+      waitForRep(repDb, repDoc2, "completed");
+      for (var i = 0; i < docs1.length; i++) {
+        var doc = docs1[i];
+        var copy = dbB.open(doc._id);
+        T(copy !== null);
+        T(copy.value === doc.value);
+      }
+
+      // because anyone with a 'god' role is an admin of db B, a replication
+      // that is delegated to a 'god' role can write design docs to db B
+      doc = dbB.open(ddoc._id);
+      T(doc !== null);
+      T(doc.language === ddoc.language);
+    });
+  }
+
+
+  function continuous_replication_survives_restart() {
+    var origRepDbName = CouchDB.request(
+      "GET", "/_config/replicator/db").responseText;
+
+    repDb.deleteDb();
+
+    var xhr = CouchDB.request("PUT", "/_config/replicator/db", {
+      body : JSON.stringify(repDb.name),
+      headers: {"X-Couch-Persist": "false"}
+    });
+    T(xhr.status === 200);
+
+    populate_db(dbA, docs1);
+    populate_db(dbB, []);
+
+    var repDoc = {
+      _id: "foo_cont_rep_survives_doc",
+      source: "http://" + host + "/" + dbA.name,
+      target: dbB.name,
+      continuous: true
+    };
+
+    T(repDb.save(repDoc).ok);
+
+    waitForSeq(dbA, dbB);
+    for (var i = 0; i < docs1.length; i++) {
+      var doc = docs1[i];
+      var copy = dbB.open(doc._id);
+      T(copy !== null);
+      T(copy.value === doc.value);
+    }
+
+    repDb.ensureFullCommit();
+    dbA.ensureFullCommit();
+
+    restartServer();
+
+    xhr = CouchDB.request("PUT", "/_config/replicator/db", {
+      body : JSON.stringify(repDb.name),
+      headers: {"X-Couch-Persist": "false"}
+    });
+
+    T(xhr.status === 200);
+
+    // add another doc to source, it will be replicated to target
+    var docX = {
+      _id: "foo1000",
+      value: 1001
+    };
+
+    T(dbA.save(docX).ok);
+
+    waitForSeq(dbA, dbB);
+    var copy = dbB.open("foo1000");
+    T(copy !== null);
+    T(copy.value === 1001);
+
+    repDoc = repDb.open("foo_cont_rep_survives_doc");
+    T(repDoc !== null);
+    T(repDoc.continuous === true);
+
+    // stop replication
+    T(repDb.deleteDoc(repDoc).ok);
+
+    xhr = CouchDB.request("PUT", "/_config/replicator/db", {
+      body : origRepDbName,
+      headers: {"X-Couch-Persist": "false"}
+    });
+    T(xhr.status === 200);
+  }
+
+
+  function error_state_replication() {
+    populate_db(dbA, docs1);
+
+    var repDoc = {
+      _id: "foo_error_rep",
+      source: dbA.name,
+      target: "nonexistent_test_db"
+    };
+    T(repDb.save(repDoc).ok);
+
+    waitForRep(repDb, repDoc, "error");
+    var repDoc1 = repDb.open(repDoc._id);
+    T(repDoc1 !== null);
+    T(repDoc1.state === "error");
+    T(typeof repDoc1.replication_id  === "string");
+  }
+
+
+  // run all the tests
+  var server_config = [
+    {
+      section: "replicator",
+      key: "db",
+      value: repDb.name
+    }
+  ];
+
+  repDb.deleteDb();
+  run_on_modified_server(server_config, simple_replication);
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, filtered_replication);
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, continuous_replication);
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, by_doc_ids_replication);
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, successive_identical_replications);
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, identical_rep_docs);
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, identical_continuous_rep_docs);
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, rep_db_write_authorization);
+
+  var server_config_2 = server_config.concat([
+    {
+      section: "couch_httpd_auth",
+      key: "authentication_db",
+      value: usersDb.name
+    }
+  ]);
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config_2, test_replication_credentials_delegation);
+
+  repDb.deleteDb();
+  restartServer();
+  continuous_replication_survives_restart();
+
+  repDb.deleteDb();
+  restartServer();
+  run_on_modified_server(server_config, error_state_replication);
+
+
+  // cleanup
+  repDb.deleteDb();
+  usersDb.deleteDb();
+  dbA.deleteDb();
+  dbB.deleteDb();
+};

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Wed Aug  4 17:05:22 2010
@@ -66,6 +66,7 @@ source_files = \
     couch_rep_reader.erl \
     couch_rep_sup.erl \
     couch_rep_writer.erl \
+    couch_rep_db_listener.erl \
     couch_server.erl \
     couch_server_sup.erl \
     couch_stats_aggregator.erl \
@@ -124,6 +125,7 @@ compiled_files = \
     couch_rep_reader.beam \
     couch_rep_sup.beam \
     couch_rep_writer.beam \
+    couch_rep_db_listener.beam \
     couch_server.beam \
     couch_server_sup.beam \
     couch_stats_aggregator.beam \

Modified: couchdb/trunk/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_changes.erl?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_changes.erl (original)
+++ couchdb/trunk/src/couchdb/couch_changes.erl Wed Aug  4 17:05:22 2010
@@ -165,7 +165,8 @@ keep_sending_changes(Args, Callback, Db,
     TimeoutFun) ->
     #changes_args{
         feed = ResponseType,
-        limit = Limit
+        limit = Limit,
+        db_open_options = DbOptions
     } = Args,
     % ?LOG_INFO("send_changes start ~p",[StartSeq]),
     {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(
@@ -179,7 +180,8 @@ keep_sending_changes(Args, Callback, Db,
         case wait_db_updated(Timeout, TimeoutFun) of
         updated ->
             % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]),
-            case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of
+            DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
+            case couch_db:open(Db#db.name, DbOptions1) of
             {ok, Db2} ->
                 keep_sending_changes(
                     Args#changes_args{limit=NewLimit},

Modified: couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.hrl?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_db.hrl Wed Aug  4 17:05:22 2010
@@ -289,6 +289,7 @@
     heartbeat,
     timeout,
     filter = "",
-    include_docs = false
+    include_docs = false,
+    db_open_options = []
 }).
 

Modified: couchdb/trunk/src/couchdb/couch_js_functions.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_js_functions.hrl?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_js_functions.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_js_functions.hrl Wed Aug  4 17:05:22 2010
@@ -95,3 +95,76 @@
         }
     }
 ">>).
+
+
+-define(REP_DB_DOC_VALIDATE_FUN, <<"
+    function(newDoc, oldDoc, userCtx) {
+        var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
+        var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
+
+        if (oldDoc && !newDoc._deleted && !isReplicator) {
+            throw({forbidden:
+                'Only the replicator can edit replication documents. ' +
+                'Admins can only add and delete replication documents.'
+            });
+        } else if (!isAdmin) {
+            throw({forbidden:
+                'Only admins may add/delete replication documents.'
+            });
+        }
+
+        if (!oldDoc && newDoc.state) {
+            throw({forbidden:
+                'The state field can only be set by the replicator.'
+            });
+        }
+
+        if (!oldDoc && newDoc.replication_id) {
+            throw({forbidden:
+                'The replication_id field can only be set by the replicator.'
+            });
+        }
+
+        if (newDoc.user_ctx) {
+            var user_ctx = newDoc.user_ctx;
+
+            if (typeof user_ctx !== 'object') {
+                throw({forbidden: 'The user_ctx property must be an object.'});
+            }
+
+            if (!(user_ctx.name === null ||
+                    (typeof user_ctx.name === 'undefined') ||
+                    ((typeof user_ctx.name === 'string') &&
+                        user_ctx.name.length > 0))) {
+                throw({forbidden:
+                    'The name property of the user_ctx must be a ' +
+                    'non-empty string.'
+                });
+            }
+
+            if ((typeof user_ctx.roles !== 'undefined') &&
+                    (typeof user_ctx.roles.length !== 'number')) {
+                throw({forbidden:
+                    'The roles property of the user_ctx must be ' +
+                    'an array of strings.'
+                });
+            }
+
+            if (user_ctx.roles) {
+                for (var i = 0; i < user_ctx.roles.length; i++) {
+                    var role = user_ctx.roles[i];
+
+                    if (typeof role !== 'string' || role.length === 0) {
+                        throw({forbidden: 'Roles must be non-empty strings.'});
+                    }
+                    if (role[0] === '_') {
+                        throw({forbidden:
+                            'System roles (starting with underscore) ' +
+                            'are not allowed.'
+                        });
+                    }
+                }
+            }
+        }
+    }
+">>).

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Wed Aug  4 17:05:22 2010
@@ -16,8 +16,12 @@
     code_change/3]).
 
 -export([replicate/2, checkpoint/1]).
+-export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([start_replication/3, end_replication/1, get_result/4]).
+-export([update_rep_doc/2]).
 
 -include("couch_db.hrl").
+-include("couch_js_functions.hrl").
 
 -define(REP_ID_VERSION, 2).
 
@@ -48,7 +52,8 @@
     committed_seq = 0,
 
     stats = nil,
-    doc_ids = nil
+    doc_ids = nil,
+    rep_doc = nil
 }).
 
 %% convenience function to do a simple replication from the shell
@@ -61,58 +66,63 @@ replicate(Source, Target) when is_binary
 
 %% function handling POST to _replicate
 replicate({Props}=PostBody, UserCtx) ->
-    BaseId = make_replication_id({Props}, UserCtx, ?REP_ID_VERSION),
-    Extension = maybe_append_options(
-        [<<"continuous">>, <<"create_target">>], Props),
-    Replicator = {BaseId ++ Extension,
-        {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
+    RepId = make_replication_id(PostBody, UserCtx),
+    case couch_util:get_value(<<"cancel">>, Props, false) of
+    true ->
+        end_replication(RepId);
+    false ->
+        Server = start_replication(PostBody, RepId, UserCtx),
+        get_result(Server, RepId, PostBody, UserCtx)
+    end.
+
+end_replication({BaseId, Extension}) ->
+    RepId = BaseId ++ Extension,
+    case supervisor:terminate_child(couch_rep_sup, RepId) of
+    {error, not_found} = R ->
+        R;
+    ok ->
+        ok = supervisor:delete_child(couch_rep_sup, RepId),
+        {ok, {cancelled, ?l2b(BaseId)}}
+    end.
+
+start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+    Replicator = {
+        BaseId ++ Extension,
+        {gen_server, start_link,
+            [?MODULE, [BaseId, RepDoc, UserCtx], []]},
         temporary,
         1,
         worker,
         [?MODULE]
     },
-
-    case couch_util:get_value(<<"cancel">>, Props, false) of
-    true ->
- case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of
-        {error, not_found} ->
-     {error, not_found};
-        ok ->
-     ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension),
-            {ok, {cancelled, ?l2b(BaseId)}}
- end;
-    false ->
-        Server = start_replication_server(Replicator),
-
-        case couch_util:get_value(<<"continuous">>, Props, false) of
-        true ->
-            {ok, {continuous, ?l2b(BaseId)}};
-        false ->
-            get_result(Server, PostBody, UserCtx)
-        end
-    end.
+    start_replication_server(Replicator).
 
 checkpoint(Server) ->
     gen_server:cast(Server, do_checkpoint).
 
-get_result(Server, PostBody, UserCtx) ->
-    try gen_server:call(Server, get_result, infinity) of
-    retry -> replicate(PostBody, UserCtx);
-    Else -> Else
-    catch
-    exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} ->
-        %% oops, this replication just finished -- restart it.
-        replicate(PostBody, UserCtx);
-    exit:{normal, {gen_server, call, [Server, get_result , infinity]}} ->
-        %% we made the call during terminate
-        replicate(PostBody, UserCtx)
+get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
+    case couch_util:get_value(<<"continuous">>, Props, false) of
+    true ->
+        {ok, {continuous, ?l2b(BaseId)}};
+    false ->
+        try gen_server:call(Server, get_result, infinity) of
+        retry -> replicate(PostBody, UserCtx);
+        Else -> Else
+        catch
+        exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} ->
+            %% oops, this replication just finished -- restart it.
+            replicate(PostBody, UserCtx);
+        exit:{normal, {gen_server, call, [Server, get_result, infinity]}} ->
+            %% we made the call during terminate
+            replicate(PostBody, UserCtx)
+        end
     end.
 
 init(InitArgs) ->
     try do_init(InitArgs)
     catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end.
 
-do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
+do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
     process_flag(trap_exit, true),
 
     SourceProps = couch_util:get_value(<<"source">>, PostProps),
@@ -130,6 +140,8 @@ do_init([RepId, {PostProps}, UserCtx] = 
     SourceInfo = dbinfo(Source),
     TargetInfo = dbinfo(Target),
 
+    maybe_set_triggered(RepDoc, RepId),
+
     case DocIds of
     List when is_list(List) ->
         % Fast replication using only a list of doc IDs to replicate.
@@ -199,7 +211,8 @@ do_init([RepId, {PostProps}, UserCtx] = 
         rep_starttime = httpd_util:rfc1123_date(),
         src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
         tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
-        doc_ids = DocIds
+        doc_ids = DocIds,
+        rep_doc = RepDoc
     },
     {ok, State}.
 
@@ -256,29 +269,34 @@ handle_info({'EXIT', _Pid, Reason}, Stat
     {stop, Reason, State}.
 
 terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
-    do_terminate(State);
+    do_terminate(State),
+    update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
     
 terminate(normal, State) ->
     timer:cancel(State#state.checkpoint_scheduled),
-    do_terminate(do_checkpoint(State));
+    do_terminate(do_checkpoint(State)),
+    update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"completed">>}]);
 
-terminate(Reason, State) ->
-    #state{
-        listeners = Listeners,
-        source = Source,
-        target = Target,
-        stats = Stats
-    } = State,
+terminate(shutdown, #state{listeners = Listeners} = State) ->
+    % continuous replication stopped
+    [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
+    do_forced_terminate(State);
+
+terminate(Reason, #state{listeners = Listeners} = State) ->
     [gen_server:reply(L, {error, Reason}) || L <- Listeners],
-    ets:delete(Stats),
-    close_db(Target),
-    close_db(Source).
+    do_forced_terminate(State),
+    update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 % internal funs
 
+do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) ->
+    ets:delete(Stats),
+    close_db(Target),
+    close_db(Source).
+
 start_replication_server(Replicator) ->
     RepId = element(1, Replicator),
     case supervisor:start_child(couch_rep_sup, Replicator) of
@@ -449,7 +467,7 @@ has_session_id(SessionId, [{Props} | Res
         has_session_id(SessionId, Rest)
     end.
 
-maybe_append_options(Options, Props) ->
+maybe_append_options(Options, {Props}) ->
     lists:foldl(fun(Option, Acc) ->
         Acc ++
         case couch_util:get_value(Option, Props, false) of
@@ -460,6 +478,12 @@ maybe_append_options(Options, Props) ->
         end
     end, [], Options).
 
+make_replication_id(RepProps, UserCtx) ->
+    BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
+    Extension = maybe_append_options(
+                  [<<"continuous">>, <<"create_target">>], RepProps),
+    {BaseId, Extension}.
+
 % Versioned clauses for generating replication ids
 % If a change is made to how replications are identified
 % add a new clause and increase ?REP_ID_VERSION at the top
@@ -785,3 +809,79 @@ parse_proxy_params(ProxyUrl) ->
         true ->
             [{proxy_user, User}, {proxy_password, Passwd}]
         end.
+
+update_rep_doc({Props} = _RepDoc, KVs) ->
+    case couch_util:get_value(<<"_id">>, Props) of
+    undefined ->
+        % replication triggered by POSTing to _replicate/
+        ok;
+    RepDocId ->
+        % replication triggered by adding a Rep Doc to the replicator DB
+        {ok, RepDb} = ensure_rep_db_exists(),
+        case couch_db:open_doc(RepDb, RepDocId, []) of
+        {ok, LatestRepDoc} ->
+            update_rep_doc(RepDb, LatestRepDoc, KVs);
+        _ ->
+            ok
+        end,
+        couch_db:close(RepDb)
+    end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+    NewRepDocBody = lists:foldl(
+        fun({K, _V} = KV, Body) ->
+            lists:keystore(K, 1, Body, KV)
+        end,
+        RepDocBody,
+        KVs
+    ),
+    % might not succeed - when the replication doc is deleted right
+    % before this update (not an error)
+    couch_db:update_doc(
+        RepDb,
+        RepDoc#doc{body = {NewRepDocBody}},
+        []
+    ).
+
+maybe_set_triggered({RepProps} = RepDoc, RepId) ->
+    case couch_util:get_value(<<"state">>, RepProps) of
+    <<"triggered">> ->
+        ok;
+    _ ->
+        update_rep_doc(
+            RepDoc,
+            [
+                {<<"state">>, <<"triggered">>},
+                {<<"replication_id">>, ?l2b(RepId)}
+            ]
+        )
+    end.
+
+ensure_rep_db_exists() ->
+    DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+    Opts = [
+        {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+        sys_db
+    ],
+    case couch_db:open(DbName, Opts) of
+    {ok, Db} ->
+        Db;
+    _Error ->
+        {ok, Db} = couch_db:create(DbName, Opts)
+    end,
+    ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+    {ok, Db}.
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+    case couch_db:open_doc(RepDb, DDocID, []) of
+    {ok, _Doc} ->
+        ok;
+    _ ->
+        DDoc = couch_doc:from_json_obj({[
+            {<<"_id">>, DDocID},
+            {<<"language">>, <<"javascript">>},
+            {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+        ]}),
+        {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+    end,
+    ok.

Added: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=982330&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (added)
+++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Wed Aug  4 17:05:22 2010
@@ -0,0 +1,232 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_db_listener).
+-behaviour(gen_server).
+
+-export([start_link/0, init/1, handle_call/3, handle_info/2, handle_cast/2]).
+-export([code_change/3, terminate/2]).
+
+-include("couch_db.hrl").
+
+-define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
+-define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
+
+-record(state, {
+    changes_feed_loop,
+    changes_queue,
+    changes_processor
+}).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init(_) ->
+    process_flag(trap_exit, true),
+    {ok, Queue} = couch_work_queue:new(1024 * 1024, 1000),
+    {ok, Processor} = changes_processor(Queue),
+    {ok, Loop} = changes_feed_loop(Queue),
+    Server = self(),
+    ok = couch_config:register(
+        fun("replicator", "db") ->
+            ok = gen_server:call(Server, rep_db_changed, infinity)
+        end
+    ),
+    {ok, #state{
+        changes_feed_loop = Loop,
+        changes_queue = Queue,
+        changes_processor = Processor}
+    }.
+
+handle_call(rep_db_changed, _From, State) ->
+    #state{
+        changes_feed_loop = Loop,
+        changes_queue = Queue
+    } = State,
+    exit(Loop, rep_db_changed),
+    {ok, NewLoop} = changes_feed_loop(Queue),
+    {reply, ok, State#state{changes_feed_loop = NewLoop}}.
+
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+
+handle_info({'EXIT', _OldChangesLoop, rep_db_changed}, State) ->
+    {noreply, State};
+
+handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
+    ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
+    {stop, rep_db_changes_processor_error, State}.
+
+
+terminate(_Reason, State) ->
+    #state{
+        changes_feed_loop = Loop,
+        changes_queue = Queue
+    } = State,
+    exit(Loop, stop),
+    % closing the queue will cause changes_processor to shutdown
+    couch_work_queue:close(Queue),
+    ok.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+changes_feed_loop(ChangesQueue) ->
+    {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+    Pid = spawn_link(
+        fun() ->
+            ChangesFeedFun = couch_changes:handle_changes(
+                #changes_args{
+                    include_docs = true,
+                    feed = "continuous",
+                    timeout = infinity,
+                    db_open_options = [sys_db]
+                },
+                {json_req, null},
+                RepDb
+            ),
+            ChangesFeedFun(
+                fun({change, Change, _}, _) ->
+                    case has_valid_rep_id(Change) of
+                    true ->
+                        couch_work_queue:queue(ChangesQueue, Change);
+                    false ->
+                        ok
+                    end;
+                (_, _) ->
+                    ok
+                end
+            )
+        end
+    ),
+    couch_db:close(RepDb),
+    {ok, Pid}.
+
+
+changes_processor(ChangesQueue) ->
+    Pid = spawn_link(
+        fun() ->
+            ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
+            ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
+            consume_changes(ChangesQueue),
+            true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
+            true = ets:delete(?DOC_TO_REP_ID_MAP)
+        end
+    ),
+    {ok, Pid}.
+
+
+consume_changes(ChangesQueue) ->
+    case couch_work_queue:dequeue(ChangesQueue) of
+    closed ->
+        ok;
+    {ok, Changes} ->
+        lists:foreach(fun process_change/1, Changes),
+        consume_changes(ChangesQueue)
+    end.
+
+
+has_valid_rep_id({Change}) ->
+    has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+    false;
+has_valid_rep_id(_Else) ->
+    true.
+
+
+process_change({Change}) ->
+    {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
+    case couch_util:get_value(<<"deleted">>, Change, false) of
+    true ->
+        maybe_stop_replication(JsonRepDoc);
+    false ->
+        case couch_util:get_value(<<"state">>, RepProps) of
+        <<"completed">> ->
+            maybe_stop_replication(JsonRepDoc);
+        <<"error">> ->
+            % cleanup ets table entries
+            maybe_stop_replication(JsonRepDoc);
+        <<"triggered">> ->
+            maybe_start_replication(JsonRepDoc);
+        undefined ->
+            case couch_util:get_value(<<"replication_id">>, RepProps) of
+            undefined ->
+                maybe_start_replication(JsonRepDoc);
+            _ ->
+                ok
+            end
+        end
+    end,
+    ok.
+
+
+rep_user_ctx({RepDoc}) ->
+    case couch_util:get_value(<<"user_ctx">>, RepDoc) of
+    undefined ->
+        #user_ctx{roles = [<<"_admin">>]};
+    {UserCtx} ->
+        #user_ctx{
+            name = couch_util:get_value(<<"name">>, UserCtx, null),
+            roles = couch_util:get_value(<<"roles">>, UserCtx, [])
+        }
+    end.
+
+
+maybe_start_replication({RepProps} = JsonRepDoc) ->
+    UserCtx = rep_user_ctx(JsonRepDoc),
+    RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
+    DocId = couch_util:get_value(<<"_id">>, RepProps),
+    case ets:lookup(?REP_ID_TO_DOC_ID_MAP, RepId) of
+    [] ->
+        true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {RepId, DocId}),
+        true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
+        spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
+    [{RepId, DocId}] ->
+        ok;
+    [{RepId, _OtherDocId}] ->
+        couch_rep:update_rep_doc(
+            JsonRepDoc, [{<<"replication_id">>, ?l2b(element(1, RepId))}]
+        )
+    end.
+
+
+start_replication(RepDoc, RepId, UserCtx) ->
+    case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
+    RepPid when is_pid(RepPid) ->
+        couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
+    Error ->
+        couch_rep:update_rep_doc(
+            RepDoc,
+            [
+                {<<"state">>, <<"error">>},
+                {<<"replication_id">>, ?l2b(element(1, RepId))}
+            ]
+        ),
+        ?LOG_ERROR("Error starting replication ~p: ~p", [RepId, Error])
+    end.
+
+
+maybe_stop_replication({RepProps}) ->
+    DocId = couch_util:get_value(<<"_id">>, RepProps),
+    case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
+    [{DocId, RepId}] ->
+        couch_rep:end_replication(RepId),
+        true = ets:delete(?REP_ID_TO_DOC_ID_MAP, RepId),
+        true = ets:delete(?DOC_TO_REP_ID_MAP, DocId);
+    [] ->
+        ok
+    end.

Modified: couchdb/trunk/src/couchdb/couch_server.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_server.erl?rev=982330&r1=982329&r2=982330&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_server.erl (original)
+++ couchdb/trunk/src/couchdb/couch_server.erl Wed Aug  4 17:05:22 2010
@@ -76,6 +76,7 @@ check_dbname(#server{dbname_regexp=RegEx
     nomatch ->
         case DbName of
             "_users" -> ok;
+            "_replicator" -> ok;
             _Else ->
                 {error, illegal_database_name}
             end;