You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/02/03 16:28:35 UTC

[2/4] couch commit: updated refs/heads/COUCHDB-3288-remove-public-db-record to c9fc361

Move calculate_start_seq and owner_of

These functions were originally implemented in fabric_rpc.erl where they
really didn't belong. Moving them to couch_db.erl allows us to keep the
unit tests intact rather than just removing them now that the #db record
is being made private.

COUCHDB-3288


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/3f8a0d0d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/3f8a0d0d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/3f8a0d0d

Branch: refs/heads/COUCHDB-3288-remove-public-db-record
Commit: 3f8a0d0d94dcaff170b7d81429c5ccb58cd055a2
Parents: 09aa667
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Feb 3 09:59:23 2017 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Fri Feb 3 10:11:13 2017 -0600

----------------------------------------------------------------------
 src/couch_db.erl | 103 +++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 102 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/3f8a0d0d/src/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couch_db.erl b/src/couch_db.erl
index 2adcd33..06a4e3a 100644
--- a/src/couch_db.erl
+++ b/src/couch_db.erl
@@ -83,6 +83,9 @@
     changes_since/5,
     count_changes_since/2,
 
+    calculate_start_seq/3,
+    owner_of/2,
+
     start_compact/1,
     cancel_compact/1,
     wait_for_compaction/1,
@@ -386,7 +389,9 @@ get_uuid(#db{}=Db) ->
     couch_db_header:uuid(Db#db.header).
 
 get_epochs(#db{}=Db) ->
-    couch_db_header:epochs(Db#db.header).
+    Epochs = couch_db_header:epochs(Db#db.header),
+    validate_epochs(Epochs),
+    Epochs.
 
 get_compacted_seq(#db{}=Db) ->
     couch_db_header:compacted_seq(Db#db.header).
@@ -1360,6 +1365,78 @@ enum_docs(Db, NS, InFun, InAcc, Options0) ->
         Db#db.id_tree, FoldFun, InAcc, Options),
     {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
 
+
+calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
+    Seq;
+calculate_start_seq(Db, Node, {Seq, Uuid}) ->
+    % Treat the current node as the epoch node
+    calculate_start_seq(Db, Node, {Seq, Uuid, Node});
+calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) ->
+    case is_prefix(Uuid, get_uuid(Db)) of
+        true ->
+            case is_owner(EpochNode, Seq, get_epochs(Db)) of
+                true -> Seq;
+                false -> 0
+            end;
+        false ->
+            %% The file was rebuilt, most likely in a different
+            %% order, so rewind.
+            0
+    end;
+calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) ->
+    case is_prefix(Uuid, couch_db:get_uuid(Db)) of
+        true ->
+            start_seq(get_epochs(Db), OriginalNode, Seq);
+        false ->
+            {replace, OriginalNode, Uuid, Seq}
+    end.
+
+
+validate_epochs(Epochs) ->
+    %% Assert uniqueness.
+    case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
+        true  -> ok;
+        false -> erlang:error(duplicate_epoch)
+    end,
+    %% Assert order.
+    case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
+        true  -> ok;
+        false -> erlang:error(epoch_order)
+    end.
+
+
+is_prefix(Pattern, Subject) ->
+     binary:longest_common_prefix([Pattern, Subject]) == size(Pattern).
+
+
+is_owner(Node, Seq, Epochs) ->
+    Node =:= owner_of(Epochs, Seq).
+
+
+owner_of(Db, Seq) when not is_list(Db) ->
+    owner_of(get_epochs(Db), Seq);
+owner_of([], _Seq) ->
+    undefined;
+owner_of([{EpochNode, EpochSeq} | _Rest], Seq) when Seq > EpochSeq ->
+    EpochNode;
+owner_of([_ | Rest], Seq) ->
+    owner_of(Rest, Seq).
+
+
+start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq ->
+    %% OrigNode is the owner of the Seq so we can safely stream from there
+    Seq;
+start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq ->
+    %% We transferred this file before Seq was written on OrigNode, so we need
+    %% to stream from the beginning of the next epoch. Note that it is _not_
+    %% necessary for the current node to own the epoch beginning at NewSeq
+    NewSeq;
+start_seq([_ | Rest], OrigNode, Seq) ->
+    start_seq(Rest, OrigNode, Seq);
+start_seq([], OrigNode, Seq) ->
+    erlang:error({epoch_mismatch, OrigNode, Seq}).
+
+
 extract_namespace(Options0) ->
     case proplists:split(Options0, [namespace]) of
         {[[{namespace, NS}]], Options} ->
@@ -1698,6 +1775,30 @@ should_fail_validate_dbname(DbName) ->
         ok
     end)}.
 
+calculate_start_seq_test() ->
+    %% uuid mismatch is always a rewind.
+    Hdr1 = couch_db_header:new(),
+    Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
+    %% uuid matches and seq is owned by node.
+    Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+    ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
+    %% uuids match but seq is not owned by node.
+    Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
+    %% return integer if we didn't get a vector.
+    ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
+
+is_owner_test() ->
+    ?assertNot(is_owner(foo, 1, [])),
+    ?assertNot(is_owner(foo, 1, [{foo, 1}])),
+    ?assert(is_owner(foo, 2, [{foo, 1}])),
+    ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
+    ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assertError(duplicate_epoch, validate_epochs([{foo, 1}, {bar, 1}])),
+    ?assertError(epoch_order, validate_epochs([{foo, 100}, {bar, 200}])).
+
 to_binary(DbName) when is_list(DbName) ->
     ?l2b(DbName);
 to_binary(DbName) when is_binary(DbName) ->