Posted to commits@couchdb.apache.org by da...@apache.org on 2009/01/09 23:20:49 UTC

svn commit: r733174 - in /couchdb/trunk: share/www/script/ src/couchdb/

Author: damien
Date: Fri Jan  9 14:20:48 2009
New Revision: 733174

URL: http://svn.apache.org/viewvc?rev=733174&view=rev
Log:
Added support so clients can detect whether a server has potentially lost commits after multiple updates (for example, during bulk imports), and so the replicator can detect lost commits on remote replications.
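
For reference, a rough sketch of the client-side pattern this enables, written
against the test-suite helpers used in couch_tests.js below (CouchDB, T); it is
illustrative only and not part of the commit. The idea: record the database's
instance_start_time before a batch of non-committed updates, call
ensureFullCommit() at the end, and treat a changed start time as a sign that
the server restarted and the latest updates may not have reached disk.

    var db = new CouchDB("test_suite_db");
    var startTime = db.info().instance_start_time;

    // ... a series of saves without the X-Couch-Full-Commit header ...
    T(db.save({_id:"doc1", value:1}).ok);

    var commitResult = db.ensureFullCommit();
    T(commitResult.ok);
    if (commitResult.instance_start_time != startTime) {
      // The server restarted mid-batch; some of the updates above may have
      // been lost and should be re-checked or re-sent.
    }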

Modified:
    couchdb/trunk/share/www/script/couch_tests.js
    couchdb/trunk/src/couchdb/couch_db.erl
    couchdb/trunk/src/couchdb/couch_db.hrl
    couchdb/trunk/src/couchdb/couch_db_updater.erl
    couchdb/trunk/src/couchdb/couch_file.erl
    couchdb/trunk/src/couchdb/couch_httpd_db.erl
    couchdb/trunk/src/couchdb/couch_rep.erl
    couchdb/trunk/src/couchdb/couch_view_group.erl

Modified: couchdb/trunk/share/www/script/couch_tests.js
URL: http://svn.apache.org/viewvc/couchdb/trunk/share/www/script/couch_tests.js?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/share/www/script/couch_tests.js [utf-8] (original)
+++ couchdb/trunk/share/www/script/couch_tests.js [utf-8] Fri Jan  9 14:20:48 2009
@@ -136,6 +136,89 @@
     // make sure we can still open
     T(db.open(existingDoc._id, {rev: existingDoc._rev}) != null);
   },
+  
+  delayed_commits: function(debug) {
+    var db = new CouchDB("test_suite_db");
+    db.deleteDb();
+    db.createDb();
+    if (debug) debugger;
+    
+    // By default, CouchDB doesn't fully commit documents to disk right away;
+    // it waits about a second to batch the full commit flush along with any
+    // other updates. If it crashes or is restarted, you may lose the most
+    // recent commits.
+    
+    T(db.save({_id:"1",a:2,b:4}).ok);
+    T(db.open("1") != null);
+    
+    restartServer();
+    
+    T(db.open("1") == null); // lost the update.
+    // Note: if we had waited > 1 sec before the restart, the doc would
+    // likely have been committed.
+    
+    
+    // Retry the same thing but with full commits on.
+    
+    var db2 = new CouchDB("test_suite_db", {"X-Couch-Full-Commit":"true"});
+    
+    T(db2.save({_id:"1",a:2,b:4}).ok);
+    T(db2.open("1") != null);
+    
+    restartServer();
+    
+    T(db2.open("1") != null);
+    
+    // You can update without committing immediately, and then ensure
+    // everything is committed in the last step.
+    
+    T(db.save({_id:"2",a:2,b:4}).ok);
+    T(db.open("2") != null);
+    T(db.ensureFullCommit().ok);
+    restartServer();
+    
+    T(db.open("2") != null);
+    
+    // However, even when flushed, it's possible the server crashed between
+    // the update and the commit, and you don't want to check every doc you
+    // updated to make sure it actually made it to disk. So record the
+    // instance start time of the database before the updates, then check it
+    // again after the flush (the instance start time is returned by the
+    // flush operation). If they are the same, we know everything was
+    // committed safely.
+    
+    // First try it with a crash.
+    
+    var instanceStartTime = db.info().instance_start_time;
+    
+    T(db.save({_id:"3",a:2,b:4}).ok);
+    T(db.open("3") != null);
+    
+    restartServer();
+    
+    var commitResult = db.ensureFullCommit();
+    T(commitResult.ok && commitResult.instance_start_time != instanceStartTime);
+    // start times don't match, meaning the server lost our change
+    
+    T(db.open("3") == null); // yup lost it
+    
+    // retry with no server restart
+    
+    var instanceStartTime = db.info().instance_start_time;
+    
+    T(db.save({_id:"4",a:2,b:4}).ok);
+    T(db.open("4") != null);
+    
+    var commitResult = db.ensureFullCommit();
+    T(commitResult.ok && commitResult.instance_start_time == instanceStartTime);
+    // Successful commit, start times match!
+    
+    restartServer();
+    
+    T(db.open("4") != null);
+    
+  },
+  
   all_docs: function(debug) {
     var db = new CouchDB("test_suite_db");
     db.deleteDb();
@@ -1881,97 +1964,153 @@
       dbA.createDb();
       dbB.deleteDb();
       dbB.createDb();
-
-      var docs = makeDocs(0, numDocs);
-      T(dbA.bulkSave(docs).ok);
-
-      T(CouchDB.replicate(A, B).ok);
-
-      for (var j = 0; j < numDocs; j++) {
-        docA = dbA.open("" + j);
-        docB = dbB.open("" + j);
-        T(docA._rev == docB._rev);
-      }
-
-      // check documents with a '/' in the ID
-      // need to re-encode the slash when replicating from a remote source
-      dbA.save({ _id:"abc/def", val:"one" });
       
-      T(CouchDB.replicate(A, B).ok);
-      T(CouchDB.replicate(B, A).ok);
+      var repTests = {
+        // Copy and paste this template, put your code in, and delete unused steps.
+        test_template: new function () {
+          this.init = function(dbA, dbB) {
+            // before anything has happened
+          }
+          this.afterAB1 = function(dbA, dbB) {
+            // called after replicating src=A  tgt=B first time.
+          };
+          this.afterBA1 = function(dbA, dbB) {
+            // called after replicating src=B  tgt=A first time.
+          };
+          this.afterAB2 = function(dbA, dbB) {
+            // called after replicating src=A  tgt=B second time. 
+          };
+          this.afterBA2 = function(dbA, dbB) {
+            // etc...
+          };
+        },
+        
+        simple_test: new function () {
+          this.init = function(dbA, dbB) {
+            var docs = makeDocs(0, numDocs);
+            T(dbA.bulkSave(docs).ok);
+          };
+        
+          this.afterAB1 = function(dbA, dbB) {          
+            for (var j = 0; j < numDocs; j++) {
+              var docA = dbA.open("" + j);
+              var docB = dbB.open("" + j);
+              T(docA._rev == docB._rev);
+            }
+          };
+        },
       
-      docA = dbA.open("abc/def");
-      docB = dbB.open("abc/def");
-      T(docA._rev == docB._rev);
+        deletes_test: new function () {
+          this.init = function(dbA, dbB) {
+            T(dbA.save({_id:"foo1",value:"a"}).ok);
+          };
+          
+          this.afterAB1 = function(dbA, dbB) {
+            var docA = dbA.open("foo1");
+            var docB = dbB.open("foo1");
+            T(docA._rev == docB._rev);
+
+            dbA.deleteDoc(docA);
+          };
+          
+          this.afterAB2 = function(dbA, dbB) {
+            T(dbA.open("foo1") == null);
+            T(dbB.open("foo1") == null);
+          };
+        },
+        
+        slashes_in_ids_test: new function () {
+          // make sure docs with slashes in id replicate properly
+          this.init = function(dbA, dbB) {
+            dbA.save({ _id:"abc/def", val:"one" });
+          };
+          
+          this.afterAB1 = function(dbA, dbB) {
+            var docA = dbA.open("abc/def");
+            var docB = dbB.open("abc/def");
+            T(docA._rev == docB._rev);
+          };
+        },
       
-      // now check binary attachments
-      var binDoc = {
-        _id:"bin_doc",
-        _attachments:{
-          "foo.txt": {
-            "type":"base64",
-            "data": "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ="
-          }
+        attachments_test: new function () {
+          // Test attachments
+          this.init = function(dbA, dbB) {
+            dbA.save({
+              _id:"bin_doc",
+              _attachments:{
+                "foo.txt": {
+                  "type":"base64",
+                  "data": "VGhpcyBpcyBhIGJhc2U2NCBlbmNvZGVkIHRleHQ="
+                }
+              }
+            });
+          };
+          
+          this.afterAB1 = function(dbA, dbB) {
+            var xhr = CouchDB.request("GET", "/test_suite_db_a/bin_doc/foo.txt");
+            T(xhr.responseText == "This is a base64 encoded text")
+
+            xhr = CouchDB.request("GET", "/test_suite_db_b/bin_doc/foo.txt");
+            T(xhr.responseText == "This is a base64 encoded text")
+          };
+        },
+        
+        conflicts_test: new function () {
+          // test conflicts
+          this.init = function(dbA, dbB) {
+            dbA.save({_id:"foo",value:"a"});
+            dbB.save({_id:"foo",value:"b"});
+          };
+          
+          this.afterBA1 = function(dbA, dbB) {            
+            var docA = dbA.open("foo", {conflicts: true});
+            var docB = dbB.open("foo", {conflicts: true});
+
+            // make sure the same rev is in each db
+            T(docA._rev === docB._rev);
+
+            // make sure the conflicts are the same in each db
+            T(docA._conflicts[0] === docB._conflicts[0]);
+
+            // delete a conflict.
+            dbA.deleteDoc({_id:"foo", _rev:docA._conflicts[0]});
+          };
+          
+          this.afterBA2 = function(dbA, dbB) {            
+            // open documents and include the conflict meta data
+            var docA = dbA.open("foo", {conflicts: true});
+            var docB = dbB.open("foo", {conflicts: true});
+
+            // We should have no conflicts this time
+            T(docA._conflicts === undefined)
+            T(docB._conflicts === undefined);
+          };
         }
-      }
-
-      dbA.save(binDoc);
-
-      T(CouchDB.replicate(A, B).ok);
-      T(CouchDB.replicate(B, A).ok);
-
-      xhr = CouchDB.request("GET", "/test_suite_db_a/bin_doc/foo.txt");
-      T(xhr.responseText == "This is a base64 encoded text")
-
-      xhr = CouchDB.request("GET", "/test_suite_db_b/bin_doc/foo.txt");
-      T(xhr.responseText == "This is a base64 encoded text")
-
-      dbA.save({_id:"foo1",value:"a"});
-
-      T(CouchDB.replicate(A, B).ok);
-      T(CouchDB.replicate(B, A).ok);
-
-      docA = dbA.open("foo1");
-      docB = dbB.open("foo1");
-      T(docA._rev == docB._rev);
-
-      dbA.deleteDoc(docA);
-
+      };
+      var test;
+      for(test in repTests)
+        if(repTests[test].init) repTests[test].init(dbA, dbB);
+      
       T(CouchDB.replicate(A, B).ok);
+      
+      for(test in repTests)
+        if(repTests[test].afterAB1) repTests[test].afterAB1(dbA, dbB);
+        
       T(CouchDB.replicate(B, A).ok);
-
-      T(dbA.open("foo1") == null);
-      T(dbB.open("foo1") == null);
-
-      dbA.save({_id:"foo",value:"a"});
-      dbB.save({_id:"foo",value:"b"});
-
+      
+      for(test in repTests)
+        if(repTests[test].afterBA1) repTests[test].afterBA1(dbA, dbB);
+      
       T(CouchDB.replicate(A, B).ok);
+      
+      for(test in repTests)
+        if(repTests[test].afterAB2) repTests[test].afterAB2(dbA, dbB);
+        
       T(CouchDB.replicate(B, A).ok);
-
-      // open documents and include the conflict meta data
-      docA = dbA.open("foo", {conflicts: true});
-      docB = dbB.open("foo", {conflicts: true});
-
-      // make sure the same rev is in each db
-      T(docA._rev === docB._rev);
-
-      // make sure the conflicts are the same in each db
-      T(docA._conflicts[0] === docB._conflicts[0]);
-
-      // delete a conflict.
-      dbA.deleteDoc({_id:"foo", _rev:docA._conflicts[0]});
-
-      // replicate the change
-      T(CouchDB.replicate(A, B).ok);
-
-      // open documents and include the conflict meta data
-      docA = dbA.open("foo", {conflicts: true});
-      docB = dbB.open("foo", {conflicts: true});
-
-      // We should have no conflicts this time
-      T(docA._conflicts === undefined)
-      T(docB._conflicts === undefined);
+      
+      for(test in repTests)
+        if(repTests[test].afterBA2) repTests[test].afterBA2(dbA, dbB);
+      
     }
   },
 

Modified: couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.erl?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db.erl Fri Jan  9 14:20:48 2009
@@ -64,8 +64,9 @@
 open(DbName, Options) ->
     couch_server:open(DbName, Options).
 
-ensure_full_commit(#db{update_pid=UpdatePid}) ->
-    gen_server:call(UpdatePid, full_commit, infinity).
+ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
+    ok = gen_server:call(UpdatePid, full_commit, infinity),
+    {ok, StartTime}.
 
 close(#db{fd=Fd}) ->
     couch_file:drop_ref(Fd).
@@ -166,7 +167,8 @@
         compactor_pid=Compactor,
         update_seq=SeqNum,
         name=Name,
-        fulldocinfo_by_id_btree=FullDocBtree} = Db,
+        fulldocinfo_by_id_btree=FullDocBtree,
+        instance_start_time=StartTime} = Db,
     {ok, Size} = couch_file:bytes(Fd),
     {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree),
     InfoList = [
@@ -176,7 +178,8 @@
         {update_seq, SeqNum},
         {purge_seq, couch_db:get_purge_seq(Db)},
         {compact_running, Compactor/=nil},
-        {disk_size, Size}
+        {disk_size, Size},
+        {instance_start_time, StartTime}
         ],
     {ok, InfoList}.
 

Modified: couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.hrl?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ couchdb/trunk/src/couchdb/couch_db.hrl Fri Jan  9 14:20:48 2009
@@ -120,6 +120,7 @@
     {main_pid=nil,
     update_pid=nil,
     compactor_pid=nil,
+    instance_start_time, % number of microseconds since Jan 1, 1970, as a binary string
     fd,
     header = #db_header{},
     summary_stream,

Modified: couchdb/trunk/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db_updater.erl Fri Jan  9 14:20:48 2009
@@ -279,6 +279,12 @@
     AdminsPtr ->
         {ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
     end,
+    
+    % convert start time tuple to microsecs and store as a binary string
+    {MegaSecs, Secs, MicroSecs} = now(),
+    StartTime = ?l2b(io_lib:format("~p",
+            [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
+    
     #db{
         update_pid=self(),
         fd=Fd,
@@ -291,7 +297,9 @@
         name = DbName,
         filepath = Filepath,
         admins = Admins,
-        admins_ptr = AdminsPtr}.
+        admins_ptr = AdminsPtr,
+        instance_start_time = StartTime
+        }.
 
 
 close_db(#db{fd=Fd,summary_stream=Ss}) ->

Modified: couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_file.erl?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_file.erl (original)
+++ couchdb/trunk/src/couchdb/couch_file.erl Fri Jan  9 14:20:48 2009
@@ -170,7 +170,8 @@
     Result.
     
 close_maybe(Fd) ->
-    gen_server:cast(Fd, {close_maybe, self()}).
+    catch unlink(Fd),
+    catch gen_server:cast(Fd, close_maybe).
 
 drop_ref(Fd) ->
     drop_ref(Fd, self()).
@@ -372,8 +373,7 @@
 
 handle_cast(close, Fd) ->
     {stop,normal,Fd};
-handle_cast({close_maybe, Pid}, Fd) ->
-    catch unlink(Pid),
+handle_cast(close_maybe, Fd) ->
     maybe_close_async(Fd);
 handle_cast({drop_ref, Pid}, Fd) ->
     case get(Pid) of

Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Fri Jan  9 14:20:48 2009
@@ -90,9 +90,10 @@
     send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
 
 db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) ->
-    ok = couch_db:ensure_full_commit(Db),
+    {ok, DbStartTime} = couch_db:ensure_full_commit(Db),
     send_json(Req, 201, {[
-            {ok, true}
+            {ok, true},
+            {instance_start_time, DbStartTime}
         ]});
     
 db_req(#httpd{path_parts=[_,<<"_ensure_full_commit">>]}=Req, _Db) ->
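
For reference, a sketch (not part of the commit) of what a client now sees from
this endpoint, using the CouchDB.request helper from the test suite and assuming
a JSON parser is available on the test page; the response body in the comment
shows only the shape of the reply.

    var db = new CouchDB("test_suite_db");
    var startTime = db.info().instance_start_time;
    var xhr = CouchDB.request("POST", "/test_suite_db/_ensure_full_commit");
    T(xhr.status == 201);
    // body: {"ok":true,"instance_start_time":"..."}
    var result = JSON.parse(xhr.responseText);
    T(result.ok);
    if (result.instance_start_time != startTime) {
      // The server restarted since we read startTime; recent non-committed
      // updates may have been lost.
    }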

Modified: couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep.erl Fri Jan  9 14:20:48 2009
@@ -69,7 +69,10 @@
     RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary, 
             ":", Source/binary, ":", Target/binary>>,
     
-    StartTime = httpd_util:rfc1123_date(),
+    ReplicationStartTime = httpd_util:rfc1123_date(),
+    
+    {ok, SrcInstanceStartTime} = get_db_info(DbSrc),
+    {ok, TgtInstanceStartTime} = get_db_info(DbTgt),
     
     case proplists:get_value(full, Options, false)
         orelse proplists:get_value("full", Options, false) of
@@ -115,9 +118,28 @@
         % nothing changed, don't record results
         {ok, {OldRepHistoryProps}};
     false ->
+        % Commit changes to both src and tgt. The src as well, because if
+        % changes we replicated are lost, we'd record a seq number ahead
+        % of what was committed and therefore lose future changes with the
+        % same seq nums.
+        
+        {ok, SrcInstanceStartTime2} = ensure_full_commit(DbSrc),
+        {ok, TgtInstanceStartTime2} = ensure_full_commit(DbTgt),
+        
+        RecordSeqNum =
+        if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+                TgtInstanceStartTime2 == TgtInstanceStartTime ->
+            NewSeqNum;
+        true ->
+            ?LOG_INFO("A server has restarted sinced replication start. "
+                "Not recording the new sequence number to ensure the "
+                "replication is redone and documents reexamined.", []),
+            SeqNum
+        end,
+        
         HistEntries =[
             {
-                [{<<"start_time">>, list_to_binary(StartTime)},
+                [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
                 {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
                 {<<"start_last_seq">>, SeqNum},
                 {<<"end_last_seq">>, NewSeqNum} | Stats]}
@@ -126,7 +148,7 @@
         NewRepHistory =
             {
                 [{<<"session_id">>, couch_util:new_uuid()},
-                {<<"source_last_seq">>, NewSeqNum},
+                {<<"source_last_seq">>, RecordSeqNum},
                 {<<"history">>, lists:sublist(HistEntries, 50)}]},
 
         {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
@@ -276,6 +298,21 @@
 close_db(Db)->
     couch_db:close(Db).
 
+get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
+    {DbProps} = do_http_request(DbUrl, get, Headers),
+    {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
+get_db_info(Db) ->
+    couch_db:get_db_info(Db).
+
+
+ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) ->
+    {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, true),
+    true = proplists:get_value(<<"ok">>, ResultProps),
+    {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
+ensure_full_commit(Db) ->
+    couch_db:ensure_full_commit(Db).
+    
+    
 get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
     Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey=" 
         ++ integer_to_list(StartSeq),

Modified: couchdb/trunk/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=733174&r1=733173&r2=733174&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_group.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_group.erl Fri Jan  9 14:20:48 2009
@@ -149,7 +149,7 @@
         {noreply, State#group_state{waiting_commit=false}};
     true ->
         % We can't commit the header because the database seq that's fully
-        % committed to disk is still behind us. It we committed now and the
+        % committed to disk is still behind us. If we committed now and the
         % database lost those changes our view could be forever out of sync
         % with the database. But a crash before we commit these changes, no big
         % deal, we only lose incremental changes since last committal.