You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/15 19:05:55 UTC
fabric commit: updated refs/heads/COUCHDB-3326-clustered-purge to
9b6662a
Repository: couchdb-fabric
Updated Branches:
refs/heads/COUCHDB-3326-clustered-purge [created] 9b6662aff
Implement clustered purge API
COUCHDB-3326
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/9b6662af
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/9b6662af
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/9b6662af
Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 9b6662aff94a2ea7239624c16ce5b97c1ccc088b
Parents: c4ae2f5
Author: Mayya Sharipova <ma...@ca.ibm.com>
Authored: Fri Oct 21 10:57:50 2016 -0400
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 15 14:05:21 2017 -0500
----------------------------------------------------------------------
src/fabric.erl | 34 +++-
src/fabric_db_meta.erl | 26 ++-
src/fabric_doc_open.erl | 46 +++--
src/fabric_doc_purge.erl | 424 ++++++++++++++++++++++++++++++++++++++++++
src/fabric_rpc.erl | 103 +++++++++-
5 files changed, 607 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric.erl
----------------------------------------------------------------------
diff --git a/src/fabric.erl b/src/fabric.erl
index c0f95df..7e0e777 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -21,12 +21,13 @@
delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
set_security/2, set_security/3, get_revs_limit/1, get_security/1,
get_security/2, get_all_security/1, get_all_security/2,
+ get_purged_docs_limit/1, set_purged_docs_limit/3, get_purge_seq/1,
compact/1, compact/2]).
% Documents
-export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3,
- purge_docs/2, att_receiver/2]).
+ purge_docs/3, att_receiver/2]).
% Views
-export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
@@ -70,7 +71,8 @@ all_dbs(Prefix) when is_list(Prefix) ->
%% @doc returns a property list of interesting properties
%% about the database such as `doc_count', `disk_size',
-%% etc.
+%% purge_quality 0-100 - % of converged shards that share
+%% the largest purge_seq etc.
-spec get_db_info(dbname()) ->
{ok, [
{instance_start_time, binary()} |
@@ -127,6 +129,25 @@ get_revs_limit(DbName) ->
{ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
try couch_db:get_revs_limit(Db) after catch couch_db:close(Db) end.
+%% @doc Sets the upper bound for the number of stored purge requests.
+%% `Limit' must be a positive integer; any other value fails with
+%% `function_clause'. The request is delegated to
+%% fabric_db_meta:set_purged_docs_limit/3, so the result is `ok' on
+%% success or the error tuple reported there (e.g. `{error, timeout}').
+-spec set_purged_docs_limit(dbname(), pos_integer(), [option()]) ->
+    ok | {error, term()}.
+set_purged_docs_limit(DbName, Limit, Options)
+        when is_integer(Limit), Limit > 0 ->
+    fabric_db_meta:set_purged_docs_limit(dbname(DbName), Limit, opts(Options)).
+
+%% @doc Returns the limit on the number of stored purge requests, read
+%% through couch_db from the handle that fabric_util:get_db/2 opens with
+%% admin context.
+-spec get_purged_docs_limit(dbname()) -> pos_integer() | no_return().
+get_purged_docs_limit(DbName) ->
+    {ok, ShardDb} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try
+        couch_db:get_purged_docs_limit(ShardDb)
+    after
+        % best-effort close: a failure while closing is deliberately ignored
+        catch couch_db:close(ShardDb)
+    end.
+
+%% @doc Returns the current purge_seq, read through couch_db from the
+%% handle that fabric_util:get_db/2 opens with admin context.
+-spec get_purge_seq(dbname()) -> non_neg_integer() | no_return().
+get_purge_seq(DbName) ->
+    {ok, ShardDb} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try
+        couch_db:get_purge_seq(ShardDb)
+    after
+        % best-effort close: a failure while closing is deliberately ignored
+        catch couch_db:close(ShardDb)
+    end.
+
+
%% @doc sets the readers/writers/admin permissions for a database
-spec set_security(dbname(), SecObj::json_obj()) -> ok.
set_security(DbName, SecObj) ->
@@ -267,8 +288,13 @@ update_docs(DbName, Docs, Options) ->
{aborted, PreCommitFailures}
end.
-purge_docs(_DbName, _IdsRevs) ->
- not_implemented.
+%% @doc Purges revisions for a list of `{Id, Revs}' pairs.
+%% The work is delegated to fabric_doc_purge:go/3, whose result is returned
+%% unchanged:
+%%   `{ok, []}' when `IdsRevs' is empty;
+%%   `{Health, {PurgeSeq, Results}}' with Health `ok' or `accepted' (and
+%%   possibly `error' after a timeout);
+%%   any other term produced by the coordinator is passed through as is.
+%% Fails with `function_clause' when `IdsRevs' is not a list.
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {ok, []} |
+    {ok | accepted | error, {PurgeSeq :: any(), Results :: [any()]}} |
+    {ok, {error, PurgeSeq :: any(), Results :: [any()]}} |
+    {error, any()}.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    % normalise every {Id, Revs} pair before handing it to the coordinator
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
%% @doc spawns a process to upload attachment data and
%% returns a function that shards can use to communicate
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_db_meta.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_meta.erl b/src/fabric_db_meta.erl
index 367ef06..4fd9365 100644
--- a/src/fabric_db_meta.erl
+++ b/src/fabric_db_meta.erl
@@ -12,7 +12,8 @@
-module(fabric_db_meta).
--export([set_revs_limit/3, set_security/3, get_all_security/2]).
+-export([set_revs_limit/3, set_security/3, get_all_security/2,
+ set_purged_docs_limit/3]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) ->
{error, Error}.
+%% Submits a set_purged_docs_limit job to every shard of DbName and waits
+%% for the replies. Returns `ok' when the handler stops with `ok',
+%% `{error, timeout}' when the receive times out, and any other receive
+%% result unchanged.
+set_purged_docs_limit(DbName, Limit, Options) ->
+    Workers = fabric_util:submit_jobs(
+        mem3:shards(DbName), set_purged_docs_limit, [Limit, Options]),
+    % the handler stops once the countdown has reached zero, hence "- 1"
+    InitialAcc = {Workers, length(Workers) - 1},
+    Outcome = fabric_util:recv(
+        Workers, #shard.ref, fun handle_purge_message/3, InitialAcc),
+    case Outcome of
+        {ok, ok} ->
+            ok;
+        {timeout, {StillPending, _}} ->
+            fabric_util:log_timeout(StillPending, "set_purged_docs_limit"),
+            {error, timeout};
+        Other ->
+            Other
+    end.
+
+%% Receive callback for set_purged_docs_limit/3.
+%% The accumulator is {PendingWorkers, Countdown}: an `ok' reply either
+%% stops the receive (countdown already at 0) or removes the worker and
+%% decrements the countdown. Any other message aborts with {error, Msg}.
+handle_purge_message(ok, Worker, {Pending, Countdown}) ->
+    case Countdown of
+        0 ->
+            {stop, ok};
+        _ ->
+            {ok, {lists:delete(Worker, Pending), Countdown - 1}}
+    end;
+handle_purge_message(Failure, _Worker, _Acc) ->
+    {error, Failure}.
+
+
set_security(DbName, SecObj, Options) ->
Shards = mem3:shards(DbName),
RexiMon = fabric_util:create_monitors(Shards),
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_doc_open.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
index 9c45bd9..f57e317 100644
--- a/src/fabric_doc_open.erl
+++ b/src/fabric_doc_open.erl
@@ -25,7 +25,8 @@
r,
state,
replies,
- q_reply
+ q_reply,
+ replies_by_node=[] %[{Node, Reply}] used for checking purged docs
}).
@@ -83,7 +84,8 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
end;
handle_message(Reply, Worker, Acc) ->
NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
- NewAcc = Acc#acc{replies = NewReplies},
+ NewNReplies = [{Worker#shard.node, Reply}|Acc#acc.replies_by_node],
+ NewAcc = Acc#acc{replies = NewReplies, replies_by_node = NewNReplies},
case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
{true, QuorumReply} ->
fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
@@ -122,14 +124,15 @@ is_r_met(Workers, Replies, R) ->
no_more_workers
end.
-read_repair(#acc{dbname=DbName, replies=Replies}) ->
+read_repair(#acc{dbname=DbName, replies=Replies, replies_by_node=NReplies0}) ->
Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
+ NReplies = [{Node, Doc} || {Node, {ok, #doc{}=Doc}} <- NReplies0],
case Docs of
% omit local docs from read repair
[#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
choose_reply(Docs);
[#doc{id=Id} | _] ->
- Opts = [replicated_changes, ?ADMIN_CTX],
+ Opts = [replicated_changes, ?ADMIN_CTX, {read_repair, NReplies}],
Res = fabric:update_docs(DbName, Docs, Opts),
case Res of
{ok, []} ->
@@ -319,15 +322,17 @@ handle_message_reply_test() ->
?assertEqual(
{ok, Acc0#acc{
workers=[Worker0, Worker1],
- replies=[fabric_util:kv(foo,1)]
+ replies=[fabric_util:kv(foo,1)],
+ replies_by_node=[{undefined, foo}]
}},
handle_message(foo, Worker2, Acc0)
),
- ?assertEqual(
+ ?assertEqual(
{ok, Acc0#acc{
workers=[Worker0, Worker1],
- replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+ replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+ replies_by_node =[{undefined, bar}]
}},
handle_message(bar, Worker2, Acc0#acc{
replies=[fabric_util:kv(foo,1)]
@@ -339,18 +344,24 @@ handle_message_reply_test() ->
% is returned. Bit subtle on the assertions here.
?assertEqual(
- {stop, Acc0#acc{workers=[],replies=[fabric_util:kv(foo,1)]}},
+ {stop, Acc0#acc{
+ workers=[],
+ replies=[fabric_util:kv(foo,1)],
+ replies_by_node=[{undefined, foo}]
+ }},
handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]})
),
?assertEqual(
{stop, Acc0#acc{
workers=[],
- replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+ replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+ replies_by_node =[{undefined, bar}, {undefined, foo}]
}},
handle_message(bar, Worker0, Acc0#acc{
workers=[Worker0],
- replies=[fabric_util:kv(foo,1)]
+ replies=[fabric_util:kv(foo,1)],
+ replies_by_node=[{undefined, foo}]
})
),
@@ -362,11 +373,13 @@ handle_message_reply_test() ->
workers=[],
replies=[fabric_util:kv(foo,2)],
state=r_met,
- q_reply=foo
+ q_reply=foo,
+ replies_by_node =[{undefined, foo}, {undefined, foo}]
}},
handle_message(foo, Worker1, Acc0#acc{
workers=[Worker0, Worker1],
- replies=[fabric_util:kv(foo,1)]
+ replies=[fabric_util:kv(foo,1)],
+ replies_by_node =[{undefined, foo}]
})
),
@@ -376,7 +389,8 @@ handle_message_reply_test() ->
r=1,
replies=[fabric_util:kv(foo,1)],
state=r_met,
- q_reply=foo
+ q_reply=foo,
+ replies_by_node =[{undefined, foo}]
}},
handle_message(foo, Worker0, Acc0#acc{r=1})
),
@@ -386,11 +400,13 @@ handle_message_reply_test() ->
workers=[],
replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,2)],
state=r_met,
- q_reply=foo
+ q_reply=foo,
+ replies_by_node =[{undefined, foo}, {undefined, foo}, {undefined, bar}]
}},
handle_message(foo, Worker0, Acc0#acc{
workers=[Worker0],
- replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)]
+ replies=[fabric_util:kv(bar,1), fabric_util:kv(foo,1)],
+ replies_by_node =[{undefined, foo}, {undefined, bar}]
})
),
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_doc_purge.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_purge.erl b/src/fabric_doc_purge.erl
new file mode 100644
index 0000000..58e7a5e
--- /dev/null
+++ b/src/fabric_doc_purge.erl
@@ -0,0 +1,424 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_purge).
+
+-export([go/3]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% @doc Coordinates a clustered purge of AllIdsRevs ([{Id, Revs}]).
+%% Every request is tagged with a UUID, grouped by shard and cast to
+%% fabric_rpc:purge_docs/3 on each shard; replies are collected by
+%% handle_message/3.
+%%
+%% The write quorum is taken from the `w' option, which may be given as an
+%% integer or as a string of digits; it defaults to mem3:quorum(DbName).
+%%
+%% Returns:
+%%   {ok, []}                        when AllIdsRevs is empty;
+%%   {Health, {PurgeSeq, Results}}   Health is `ok' or `accepted' on a normal
+%%                                   finish, and may also be `error' after a
+%%                                   timeout; Results follow the order of
+%%                                   AllIdsRevs with missing replies dropped;
+%%   any other receive result unchanged. NOTE(review): this includes
+%%   {ok, {error, PurgeSeq, Results}} when the collected health is `error',
+%%   which differs in shape from the timeout branch - confirm with callers.
+go(_, [], _) ->
+    {ok, []};
+go(DbName, AllIdsRevs, Opts) ->
+    % tag each purge request with a UUID so replies can be matched to it
+    {AllUUIDs, AllUUIDsIdsRevs, DocCount} = tag_docs(AllIdsRevs),
+
+    Options = lists:delete(all_or_nothing, Opts),
+    % Counters -> [{Worker, UUIDs}]
+    {Counters, Workers} = dict:fold(fun(Shard, UUIDsIdsRevs, {Cs, Ws}) ->
+        UUIDs = [UUID || {UUID, _Id, _Revs} <- UUIDsIdsRevs],
+        #shard{name=Name, node=Node} = Shard,
+        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, [Name, UUIDsIdsRevs, Options]}),
+        Worker = Shard#shard{ref=Ref},
+        {[{Worker, UUIDs} | Cs], [Worker | Ws]}
+    end, {[], []}, group_idrevs_by_shard(DbName, AllUUIDsIdsRevs)),
+
+    RexiMon = fabric_util:create_monitors(Workers),
+    % accept the quorum both as an integer and as a string of digits
+    W = case couch_util:get_value(w, Options) of
+        undefined -> mem3:quorum(DbName);
+        WInt when is_integer(WInt) -> WInt;
+        WStr when is_list(WStr) -> list_to_integer(WStr)
+    end,
+    % ResultsAcc -> {PSeqsCs, DocsDict}
+    % PSeqsCs -> [{Shard, PurgeSeq}]
+    % DocsDict -> UUID -> [{ok, PurgedRevs}]
+    ResultsAcc = {[], dict:new()},
+    Acc = {length(Workers), DocCount, W, Counters, ResultsAcc},
+    Timeout = fabric_util:request_timeout(),
+    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc, infinity, Timeout) of
+        {ok, {Health, PSeq, Results}} when Health =:= ok; Health =:= accepted ->
+            % Results -> [{UUID, {ok, Revs}}]
+            {Health, {PSeq, [R || R <-
+                couch_util:reorder_results(AllUUIDs, Results), R =/= noreply]}};
+        {timeout, Acc1} ->
+            {_, _, W1, Counters1, {PSeqsCs, DocsDict1}} = Acc1,
+            {DefunctWorkers, _} = lists:unzip(Counters1),
+            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
+            % conclude with whatever replies arrived before the timeout
+            {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []}, DocsDict1),
+            FinalPSeq = fabric_view_changes:pack_seqs(PSeqsCs),
+            {Health, {FinalPSeq, [R || R <-
+                couch_util:reorder_results(AllUUIDs, Resp), R =/= noreply]}};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+%% Receive callback used by go/3.
+%% Accumulator: {PendingWorkers, DocCount, W, Counters, {PSeqsCs, DocsDict}}
+%%   Counters -> [{Worker, UUIDs}] for workers that have not answered yet
+%%   PSeqsCs  -> [{Worker, PurgeSeq}] collected from successful workers
+%%   DocsDict -> UUID -> [Reply]
+%% Returns {ok, NewAcc} to keep waiting or {stop, {Health, PSeq, Replies}}.
+handle_message({rexi_DOWN, _, {_, DownNode}, _}, _Worker, Acc0) ->
+    {_, DocCount, W, Counters, {PSeqs, Dict0}} = Acc0,
+    OnDownNode = fun({#shard{node = Node}, _}) -> Node == DownNode end,
+    {Lost, Remaining} = lists:partition(OnDownNode, Counters),
+    % every request routed to a worker on the dead node gets an error reply
+    Dict = lists:foldl(fun({_Shard, UUIDs}, DictAcc) ->
+        Errors = [{error, internal_server_error} || _ <- UUIDs],
+        append_update_replies(UUIDs, Errors, DictAcc)
+    end, Dict0, Lost),
+    skip_message({length(Remaining), DocCount, W, Remaining, {PSeqs, Dict}});
+handle_message({rexi_EXIT, _}, Worker, Acc0) ->
+    record_worker_failure(Worker, {error, internal_server_error}, Acc0);
+handle_message({ok, {PSeq, WorkerReplies}}, Worker, Acc0) ->
+    {Pending, DocCount, W, Counters, {PSeqs0, Dict0}} = Acc0,
+    {value, {_, UUIDs}, Remaining} = lists:keytake(Worker, 1, Counters),
+    Dict = append_update_replies(UUIDs, WorkerReplies, Dict0),
+    PSeqs = [{Worker, PSeq} | PSeqs0],
+    KeepWaiting = {ok, {Pending - 1, DocCount, W, Remaining, {PSeqs, Dict}}},
+    case {Pending, dict:size(Dict)} of
+        {1, _} ->
+            % this was the last outstanding worker: conclude with what we have
+            {Health, W, Replies} =
+                dict:fold(fun force_reply/3, {ok, W, []}, Dict),
+            {stop, {Health, fabric_view_changes:pack_seqs(PSeqs), Replies}};
+        {_, DocCount} ->
+            % at least one reply per request: stop early if all meet quorum
+            case dict:fold(fun maybe_reply/3, {stop, W, []}, Dict) of
+                continue ->
+                    KeepWaiting;
+                {stop, W, Replies} ->
+                    {stop, {ok, fabric_view_changes:pack_seqs(PSeqs), Replies}}
+            end;
+        _ ->
+            KeepWaiting
+    end;
+handle_message({error, purged_docs_limit_exceeded} = Error, Worker, Acc0) ->
+    record_worker_failure(Worker, Error, Acc0);
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
+
+
+%% Records Error as the reply for every request that was routed to Worker,
+%% drops the worker from the counters and decrements the pending count.
+record_worker_failure(Worker, Error, Acc0) ->
+    {Pending, DocCount, W, Counters, {PSeqs, Dict0}} = Acc0,
+    {value, {_, UUIDs}, Remaining} = lists:keytake(Worker, 1, Counters),
+    Dict = append_update_replies(UUIDs, [Error || _ <- UUIDs], Dict0),
+    skip_message({Pending - 1, DocCount, W, Remaining, {PSeqs, Dict}}).
+
+
+%% Tags every {Id, Revs} request with a fresh UUID.
+%% Returns {UUIDs, [{UUID, Id, Revs}], Count}; both lists keep the order of
+%% the input. The input is walked from the right, so UUIDs are generated
+%% starting with the last request.
+tag_docs(AllIdsRevs) ->
+    Tagged = lists:foldr(fun({Id, Revs}, Acc) ->
+        [{couch_uuids:new(), Id, Revs} | Acc]
+    end, [], AllIdsRevs),
+    UUIDs = [UUID || {UUID, _, _} <- Tagged],
+    {UUIDs, Tagged, length(Tagged)}.
+
+
+%% dict:fold/3 callback that settles on a final reply for one request even
+%% when the write quorum was not reached.
+%%   quorum met                      -> the merged {ok, Revs} reply, health kept
+%%   some {ok, _} replies            -> {accepted, Revs}; health `ok' becomes
+%%                                      `accepted', any other health is kept
+%%   only internal_server_error      -> that error, health becomes `error'
+%%   one distinct non-ok reply       -> that reply, health kept
+%%   several distinct non-ok replies -> the sorted list of them, health `error'
+force_reply(Doc, Replies, {Health, W, Acc}) ->
+    Quorum = update_quorum_met(W, Replies),
+    PurgedRevLists = [PurgedRevs || {ok, PurgedRevs} <- Replies],
+    case {Quorum, PurgedRevLists, lists:usort(Replies)} of
+        {{true, QuorumReply}, _, _} ->
+            {Health, W, [{Doc, QuorumReply} | Acc]};
+        {false, [], [{error, internal_server_error}]} ->
+            {error, W, [{Doc, {error, internal_server_error}} | Acc]};
+        {false, [], [OnlyReply]} ->
+            % all replies are identical, so the health is inherited
+            {Health, W, [{Doc, OnlyReply} | Acc]};
+        {false, [], DistinctReplies} ->
+            {error, W, [{Doc, DistinctReplies} | Acc]};
+        {false, _, _} ->
+            NewHealth = case Health of ok -> accepted; _ -> Health end,
+            Accepted = lists:usort(lists:flatten(PurgedRevLists)),
+            {NewHealth, W, [{Doc, {accepted, Accepted}} | Acc]}
+    end.
+
+
+%% dict:fold/3 callback that collects a reply per request only while every
+%% request seen so far has met the write quorum. As soon as one request
+%% misses the quorum the accumulator collapses to `continue' and stays so.
+maybe_reply(Doc, Replies, {stop, W, Collected}) ->
+    case update_quorum_met(W, Replies) of
+        false ->
+            continue;
+        {true, QuorumReply} ->
+            {stop, W, [{Doc, QuorumReply} | Collected]}
+    end;
+maybe_reply(_Doc, _Replies, continue) ->
+    % an earlier request missed the quorum: fast-forward through the fold
+    continue.
+
+%% Checks whether at least W of Replies are {ok, PurgedRevs}.
+%% Returns `false' when there are fewer, otherwise {true, {ok, Revs}} where
+%% Revs is the sorted, de-duplicated union of all purged revisions.
+update_quorum_met(W, Replies) ->
+    PurgedRevLists = [PurgedRevs || {ok, PurgedRevs} <- Replies],
+    case length(PurgedRevLists) >= W of
+        true ->
+            {true, {ok, lists:usort(lists:flatten(PurgedRevLists))}};
+        false ->
+            false
+    end.
+
+
+%% Builds a dict Shard -> [{UUID, Id, Revs}] in which every request is
+%% listed under each shard that mem3:shards/2 returns for its doc Id.
+group_idrevs_by_shard(DbName, UUIDsIdsRevs) ->
+    Pairs = lists:flatmap(fun({_UUID, Id, _Revs} = Request) ->
+        [{Shard, Request} || Shard <- mem3:shards(DbName, Id)]
+    end, UUIDsIdsRevs),
+    lists:foldl(fun({Shard, Request}, Dict) ->
+        dict:append(Shard, Request, Dict)
+    end, dict:new(), Pairs).
+
+
+%% Appends the N-th reply to the dict entry of the N-th key, pairwise.
+%% Both lists must have the same length; otherwise the call fails with
+%% `function_clause'.
+append_update_replies([Key | Keys], [Reply | Replies], Dict) ->
+    Updated = dict:append(Key, Reply, Dict),
+    append_update_replies(Keys, Replies, Updated);
+append_update_replies([], [], Dict) ->
+    Dict.
+
+
+%% Called after a worker has been written off. When no workers remain
+%% (pending count 0) the request is concluded with force_reply/3; otherwise
+%% the accumulator is returned unchanged so the receive keeps waiting.
+skip_message(Acc) ->
+    case Acc of
+        {0, _, W, _, {PSeqs, Dict}} ->
+            {Health, W, Replies} =
+                dict:fold(fun force_reply/3, {ok, W, []}, Dict),
+            {stop, {Health, fabric_view_changes:pack_seqs(PSeqs), Replies}};
+        _ ->
+            {ok, Acc}
+    end.
+
+
+% eunits
+
+% Drives handle_message/3 by hand through scenarios that must end with
+% health `ok': quorum reached with W=2 and W=3, one worker exiting, and
+% every worker answering with the same per-request error.
+doc_purge_ok_test() ->
+    % couch_log is mocked so that warning/notice calls are no-ops
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    % the entries of Shards act as the workers below; at least three are
+    % used (hd, nth 2, nth 3)
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    % Counters -> [{Shard, [UUID]}]: every shard is expected to answer for
+    % both requests
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % ***test for W = 2
+    % the second successful reply meets the quorum, so the fold stops early
+    AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), AccW2),
+    ?assertEqual(2, WaitingCountW2_1),
+    {stop, FinalReplyW2 } =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
+            lists:nth(2,Shards), AccW2_1),
+    ?assertMatch(
+        {ok, _PSeq, Replies} when Replies ==
+            [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
+        FinalReplyW2
+    ),
+
+    % ***test for W = 3
+    % all three replies are needed before the result is returned
+    AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), AccW3),
+    ?assertEqual(2, WaitingCountW3_1),
+    {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
+            lists:nth(2,Shards), AccW3_1),
+    ?assertEqual(1, WaitingCountW3_2),
+    {stop, FinalReplyW3 } =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
+            lists:nth(3,Shards), AccW3_2),
+    ?assertMatch(
+        {ok, _PSeq, Replies} when Replies ==
+            [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
+        FinalReplyW3
+    ),
+
+    % *** test rexi_exit on 1 node
+    % two successful replies still satisfy W = 2
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, lists:nth(3,Shards), Acc2),
+    ?assertMatch(
+        {ok, _PSeq, Replies} when Replies ==
+            [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
+        Reply
+    ),
+
+    % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
+    % *** still should return ok reply for the request
+    % (identical per-request errors from every worker keep the health as is)
+    ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
+    Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount21,_,_,_,_} = Acc21} =
+        handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}}, hd(Shards), Acc20),
+    ?assertEqual(2, WaitingCount21),
+    {ok, {WaitingCount22,_,_,_,_} = Acc22} =
+        handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}},
+            lists:nth(2,Shards), Acc21),
+    ?assertEqual(1, WaitingCount22),
+    {stop, Reply2 } =
+        handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}},
+            lists:nth(3,Shards), Acc22),
+    ?assertMatch(
+        {ok, _PSeq, Replies2} when Replies2 ==
+            [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}],
+        Reply2
+    ),
+
+    % *** test {error, purged_docs_limit_exceeded} on all nodes
+    % *** still should return ok reply for the request
+    ErrPDLE = {error, purged_docs_limit_exceeded},
+    Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount31,_,_,_,_} = Acc31} =
+        handle_message({ok,{0,[ErrPDLE, ErrPDLE]}}, hd(Shards), Acc30),
+    ?assertEqual(2, WaitingCount31),
+    {ok, {WaitingCount32,_,_,_,_} = Acc32} =
+        handle_message({ok,{0,[ErrPDLE, ErrPDLE]}},
+            lists:nth(2,Shards), Acc31),
+    ?assertEqual(1, WaitingCount32),
+    {stop, Reply3 } =
+        handle_message({ok,{0,[ErrPDLE, ErrPDLE]}},
+            lists:nth(3,Shards), Acc32),
+    ?assertMatch(
+        {ok, _PSeq, Replies3} when Replies3 ==
+            [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}],
+        Reply3
+    ),
+    % NOTE(review): the mock is only unloaded when every assertion above
+    % passed; a failing assertion leaves couch_log mocked.
+    meck:unload(couch_log).
+
+
+% Drives handle_message/3 through a scenario that must end with health
+% `accepted': only one of three workers succeeds while W = 2.
+doc_purge_accepted_test() ->
+    % couch_log is mocked so that warning/notice calls are no-ops
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    % the entries of Shards act as the workers below
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % *** test rexi_exit on 2 nodes
+    % one successful reply is below the quorum, so each request is reported
+    % as {accepted, Revs} once the last worker has been written off
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
+    ?assertMatch(
+        {accepted, _PSeq, Replies} when Replies ==
+            [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}],
+        Reply
+    ),
+    % NOTE(review): the mock is only unloaded when every assertion above
+    % passed; a failing assertion leaves couch_log mocked.
+    meck:unload(couch_log).
+
+
+% Drives handle_message/3 through scenarios that must end with health
+% `error', plus the case where the quorum exceeds the number of shards.
+doc_purge_error_test() ->
+    % couch_log is mocked so that warning/notice calls are no-ops
+    meck:new(couch_log),
+    meck:expect(couch_log, warning, fun(_,_) -> ok end),
+    meck:expect(couch_log, notice, fun(_,_) -> ok end),
+
+    Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
+    UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
+    Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
+    UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
+    UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
+    Shards =
+        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
+    Counters = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
+    DocsDict = dict:new(),
+
+    % *** test rexi_exit on all 3 nodes
+    Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters, {[], DocsDict}},
+    {ok, {WaitingCount1,_,_,_,_} = Acc1} =
+        handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
+    ?assertEqual(2, WaitingCount1),
+    {ok, {WaitingCount2,_,_,_,_} = Acc2} =
+        handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
+    ?assertEqual(1, WaitingCount2),
+    {stop, Reply} =
+        handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
+    ?assertMatch(
+        {error, _PSeq, Replies} when Replies ==
+            [{UUID1, {error, internal_server_error}},
+            {UUID2, {error, internal_server_error}}],
+        Reply
+    ),
+
+    % ***test w quorum > # shards, which should fail immediately
+    Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
+    Counters2 = dict:to_list(
+        group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
+    AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
+        Counters2, {[], DocsDict}},
+    % the worker must come from Shards2, the map Counters2 was built from;
+    % a worker from another map is only found if the records happen to match
+    Bool =
+        case handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards2), AccW4) of
+            {stop, _Reply} ->
+                true;
+            _ -> false
+        end,
+    ?assertEqual(true, Bool),
+
+    % *** test Docs with no replies should end up as {error, internal_server_error}
+    SA1 = #shard{node=a, range=1},
+    SA2 = #shard{node=a, range=2},
+    SB1 = #shard{node=b, range=1},
+    SB2 = #shard{node=b, range=2},
+    Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
+        {SA2,[UUID2]}, {SB2,[UUID2]}],
+    Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2,
+        Counters3, {[], DocsDict}},
+    {ok, Acc31} = handle_message({ok,{1,[{ok, Revs1}]}}, SA1, Acc30),
+    {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
+    {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
+    % the last message stops the receive; Acc34 holds the final reply tuple
+    {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
+    ?assertMatch(
+        {error, _PSeq, Replies3} when Replies3 ==
+            [{UUID1, {accepted, Revs1}},
+            {UUID2, {error, internal_server_error}}],
+        Acc34
+    ),
+    meck:unload(couch_log).
+
+
+% Test-only stand-in for group_idrevs_by_shard/2 that avoids starting the
+% mem3 application: every request's UUID is listed under every shard in
+% Shards, giving a dict Shard -> [UUID].
+group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
+    Pairs = lists:flatmap(fun({UUID, _Id, _Revs}) ->
+        [{Shard, UUID} || Shard <- Shards]
+    end, UUIDsIdsRevs),
+    lists:foldl(fun({Shard, UUID}, Dict) ->
+        dict:append(Shard, UUID, Dict)
+    end, dict:new(), Pairs).
+
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9b6662af/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 6dee870..f7c079b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -20,6 +20,7 @@
set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
-export([get_all_security/2, open_shard/2]).
-export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purged_docs_limit/3]).
-export([get_db_info/2, get_doc_count/2, get_update_seq/2,
changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -197,6 +198,9 @@ get_all_security(DbName, Options) ->
set_revs_limit(DbName, Limit, Options) ->
with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
+%% RPC entry point: runs couch_db:set_purged_docs_limit with Limit on the
+%% shard DbName via with_db/3.
+set_purged_docs_limit(DbName, Limit, Options) ->
+    with_db(DbName, Options, {couch_db, set_purged_docs_limit, [Limit]}).
+
open_doc(DbName, DocId, Options) ->
with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
@@ -232,14 +236,33 @@ get_missing_revs(DbName, IdRevsList, Options) ->
end).
update_docs(DbName, Docs0, Options) ->
+    % RPC entry point for document updates on the shard DbName.
+    % The update type is `replicated_changes' when that option is set and
+    % `interactive_edit' otherwise.
+    X = case proplists:get_value(replicated_changes, Options) of
+        true -> replicated_changes;
+        _ -> interactive_edit
+    end,
+    % The read-repair path applies only to replicated changes that carry a
+    % {read_repair, DocsByNode} option. Every other combination, including
+    % a read_repair option on an interactive edit, takes the regular path
+    % instead of failing with case_clause.
+    case {X, couch_util:get_value(read_repair, Options)} of
+        {replicated_changes, DocsByNode} when DocsByNode =/= undefined ->
+            update_docs_read_repair(DbName, DocsByNode, Options);
+        _ ->
+            Docs = make_att_readers(Docs0),
+            with_db(DbName, Options,
+                {couch_db, update_docs, [Docs, Options, X]})
+    end.
+
+
+%% RPC entry point: runs couch_db:get_purge_seq on the shard DbName via
+%% with_db/3.
+get_purge_seq(DbName, DbOptions) ->
+    with_db(DbName, DbOptions, {couch_db, get_purge_seq, []}).
+
+%% RPC entry point: runs couch_db:purge_docs with UUIdsIdsRevs on the shard
+%% DbName via with_db/3. The update type passed along is
+%% `replicated_changes' when that option is set, `interactive_edit' otherwise.
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
case proplists:get_value(replicated_changes, Options) of
- true ->
- X = replicated_changes;
- _ ->
- X = interactive_edit
+ true ->
+ X = replicated_changes;
+ _ ->
+ X = interactive_edit
end,
- Docs = make_att_readers(Docs0),
- with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+ with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, X]}).
+
%% @equiv group_info(DbName, DDocId, [])
group_info(DbName, DDocId) ->
@@ -294,6 +317,74 @@ with_db(DbName, Options, {M,F,A}) ->
rexi:reply(Error)
end.
+
+%% Read-repair variant of update_docs/3.
+%% Opens (or creates) the shard, drops documents whose revisions were
+%% purged on this node (see filter_purged_revs/2) and writes the rest as
+%% replicated changes. The outcome is sent back with rexi:reply/1: the
+%% update result, a thrown term, {error, Reason} for a runtime error (which
+%% is also logged), or the error from opening the database.
+update_docs_read_repair(DbName, DocsByNode, Options) ->
+    set_io_priority(DbName, Options),
+    Reply = case get_or_create_db(DbName, Options) of
+        {ok, Db} ->
+            % omit revisions that have been purged
+            Docs = make_att_readers(filter_purged_revs(Db, DocsByNode)),
+            try
+                couch_db:update_docs(Db, Docs, Options, replicated_changes)
+            catch
+                Thrown ->
+                    Thrown;
+                error:Reason ->
+                    couch_log:error("rpc ~p:~p/~p ~p ~p",
+                        [couch_db, update_docs, 4, Reason, clean_stack()]),
+                    {error, Reason}
+            end;
+        OpenError ->
+            OpenError
+    end,
+    rexi:reply(Reply).
+
+
+% Given [{Node, Doc}] - different revisions of the same DocId as returned
+% by different nodes - returns [Doc] with purged docs filtered out.
+% This is used for read repair from fabric_doc_open, so that docs which
+% were already purged on this node() are not recreated from nodes that are
+% out of sync.
+%
+% The _local/purge-mem3-* docs are folded; each names a node in its
+% verify_options and records a purge_seq. A doc in DocsByNode is considered
+% only if such a local doc exists for its node.
+filter_purged_revs(Db, DocsByNode) ->
+    AllowedPSeqLag = config:get_integer("couchdb",
+        "allowed_purge_seq_lag", 100),
+    DbPSeq = couch_db:get_purge_seq(Db),
+    Opts = [{start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-mem3-")},
+        {end_key_gt, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-mem31")}],
+    % go through the _local/purge-mem3-.. docs, find the node each local doc
+    % corresponds to, and check that the update from that node has not been
+    % purged recently on the current node
+    LDocsFoldFun = fun(#doc{body={Props}}, Acc) ->
+        {VOps} = couch_util:get_value(<<"verify_options">>, Props),
+        Node = couch_util:get_value(<<"node">>, VOps),
+        NewAcc = case lists:keyfind(Node, 1, DocsByNode) of
+            false ->
+                % no reply from this node took part in the read repair
+                Acc;
+            {Node, Doc} ->
+                NodePSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                case keep_read_repair_doc(Db, Doc, NodePSeq, DbPSeq,
+                        AllowedPSeqLag) of
+                    true -> [Doc | Acc];
+                    false -> Acc
+                end
+        end,
+        {ok, NewAcc}
+    end,
+    {ok, Docs} = couch_db_engine:fold_local_docs(Db, LDocsFoldFun, [], Opts),
+    Docs.
+
+
+% Decides whether Doc, received from a node whose recorded purge sequence
+% is NodePSeq, may be written during read repair.
+%   same purge_seq as this db         -> keep
+%   lagging by more than the allowance -> drop; the node is too far out of sync
+%   otherwise                          -> keep unless the doc's leading
+%                                         revision was purged since NodePSeq
+keep_read_repair_doc(Db, Doc, NodePSeq, DbPSeq, AllowedPSeqLag) ->
+    if
+        NodePSeq == DbPSeq ->
+            true;
+        (NodePSeq + AllowedPSeqLag) < DbPSeq ->
+            false;
+        true -> % (NodePSeq + AllowedPSeqLag) >= DbPSeq
+            % if Doc has been purged recently, then ignore it
+            PurgeFoldFun = fun({_P, _U, Id, Revs}, Acc) ->
+                [{Id, Revs} | Acc]
+            end,
+            {ok, PurgedIdsRevs} = couch_db:fold_purged_docs(Db,
+                NodePSeq, PurgeFoldFun, [], []),
+            {Start, [FirstRevId | _]} = Doc#doc.revs,
+            DocIdRevs = {Doc#doc.id, [{Start, FirstRevId}]},
+            not lists:member(DocIdRevs, PurgedIdsRevs)
+    end.
+
+
get_or_create_db(DbName, Options) ->
couch_db:open_int(DbName, [{create_if_missing, true} | Options]).