You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2017/03/15 19:17:00 UTC
[1/3] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to
8b3a6b1
Repository: couchdb-mem3
Updated Branches:
refs/heads/COUCHDB-3326-clustered-purge [created] 8b3a6b19a
Store and use the storage engine property
This adds an optional key to database documents that lists the
configured storage engine. This allows mem3_shards to create the shard
with the appropriate storage engine when recovering from a network
split.
BugzId: 45918
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/b26e3947
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/b26e3947
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/b26e3947
Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: b26e3947fcdef6f0be97f588e3c2837ce94d8363
Parents: 5e5201a
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Apr 6 11:18:01 2016 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Jun 7 13:00:01 2016 -0500
----------------------------------------------------------------------
include/mem3.hrl | 6 ++++--
src/mem3.erl | 20 +++++++++++++++++---
src/mem3_shards.erl | 28 +++++++++++++---------------
src/mem3_util.erl | 17 ++++++++++++++---
test/mem3_util_test.erl | 16 ++++++++--------
5 files changed, 56 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b26e3947/include/mem3.hrl
----------------------------------------------------------------------
diff --git a/include/mem3.hrl b/include/mem3.hrl
index d6ac0be..6579210 100644
--- a/include/mem3.hrl
+++ b/include/mem3.hrl
@@ -16,7 +16,8 @@
node :: node() | '_',
dbname :: binary(),
range :: [non_neg_integer() | '$1' | '$2'] | '_',
- ref :: reference() | 'undefined' | '_'
+ ref :: reference() | 'undefined' | '_',
+ opts :: list()
}).
%% Do not reference outside of mem3.
@@ -26,7 +27,8 @@
dbname :: binary(),
range :: [non_neg_integer() | '$1' | '$2'] | '_',
ref :: reference() | 'undefined' | '_',
- order :: non_neg_integer() | 'undefined' | '_'
+ order :: non_neg_integer() | 'undefined' | '_',
+ opts :: list()
}).
%% types
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b26e3947/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 405d7e5..a6038d5 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -23,7 +23,7 @@
-export([get_placement/1]).
%% For mem3 use only.
--export([name/1, node/1, range/1]).
+-export([name/1, node/1, range/1, engine/1]).
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
@@ -99,7 +99,8 @@ shards_int(DbName, Options) ->
name = ShardDbName,
dbname = ShardDbName,
range = [0, (2 bsl 31)-1],
- order = undefined}];
+ order = undefined,
+ opts = []}];
ShardDbName ->
%% shard_db is treated as a single sharded db to support calls to db_info
%% and view_all_docs
@@ -107,7 +108,8 @@ shards_int(DbName, Options) ->
node = node(),
name = ShardDbName,
dbname = ShardDbName,
- range = [0, (2 bsl 31)-1]}];
+ range = [0, (2 bsl 31)-1],
+ opts = []}];
_ ->
mem3_shards:for_db(DbName, Options)
end.
@@ -306,3 +308,15 @@ name(#shard{name=Name}) ->
Name;
name(#ordered_shard{name=Name}) ->
Name.
+
+engine(#shard{opts=Opts}) ->
+ engine(Opts);
+engine(#ordered_shard{opts=Opts}) ->
+ engine(Opts);
+engine(Opts) when is_list(Opts) ->
+ case couch_util:get_value(engine, Opts) of
+ Engine when is_binary(Engine) ->
+ [{engine, Engine}];
+ _ ->
+ []
+ end.
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b26e3947/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 548ab3e..1227d5e 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -300,7 +300,7 @@ changes_callback({change, {Change}, _}, _) ->
{Doc} ->
Shards = mem3_util:build_ordered_shards(DbName, Doc),
gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
- [create_if_missing(mem3:name(S)) || S
+ [create_if_missing(mem3:name(S), mem3:engine(S)) || S
<- Shards, mem3:node(S) =:= node()]
end
end
@@ -337,20 +337,18 @@ in_range(Shard, HashKey) ->
[B, E] = mem3:range(Shard),
B =< HashKey andalso HashKey =< E.
-create_if_missing(Name) ->
- DbDir = config:get("couchdb", "database_dir"),
- Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
- case filelib:is_regular(Filename) of
- true ->
- ok;
- false ->
- case couch_server:create(Name, [?ADMIN_CTX]) of
- {ok, Db} ->
- couch_db:close(Db);
- Error ->
- couch_log:error("~p tried to create ~s, got ~p",
- [?MODULE, Name, Error])
- end
+create_if_missing(Name, Options) ->
+ case couch_server:exists(Name) of
+ true ->
+ ok;
+ false ->
+ case couch_server:create(Name, [?ADMIN_CTX] ++ Options) of
+ {ok, Db} ->
+ couch_db:close(Db);
+ Error ->
+ couch_log:error("~p tried to create ~s, got ~p",
+ [?MODULE, Name, Error])
+ end
end.
cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b26e3947/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 2cd444d..4e1b5fd 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -153,6 +153,10 @@ build_ordered_shards(DbName, DocProps) ->
build_shards_by_node(DbName, DocProps) ->
{ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ EngineOpt = case couch_util:get_value(<<"engine">>, DocProps) of
+ Engine when is_binary(Engine) -> [{engine, Engine}];
+ _ -> []
+ end,
lists:flatmap(fun({Node, Ranges}) ->
lists:map(fun(Range) ->
[B,E] = string:tokens(?b2l(Range), "-"),
@@ -161,7 +165,8 @@ build_shards_by_node(DbName, DocProps) ->
name_shard(#shard{
dbname = DbName,
node = to_atom(Node),
- range = [Beg, End]
+ range = [Beg, End],
+ opts = EngineOpt
}, Suffix)
end, Ranges)
end, ByNode).
@@ -169,6 +174,10 @@ build_shards_by_node(DbName, DocProps) ->
build_shards_by_range(DbName, DocProps) ->
{ByRange} = couch_util:get_value(<<"by_range">>, DocProps, {[]}),
Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ EngineOpt = case couch_util:get_value(<<"engine">>, DocProps) of
+ Engine when is_binary(Engine) -> [{engine, Engine}];
+ _ -> []
+ end,
lists:flatmap(fun({Range, Nodes}) ->
lists:map(fun({Node, Order}) ->
[B,E] = string:tokens(?b2l(Range), "-"),
@@ -178,7 +187,8 @@ build_shards_by_range(DbName, DocProps) ->
dbname = DbName,
node = to_atom(Node),
range = [Beg, End],
- order = Order
+ order = Order,
+ opts = EngineOpt
}, Suffix)
end, lists:zip(Nodes, lists:seq(1, length(Nodes))))
end, ByRange).
@@ -247,7 +257,8 @@ downcast(#ordered_shard{}=S) ->
node = S#ordered_shard.node,
dbname = S#ordered_shard.dbname,
range = S#ordered_shard.range,
- ref = S#ordered_shard.ref
+ ref = S#ordered_shard.ref,
+ opts = S#ordered_shard.opts
};
downcast(Shards) when is_list(Shards) ->
[downcast(Shard) || Shard <- Shards].
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b26e3947/test/mem3_util_test.erl
----------------------------------------------------------------------
diff --git a/test/mem3_util_test.erl b/test/mem3_util_test.erl
index 340a58a..42bc5c7 100644
--- a/test/mem3_util_test.erl
+++ b/test/mem3_util_test.erl
@@ -85,35 +85,35 @@ build_shards_test() ->
[{shard,<<"shards/00000000-1fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[0,536870911],
- undefined},
+ undefined,[]},
{shard,<<"shards/20000000-3fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[536870912,1073741823],
- undefined},
+ undefined,[]},
{shard,<<"shards/40000000-5fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[1073741824,1610612735],
- undefined},
+ undefined,[]},
{shard,<<"shards/60000000-7fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[1610612736,2147483647],
- undefined},
+ undefined,[]},
{shard,<<"shards/80000000-9fffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[2147483648,2684354559],
- undefined},
+ undefined,[]},
{shard,<<"shards/a0000000-bfffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[2684354560,3221225471],
- undefined},
+ undefined,[]},
{shard,<<"shards/c0000000-dfffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[3221225472,3758096383],
- undefined},
+ undefined,[]},
{shard,<<"shards/e0000000-ffffffff/testdb1">>,
'bigcouch@node.local',<<"testdb1">>,
[3758096384,4294967295],
- undefined}],
+ undefined,[]}],
?assertEqual(ExpectedShards1, Shards1),
ok.
[3/3] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to
8b3a6b1
Posted by da...@apache.org.
Add internal replication of purge requests
COUCHDB-3326
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/8b3a6b19
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/8b3a6b19
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/8b3a6b19
Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 8b3a6b19add32dbfe3c78622e3e8176b0a0ccb6a
Parents: b26e394
Author: Mayya Sharipova <ma...@ca.ibm.com>
Authored: Mon Oct 17 17:22:16 2016 -0400
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Wed Mar 15 14:16:42 2017 -0500
----------------------------------------------------------------------
src/mem3_rep.erl | 136 ++++++++++++++++++++++++++++++++++++++++++++++----
src/mem3_rpc.erl | 94 ++++++++++++++++++++++++++++++++--
2 files changed, 214 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8b3a6b19/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 64905e3..419af31 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -17,7 +17,9 @@
go/2,
go/3,
make_local_id/2,
- find_source_seq/4
+ make_local_purge_id/2,
+ find_source_seq/4,
+ mem3_sync_purge/1
]).
-export([
@@ -39,7 +41,8 @@
target,
filter,
db,
- history = {[]}
+ history = {[]},
+ purge_seq = 0
}).
@@ -119,6 +122,10 @@ make_local_id(SourceThing, TargetThing, Filter) ->
<<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+make_local_purge_id(SourceUUID, TargetUUID) ->
+ <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
%% @doc Find and return the largest update_seq in SourceDb
%% that the client has seen from TargetNode.
%%
@@ -172,11 +179,55 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
repl(#db{name=DbName}=Db, Acc0) ->
erlang:put(io_priority, {internal_repl, DbName}),
- #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
- Fun = fun ?MODULE:changes_enumerator/2,
- {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
- {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
- {ok, couch_db:count_changes_since(Db, LastSeq)}.
+ #acc{source = Db2} = Acc1 = pull_purges_from_target(Db, Acc0),
+ #acc{seq=Seq} = Acc2 = calculate_start_seq(Acc1),
+ try
+ % this throws an exception: {invalid_start_purge_seq, PurgeSeq0},
+ % when oldest_purge_seq on source > the last source purge_seq known to target
+ Acc3 = replicate_purged_docs(Acc2),
+ Fun = fun ?MODULE:changes_enumerator/2,
+ {ok, Acc4} = couch_db:fold_changes(Db2, Seq, Fun, Acc3),
+ {ok, #acc{seq = LastSeq}} = replicate_batch(Acc4),
+ {ok, couch_db:count_changes_since(Db2, LastSeq)}
+ catch
+ throw:{invalid_start_purge_seq, PurgeSeq} ->
+ couch_log:error(
+ "oldest_purge_seq on source passed purge_seq: ~p known to target for db: ~p",
+ [PurgeSeq, DbName]
+ )
+ end.
+
+
+pull_purges_from_target(Db, #acc{target=#shard{node=TNode, name=DbName}}=Acc) ->
+ SourceUUID = couch_db:get_uuid(Db),
+ {TUUIDsIdsRevs, TargetPDocID, TargetPSeq} =
+ mem3_rpc:load_purges(TNode, DbName, SourceUUID),
+ Acc2 = case TUUIDsIdsRevs of
+ [] -> Acc#acc{source = Db};
+ _ ->
+ % check which Target UUIDs have not been applied to Source
+ UUIDs = [UUID || {UUID, _Id, _Revs} <- TUUIDsIdsRevs],
+ PurgedDocs = couch_db:open_purged_docs(Db, UUIDs),
+ Results = lists:zip(TUUIDsIdsRevs, PurgedDocs),
+ Unapplied = lists:filtermap(fun
+ ({UUIDIdRevs, not_found}) -> {true, UUIDIdRevs};
+ (_) -> false
+ end, Results),
+ Acc1 = case Unapplied of
+ [] -> Acc#acc{source = Db};
+ _ ->
+ % purge Db on Source and reopen it
+ couch_db:purge_docs(Db, Unapplied),
+ couch_db:close(Db),
+ {ok, Db2} = couch_db:open(DbName, [?ADMIN_CTX]),
+ Acc#acc{source = Db2}
+ end,
+ % update on Target target_purge_seq known to Source
+ mem3_rpc:save_purge_checkpoint(TNode, DbName, TargetPDocID,
+ TargetPSeq, node()),
+ Acc1
+ end,
+ Acc2.
calculate_start_seq(Acc) ->
@@ -210,7 +261,31 @@ calculate_start_seq(Acc) ->
Seq = TargetSeq,
History = couch_util:get_value(<<"history">>, TProps, {[]})
end,
- Acc1#acc{seq = Seq, history = History};
+ SourcePurgeSeq0 = couch_util:get_value(<<"purge_seq">>, SProps),
+ TargetPurgeSeq0 = couch_util:get_value(<<"purge_seq">>, TProps),
+ % before purge upgrade, purge_seq was not saved in checkpoint file,
+ % thus get purge_seq directly from dbs
+ SourcePurgeSeq = case is_integer(SourcePurgeSeq0) of
+ true ->
+ SourcePurgeSeq0;
+ false ->
+ {ok, SPS} = couch_db:get_purge_seq(Db),
+ SPS
+ end,
+ TargetPurgeSeq = case is_integer(TargetPurgeSeq0) of
+ true ->
+ TargetPurgeSeq0;
+ false ->
+ {ok, TPS} = mem3_rpc:get_purge_seq(Node, Name),
+ TPS
+ end,
+ case SourcePurgeSeq =< TargetPurgeSeq of
+ true ->
+ PurgeSeq = SourcePurgeSeq;
+ false ->
+ PurgeSeq = TargetPurgeSeq
+ end,
+ Acc1#acc{seq = Seq, history = History, purge_seq = PurgeSeq};
{not_found, _} ->
compare_epochs(Acc1)
end.
@@ -246,6 +321,27 @@ changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
{Go, Acc1}.
+replicate_purged_docs(Acc0) ->
+ #acc{
+ source = Db,
+ target = #shard{node=Node, name=Name},
+ purge_seq = PurgeSeq0
+ } = Acc0,
+ PFoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+ [{UUID, Id, Revs} | Acc]
+ end,
+
+ {ok, UUIDsIdsRevs} = couch_db:fold_purged_docs(Db, PurgeSeq0, PFoldFun, [], []),
+ case UUIDsIdsRevs of
+ [] ->
+ Acc0;
+ _ ->
+ ok = purge_on_target(Node, Name, UUIDsIdsRevs),
+ {ok, PurgeSeq} = couch_db:get_purge_seq(Db),
+ Acc0#acc{purge_seq = PurgeSeq}
+ end.
+
+
replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
case find_missing_revs(Acc) of
[] ->
@@ -291,8 +387,19 @@ save_on_target(Node, Name, Docs) ->
ok.
+purge_on_target(Node, Name, UUIdsIdsRevs) ->
+ mem3_rpc:purge_docs(Node, Name, UUIdsIdsRevs,[
+ replicated_changes,
+ full_commit,
+ ?ADMIN_CTX,
+ {io_priority, {internal_repl, Name}}
+ ]),
+ ok.
+
+
update_locals(Acc) ->
- #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
+ #acc{seq=Seq, purge_seq = PurgeSeq, source=Db, target=Target,
+ localid=Id, history=History} = Acc,
#shard{name=Name, node=Node} = Target,
NewEntry = [
{<<"source_node">>, atom_to_binary(node(), utf8)},
@@ -300,8 +407,9 @@ update_locals(Acc) ->
{<<"source_seq">>, Seq},
{<<"timestamp">>, list_to_binary(iso8601_timestamp())}
],
- NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
- {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
+ NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, PurgeSeq,
+ NewEntry, History),
+ {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
find_repl_doc(SrcDb, TgtUUIDPrefix) ->
@@ -336,6 +444,12 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
end.
+% used during compaction to check if _local/purge doc is current
+mem3_sync_purge(Opts)->
+ Node = couch_util:get_value(<<"node">>, Opts),
+ lists:member(mem3:nodes(), Node).
+
+
is_prefix(Prefix, Subject) ->
binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8b3a6b19/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 13641dc..07b4f54 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -19,15 +19,21 @@
find_common_seq/4,
get_missing_revs/4,
update_docs/4,
+ get_purge_seq/2,
+ purge_docs/4,
load_checkpoint/4,
- save_checkpoint/6
+ save_checkpoint/7,
+ load_purges/3,
+ save_purge_checkpoint/5
]).
% Private RPC callbacks
-export([
find_common_seq_rpc/3,
load_checkpoint_rpc/3,
- save_checkpoint_rpc/5
+ save_checkpoint_rpc/6,
+ load_purges_rpc/2,
+ save_purge_checkpoint_rpc/4
]).
@@ -43,16 +49,34 @@ update_docs(Node, DbName, Docs, Options) ->
rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+get_purge_seq(Node, DbName) ->
+ rexi_call(Node, {fabric_rpc, get_purge_seq, [DbName]}).
+
+
+purge_docs(Node, DbName, PUUIdsIdsRevs, Options) ->
+ rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PUUIdsIdsRevs, Options]}).
+
+
load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
Args = [DbName, SourceNode, SourceUUID],
rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
-save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
- Args = [DbName, DocId, Seq, Entry, History],
+save_checkpoint(Node, DbName, DocId, Seq, PurgeSeq, Entry, History) ->
+ Args = [DbName, DocId, Seq, PurgeSeq, Entry, History],
rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
+load_purges(Node, DbName, SourceUUID) ->
+ Args = [DbName, SourceUUID],
+ rexi_call(Node, {mem3_rpc, load_purges_rpc, Args}).
+
+
+save_purge_checkpoint(Node, DbName, DocId, PurgeSeq, SourceNode) ->
+ Args = [DbName, DocId, PurgeSeq, SourceNode],
+ rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+
find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
Args = [DbName, SourceUUID, SourceEpochs],
rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
@@ -81,7 +105,8 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
end.
-save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
+save_checkpoint_rpc(DbName, Id, SourceSeq, SourcePurgeSeq,
+ NewEntry0, History0) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
{ok, Db} ->
@@ -93,6 +118,7 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
] ++ NewEntry0},
Body = {[
{<<"seq">>, SourceSeq},
+ {<<"purge_seq">>, SourcePurgeSeq},
{<<"target_uuid">>, couch_db:get_uuid(Db)},
{<<"history">>, add_checkpoint(NewEntry, History0)}
]},
@@ -129,6 +155,64 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
end.
+load_purges_rpc(DbName, SourceUUID) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ TargetUUID = couch_db:get_uuid(Db),
+ DocId = mem3_rep:make_local_purge_id(SourceUUID, TargetUUID),
+ LastPSeq = case couch_db:open_doc(Db, DocId, []) of
+ {ok, #doc{body={Props}} } ->
+ couch_util:get_value(<<"purge_seq">>, Props);
+ {not_found, _} ->
+ 0
+ end,
+ {ok, CurPSeq} = couch_db:get_purge_seq(Db),
+ UUIDsIdsRevs = if (LastPSeq == CurPSeq) -> []; true ->
+ FoldFun = fun({_PSeq, UUID, Id, Revs}, Acc) ->
+ [{UUID, Id, Revs} | Acc]
+ end,
+ {ok, UUIDsIdsRevs0} = couch_db:fold_purged_docs(
+ Db, LastPSeq, FoldFun, [], []
+ ),
+ UUIDsIdsRevs0
+ end,
+ rexi:reply({ok, {UUIDsIdsRevs, DocId, CurPSeq}});
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
+save_purge_checkpoint_rpc(DbName, Id, PurgeSeq, Node) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ Timestamp = couch_util:utc_string(),
+ Body = {[
+ {<<"purge_seq">>, PurgeSeq},
+ {<<"timestamp_utc">>, Timestamp},
+ {<<"verify_module">>, <<"mem3_rep">>},
+ {<<"verify_function">>, <<"mem3_sync_purge">>},
+ {<<"verify_options">>, {[{<<"node">>, Node}]}},
+ {<<"type">>, <<"internal_replication">>}
+ ]},
+ Doc = #doc{id = Id, body = Body},
+ rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ {ok, Body};
+ Else ->
+ {error, Else}
+ catch
+ Exception ->
+ Exception;
+ error:Reason ->
+ {error, Reason}
+ end);
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
%% @doc Return the sequence where two files with the same UUID diverged.
compare_epochs(SourceEpochs, TargetEpochs) ->
compare_rev_epochs(
[2/3] mem3 commit: updated refs/heads/COUCHDB-3326-clustered-purge to
8b3a6b1
Posted by da...@apache.org.
Update to use the pluggable storage API
Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/5e5201a6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/5e5201a6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/5e5201a6
Branch: refs/heads/COUCHDB-3326-clustered-purge
Commit: 5e5201a6dccaaf425bce6627038d15ca7ba207a0
Parents: 15615b2
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Feb 10 16:56:28 2016 -0600
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Jun 7 13:00:01 2016 -0500
----------------------------------------------------------------------
src/mem3_nodes.erl | 33 ++++++++++++++++++---------------
src/mem3_rep.erl | 23 +++++++++++------------
src/mem3_rpc.erl | 3 ++-
src/mem3_shards.erl | 15 +++++++++------
4 files changed, 40 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5e5201a6/src/mem3_nodes.erl
----------------------------------------------------------------------
diff --git a/src/mem3_nodes.erl b/src/mem3_nodes.erl
index f31891a..b4ce95f 100644
--- a/src/mem3_nodes.erl
+++ b/src/mem3_nodes.erl
@@ -92,24 +92,27 @@ code_change(_OldVsn, #state{}=State, _Extra) ->
initialize_nodelist() ->
DbName = config:get("mem3", "nodes_db", "_nodes"),
{ok, Db} = mem3_util:ensure_exists(DbName),
- {ok, _, Db} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, Db, []),
- % add self if not already present
- case ets:lookup(?MODULE, node()) of
- [_] ->
- ok;
- [] ->
- ets:insert(?MODULE, {node(), []}),
- Doc = #doc{id = couch_util:to_binary(node())},
- {ok, _} = couch_db:update_doc(Db, Doc, [])
- end,
- couch_db:close(Db),
- Db#db.update_seq.
+ try
+ {ok, Db} = couch_db:fold_docs(Db, fun first_fold/2, Db),
+ % add self it not already present
+ case ets:lookup(?MODULE, node()) of
+ [_] ->
+ ok;
+ [] ->
+ ets:insert(?MODULE, {node(), []}),
+ Doc = #doc{id = couch_util:to_binary(node())},
+ {ok, _} = couch_db:update_doc(Db, Doc, [])
+ end,
+ couch_db:get_update_seq(Db)
+ after
+ couch_db:close(Db)
+ end.
-first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
{ok, Acc};
-first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+first_fold(#full_doc_info{deleted=true}, Acc) ->
{ok, Acc};
-first_fold(#full_doc_info{id=Id}=DocInfo, _, Db) ->
+first_fold(#full_doc_info{id=Id}=DocInfo, Db) ->
{ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]),
ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}),
{ok, Db}.
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5e5201a6/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 938260d..64905e3 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -21,7 +21,7 @@
]).
-export([
- changes_enumerator/3
+ changes_enumerator/2
]).
@@ -170,11 +170,11 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
end.
-repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) ->
+repl(#db{name=DbName}=Db, Acc0) ->
erlang:put(io_priority, {internal_repl, DbName}),
#acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
- Fun = fun ?MODULE:changes_enumerator/3,
- {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
+ Fun = fun ?MODULE:changes_enumerator/2,
+ {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
{ok, couch_db:count_changes_since(Db, LastSeq)}.
@@ -225,11 +225,10 @@ compare_epochs(Acc) ->
Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
Acc#acc{seq = Seq, history = {[]}}.
-changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
+changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) ->
{ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
- changes_enumerator(FDI, Reds, Acc);
-changes_enumerator(#full_doc_info{}=FDI, _,
- #acc{revcount=C, infos=Infos}=Acc0) ->
+ changes_enumerator(FDI, Acc);
+changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) ->
#doc_info{
high_seq=Seq,
revs=Revs
@@ -325,11 +324,11 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
{stop, not_found}
end
end,
- Options = [{start_key, DocIdPrefix}],
- case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of
- {ok, _, {TgtUUID, Doc}} ->
+ Options = [{start_key, DocIdPrefix}, {local, true}],
+ case couch_db:fold_docs(SrcDb, Options, FoldFun, not_found) of
+ {ok, {TgtUUID, Doc}} ->
{ok, TgtUUID, Doc};
- {ok, _, not_found} ->
+ {ok, not_found} ->
{not_found, missing};
Else ->
couch_log:error("Error finding replication doc: ~w", [Else]),
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5e5201a6/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 93cb99a..13641dc 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -84,7 +84,8 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, #db{update_seq = TargetSeq} = Db} ->
+ {ok, Db} ->
+ TargetSeq = couch_db:get_update_seq(Db),
NewEntry = {[
{<<"target_node">>, atom_to_binary(node(), utf8)},
{<<"target_uuid">>, couch_db:get_uuid(Db)},
http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5e5201a6/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 8a1bb54..548ab3e 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -155,7 +155,7 @@ fold(Fun, Acc) ->
{ok, Db} = mem3_util:ensure_exists(DbName),
FAcc = {Db, Fun, Acc},
try
- {ok, _, LastAcc} = couch_db:enum_docs(Db, fun fold_fun/3, FAcc, []),
+ {ok, LastAcc} = couch_db:fold_docs(Db, fun fold_fun/2, FAcc),
{_Db, _UFun, UAcc} = LastAcc,
UAcc
after
@@ -247,10 +247,10 @@ code_change(_OldVsn, #st{}=St, _Extra) ->
%% internal functions
-fold_fun(#full_doc_info{}=FDI, _, Acc) ->
+fold_fun(#full_doc_info{}=FDI, Acc) ->
DI = couch_doc:to_doc_info(FDI),
- fold_fun(DI, nil, Acc);
-fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) ->
+ fold_fun(DI, Acc);
+fold_fun(#doc_info{}=DI, {Db, UFun, UAcc}) ->
case couch_db:open_doc(Db, DI, [ejson_body, conflicts]) of
{ok, Doc} ->
{Props} = Doc#doc.body,
@@ -264,8 +264,11 @@ fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) ->
get_update_seq() ->
DbName = config:get("mem3", "shards_db", "_dbs"),
{ok, Db} = mem3_util:ensure_exists(DbName),
- couch_db:close(Db),
- Db#db.update_seq.
+ try
+ couch_db:get_update_seq(Db)
+ after
+ couch_db:close(Db)
+ end.
listen_for_changes(Since) ->
DbName = config:get("mem3", "shards_db", "_dbs"),