Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:22:21 UTC

[01/50] mem3 commit: updated refs/heads/master to 64c0c74

Repository: couchdb-mem3
Updated Branches:
  refs/heads/master 9dbea0340 -> 64c0c7475


Fix _membership/$DBNAME api endpoint

This switches the JSON key to be a binary, as required by jiffy.

Also, remove the extraneous <<"parts">> segment from the URL path.

Show full shard range.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/51e436f9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/51e436f9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/51e436f9

Branch: refs/heads/master
Commit: 51e436f9f578f00cba6b8087c3266346a802f267
Parents: 7fa726f
Author: Russell Branca <ch...@gmail.com>
Authored: Thu Apr 11 14:18:12 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:11:47 2014 +0100

----------------------------------------------------------------------
 src/mem3_httpd.erl | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/51e436f9/src/mem3_httpd.erl
----------------------------------------------------------------------
diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl
index 94196fa..f182f7f 100644
--- a/src/mem3_httpd.erl
+++ b/src/mem3_httpd.erl
@@ -28,7 +28,7 @@ handle_membership_req(#httpd{method='GET',
         {cluster_nodes, lists:sort(ClusterNodes)}
     ]});
 handle_membership_req(#httpd{method='GET',
-        path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
+        path_parts=[<<"_membership">>, DbName]} = Req) ->
     ClusterNodes = try mem3:nodes()
     catch _:_ -> {ok,[]} end,
     Shards = mem3:shards(DbName),
@@ -46,6 +46,8 @@ handle_membership_req(#httpd{method='GET',
 json_shards([], AccIn) ->
     List = dict:to_list(AccIn),
     {lists:sort(List)};
-json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
+json_shards([#shard{node=Node, range=[B,E]} | Rest], AccIn) ->
     HexBeg = couch_util:to_hex(<<B:32/integer>>),
-    json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
+    HexEnd = couch_util:to_hex(<<E:32/integer>>),
+    Range = list_to_binary(HexBeg ++ "-" ++ HexEnd),
+    json_shards(Rest, dict:append(Range, Node, AccIn)).
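
A minimal sketch of the key this now produces for each shard (the range values
are illustrative): couch_util:to_hex/1 returns a string, so the joined range is
converted to a binary, which jiffy accepts as an object key where a plain
string would not.

    HexBeg = couch_util:to_hex(<<0:32/integer>>),          % "00000000"
    HexEnd = couch_util:to_hex(<<1073741823:32/integer>>), % "3fffffff"
    Range  = list_to_binary(HexBeg ++ "-" ++ HexEnd).      % <<"00000000-3fffffff">>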


[27/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Stabilize mem3_util:owner/2

BugzID: 21413


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/720049ba
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/720049ba
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/720049ba

Branch: refs/heads/master
Commit: 720049ba273388e084fc1f2b7ab18e836595c063
Parents: 4b6f001
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Jul 31 11:14:30 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_util.erl | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/720049ba/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 8e17393..0927493 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -224,10 +224,8 @@ ensure_exists(DbName) ->
 
 
 owner(DbName, DocId) ->
-    Shards = mem3:shards(DbName, DocId),
-    Ushards = mem3:ushards(DbName),
-    [Node] = [N || #shard{node=N} = S <- Shards, lists:member(S, Ushards)],
-    node() =:= Node.
+    Nodes = lists:sort([node()|nodes()]),
+    node() =:= hd(rotate_list({DbName, DocId}, Nodes)).
 
 is_deleted(Change) ->
     case couch_util:get_value(<<"deleted">>, Change) of
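
A rough sketch of why the new owner/2 is stable (node names are hypothetical,
and rotate_list is assumed to hash its key with crc32, as in the implementation
shown in commit [28/50] below): every node sorts the same membership list and
rotates it by a hash of {DbName, DocId}, so hd/1 yields the same owner no
matter where it is evaluated.

    Nodes = lists:sort(['db1@host', 'db2@host', 'db3@host']),
    Key   = term_to_binary({<<"mydb">>, <<"some-doc-id">>}),
    {H, T} = lists:split(erlang:crc32(Key) rem length(Nodes), Nodes),
    Owner = hd(T ++ H).   % identical on every node that computes it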


[30/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Reorder functions into a logical progression

This just moves functions around in the mem3_rep module to give a better
logical progression. Purely stylistic but it should make things easier
to read and find.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/ade6ab16
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/ade6ab16
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/ade6ab16

Branch: refs/heads/master
Commit: ade6ab16709146f3196505e24067e5151d142148
Parents: 27e315b
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:42:49 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 96 +++++++++++++++++++++++++--------------------------
 1 file changed, 48 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/ade6ab16/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 2460f64..9904965 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -101,16 +101,6 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
     end.
 
 
-repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
-    erlang:put(io_priority, {internal_repl, DbName}),
-    Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
-    Acc1 = Acc0#acc{source=Db, seq=Seq},
-    Fun = fun ?MODULE:changes_enumerator/3,
-    {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
-    {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-    {ok, couch_db:count_changes_since(Db, LastSeq)}.
-
-
 make_local_id(#shard{}=Source, #shard{}=Target) ->
     make_local_id(Source, Target, undefined).
 
@@ -129,6 +119,33 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
+repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
+    Acc1 = Acc0#acc{source=Db, seq=Seq},
+    Fun = fun ?MODULE:changes_enumerator/3,
+    {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
+    {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+    {ok, couch_db:count_changes_since(Db, LastSeq)}.
+
+
+calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
+    case couch_db:open_doc(Db, LocalId, [ejson_body]) of
+    {ok, #doc{body = {SProps}}} ->
+        Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
+        try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+        #doc{body = {TProps}} ->
+            SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
+            TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
+            erlang:min(SourceSeq, TargetSeq)
+        catch error:{not_found, _} ->
+            0
+        end;
+    {not_found, _} ->
+        0
+    end.
+
+
 changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
     {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
     changes_enumerator(FDI, Reds, Acc);
@@ -151,17 +168,6 @@ changes_enumerator(#full_doc_info{}=FDI, _,
     {Go, Acc1}.
 
 
-filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
-    try Filter(FullDocInfo) of
-        discard -> discard;
-        _ -> keep
-    catch _:_ ->
-        keep
-    end;
-filter_doc(_, _) ->
-    keep.
-
-
 replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     case find_missing_revs(Acc) of
     [] ->
@@ -183,6 +189,13 @@ find_missing_revs(Acc) ->
     rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
 
 
+save_on_target(Node, Name, Docs) ->
+    Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}],
+    rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+    ok.
+
+
 open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     lists:flatmap(fun({Id, Revs, _}) ->
         FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
@@ -190,11 +203,11 @@ open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     end, Missing).
 
 
-save_on_target(Node, Name, Docs) ->
-    Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
-        {io_priority, {internal_repl, Name}}],
-    rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
-    ok.
+open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
+    {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
+    lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
+                  couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
+    end, FoundRevs).
 
 
 update_locals(Acc) ->
@@ -228,28 +241,15 @@ rexi_call(Node, MFA) ->
     end.
 
 
-calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
-    case couch_db:open_doc(Db, LocalId, [ejson_body]) of
-    {ok, #doc{body = {SProps}}} ->
-        Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
-        try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
-        #doc{body = {TProps}} ->
-            SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
-            TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
-            erlang:min(SourceSeq, TargetSeq)
-        catch error:{not_found, _} ->
-            0
-        end;
-    {not_found, _} ->
-        0
-    end.
-
-
-open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
-    {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
-    lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
-                  couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
-    end, FoundRevs).
+filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
+    try Filter(FullDocInfo) of
+        discard -> discard;
+        _ -> keep
+    catch _:_ ->
+        keep
+    end;
+filter_doc(_, _) ->
+    keep.
 
 
 iso8601_timestamp() ->


[13/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Fix trivial typo


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/7706134e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/7706134e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/7706134e

Branch: refs/heads/master
Commit: 7706134ec6618d272c6d0eb15104007173cd506a
Parents: 68ca9cd
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri May 24 15:00:32 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3_sync.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/7706134e/src/mem3_sync.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index d85cb2f..9c82c0e 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -287,7 +287,7 @@ handle_db_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
         Live = nodes(),
         [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
             lists:member(N, Live)]
-    catch error:database_does_not_eist ->
+    catch error:database_does_not_exist ->
         ok
     end,
     {ok, St};


[09/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Balance replication ownership across nodes

The previous algorithm was biased towards low-numbered nodes, and in the
case of a 3-node cluster would declare db1 to be the owner of all
replications.  We can do better just by leveraging the existing
ushards code.

There's a possibility to refactor this as a new ushards/2 function if
that's perceived as useful.

BugzID: 19870
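
A hypothetical usage sketch (schedule_job/2 is made up): each node asks whether
it owns the work for a given document and only the owner acts, so with the
ushards-based selection the ownership spreads across the cluster instead of
piling onto the lowest-numbered node.

    case mem3_util:owner(DbName, DocId) of
        true  -> schedule_job(DbName, DocId);
        false -> ok
    end.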


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/68ca9cdf
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/68ca9cdf
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/68ca9cdf

Branch: refs/heads/master
Commit: 68ca9cdfb3c1e9631d2469b5038cec9356e56659
Parents: 8f9f58f
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu May 23 10:32:58 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3_util.erl | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/68ca9cdf/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index a63d9a0..5c8c989 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -225,10 +225,8 @@ ensure_exists(DbName) ->
 
 owner(DbName, DocId) ->
     Shards = mem3:shards(DbName, DocId),
-    Nodes = [node()|nodes()],
-    LiveShards = [S || #shard{node=Node} = S <- Shards, lists:member(Node, Nodes)],
-    [#shard{node=Node}] = lists:usort(fun(#shard{name=A}, #shard{name=B}) ->
-                                              A =< B  end, LiveShards),
+    Ushards = mem3:ushards(DbName),
+    [Node] = [N || #shard{node=N} = S <- Shards, lists:member(S, Ushards)],
     node() =:= Node.
 
 is_deleted(Change) ->


[11/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Add function to assist with rebalancing

This function takes either a database name or a list of shards, plus a
list of target nodes to balance the shards across. Every node with
fewer than its fair share of shards will steal shards from the node with
the most shards, as long as both nodes are in the same zone.

BugzID: 18638


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/d2171e9b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/d2171e9b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/d2171e9b

Branch: refs/heads/master
Commit: d2171e9b4d7cfe78858a6139cdf74d79607fe3e6
Parents: 43cd763
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Apr 3 20:13:35 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 81 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 81 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/d2171e9b/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
new file mode 100644
index 0000000..ca3c4a7
--- /dev/null
+++ b/src/mem3_rebalance.erl
@@ -0,0 +1,81 @@
+% Copyright 2013 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_rebalance).
+
+-export([rebalance/1, rebalance/2]).
+-include("mem3.hrl").
+
+rebalance(DbName) ->
+    rebalance(DbName, mem3:nodes()).
+
+rebalance(DbName, TargetNodes) when is_binary(DbName) ->
+    rebalance(mem3:shards(DbName), TargetNodes);
+rebalance(Shards, TargetNodes) when is_list(Shards) ->
+    TargetLevel = length(Shards) div length(TargetNodes),
+    rebalance2(TargetLevel, Shards, TargetNodes, TargetNodes, []).
+
+rebalance2(_TargetLevel, Shards, _Nodes, [], Moves) ->
+    {Shards, Moves};
+rebalance2(TargetLevel, Shards, Nodes, [Node | Rest], Moves) ->
+    ShardsForNode = [S || S <- Shards, S#shard.node =:= Node],
+    CurrentLevel = length(ShardsForNode),
+    case CurrentLevel < TargetLevel of
+        true ->
+            case victim(TargetLevel, Shards, Nodes, Node) of
+                {ok, Victim} ->
+                    rebalance2(TargetLevel,
+                             replace(Victim, Victim#shard{node=Node}, Shards),
+                             Nodes, [Node|Rest], [{Victim, Node}|Moves]);
+                false ->
+                    rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
+            end;
+        false ->
+            rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
+    end.
+
+victim(TargetLevel, Shards, Nodes, TargetNode) ->
+    TargetZone = mem3:node_info(TargetNode, <<"zone">>),
+    CandidateNodes = lists:usort([Node || Node <- Nodes,
+                                     Node =/= TargetNode,
+                                     mem3:node_info(Node, <<"zone">>) =:= TargetZone]),
+    %% make {Node, ShardsInNode} list
+    GroupedByNode0 = [{Node, [S || S <- Shards, S#shard.node =:= Node]} || Node <- CandidateNodes],
+    %% don't take from a node below target level
+    GroupedByNode1 = [{N, SS} || {N, SS} <- GroupedByNode0, length(SS) > TargetLevel],
+    %% prefer to take from a node with more shards than others
+    GroupedByNode2 = lists:sort(fun largest_first/2, GroupedByNode1),
+    %% don't take a shard for a range the target already has
+    TargetRanges = lists:usort([S#shard.range || S <- Shards, S#shard.node =:= TargetNode]),
+    GroupedByNode3 = [{N, lists:filter(fun(S) -> not lists:member(S#shard.range, TargetRanges) end, SS)}
+                      || {N, SS} <- GroupedByNode2],
+    %% remove nodes with no candidate shards
+    GroupedByNode4 = [{N, SS} || {N, SS} <- GroupedByNode3, SS =/= []],
+    case GroupedByNode4 of
+        [{_, [Victim|_]} | _] -> {ok, Victim};
+        [] -> false
+    end.
+
+largest_first({_, A}, {_, B}) ->
+    length(A) >= length(B).
+
+replace(A, B, List) ->
+    replace(A, B, List, []).
+
+replace(_A, _B, [], Acc) ->
+    Acc;
+replace(A, B, [A | Rest], Acc) ->
+    replace(A, B, Rest, [B | Acc]);
+replace(A, B, [C | Rest], Acc) ->
+    replace(A, B, Rest, [C | Acc]).
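
A hypothetical usage sketch (database and node names are made up): rebalance/2
returns the shard list with each stolen shard re-homed, together with the list
of moves it decided on.

    Nodes = ['db1@host', 'db2@host', 'db3@host'],
    {NewShards, Moves} = mem3_rebalance:rebalance(<<"mydb">>, Nodes),
    %% Moves is a list of {VictimShard, TargetNode} pairs; each victim appears
    %% in NewShards with its #shard.node already switched to the target node.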


[34/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Update whitespace and exports formatting


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/27e315be
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/27e315be
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/27e315be

Branch: refs/heads/master
Commit: 27e315be18d1dc0584a08a5baac06370cfeca912
Parents: 88a6491
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:36:47 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 32 +++++++++++++++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/27e315be/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 1a528e7..2460f64 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -12,13 +12,25 @@
 
 -module(mem3_rep).
 
--export([go/2, go/3, changes_enumerator/3, make_local_id/2]).
+
+-export([
+    go/2,
+    go/3,
+    make_local_id/2
+]).
+
+-export([
+    changes_enumerator/3
+]).
+
 
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+
 -define(CTX, #user_ctx{roles = [<<"_admin">>]}).
 
+
 -record(acc, {
     batch_size,
     batch_count,
@@ -32,12 +44,15 @@
     db
 }).
 
+
 go(Source, Target) ->
     go(Source, Target, []).
 
+
 go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
     go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);
 
+
 go(#shard{} = Source, #shard{} = Target, Opts) ->
     mem3_sync_security:maybe_sync(Source, Target),
     BatchSize = case proplists:get_value(batch_size, Opts) of
@@ -61,6 +76,7 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
     },
     go(Acc).
 
+
 go(#acc{source=Source, batch_count=BC}=Acc0) ->
     case couch_db:open(Source#shard.name, [{user_ctx,?CTX}]) of
     {ok, Db} ->
@@ -84,6 +100,7 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
         {error, missing_source}
     end.
 
+
 repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
@@ -93,9 +110,11 @@ repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
     {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
     {ok, couch_db:count_changes_since(Db, LastSeq)}.
 
+
 make_local_id(#shard{}=Source, #shard{}=Target) ->
     make_local_id(Source, Target, undefined).
 
+
 make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
     S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
     T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
@@ -109,6 +128,7 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
     end,
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
+
 changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
     {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
     changes_enumerator(FDI, Reds, Acc);
@@ -130,6 +150,7 @@ changes_enumerator(#full_doc_info{}=FDI, _,
     Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
     {Go, Acc1}.
 
+
 filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
     try Filter(FullDocInfo) of
         discard -> discard;
@@ -140,6 +161,7 @@ filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
 filter_doc(_, _) ->
     keep.
 
+
 replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     case find_missing_revs(Acc) of
     [] ->
@@ -150,6 +172,7 @@ replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     update_locals(Acc),
     {ok, Acc#acc{revcount=0, infos=[]}}.
 
+
 find_missing_revs(Acc) ->
     #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
     IdsRevs = lists:map(fun(FDI) ->
@@ -159,18 +182,21 @@ find_missing_revs(Acc) ->
     Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
     rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
 
+
 open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     lists:flatmap(fun({Id, Revs, _}) ->
         FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
         open_doc_revs(Source, FDI, Revs)
     end, Missing).
 
+
 save_on_target(Node, Name, Docs) ->
     Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
         {io_priority, {internal_repl, Name}}],
     rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
     ok.
 
+
 update_locals(Acc) ->
     #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
     #shard{name=Name, node=Node} = Target,
@@ -183,6 +209,7 @@ update_locals(Acc) ->
     Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
     rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
 
+
 rexi_call(Node, MFA) ->
     Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
     Ref = rexi:cast(Node, self(), MFA, [sync]),
@@ -200,6 +227,7 @@ rexi_call(Node, MFA) ->
         rexi_monitor:stop(Mon)
     end.
 
+
 calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
     case couch_db:open_doc(Db, LocalId, [ejson_body]) of
     {ok, #doc{body = {SProps}}} ->
@@ -216,12 +244,14 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
         0
     end.
 
+
 open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
     {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
     lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
                   couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
     end, FoundRevs).
 
+
 iso8601_timestamp() ->
     {_,_,Micro} = Now = os:timestamp(),
     {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),


[02/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Add doc shard info endpoint


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/d4e7748f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/d4e7748f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/d4e7748f

Branch: refs/heads/master
Commit: d4e7748f096a2ff3bcbf8a00adc0f6bf805d4b7e
Parents: 51e436f
Author: Russell Branca <ch...@gmail.com>
Authored: Fri Apr 12 15:06:58 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:12:00 2014 +0100

----------------------------------------------------------------------
 src/mem3_httpd.erl | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/d4e7748f/src/mem3_httpd.erl
----------------------------------------------------------------------
diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl
index f182f7f..dc42924 100644
--- a/src/mem3_httpd.erl
+++ b/src/mem3_httpd.erl
@@ -37,6 +37,14 @@ handle_membership_req(#httpd{method='GET',
         {all_nodes, lists:sort([node()|nodes()])},
         {cluster_nodes, lists:sort(ClusterNodes)},
         {partitions, JsonShards}
+    ]});
+handle_membership_req(#httpd{method='GET',
+        path_parts=[<<"_membership">>, DbName, DocId]} = Req) ->
+    Shards = mem3:shards(DbName, DocId),
+    {[{Shard, Dbs}]} = json_shards(Shards, dict:new()),
+    couch_httpd:send_json(Req, {[
+        {range, Shard},
+        {nodes, Dbs}
     ]}).
 
 %%
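
An illustrative sketch of the EJSON term the new clause sends for
GET /_membership/{db}/{docid} (the range and node names are made up; the node
atoms come out as strings in the JSON response):

    {[
        {range, <<"80000000-bfffffff">>},
        {nodes, ['db1@host', 'db2@host', 'db3@host']}
    ]}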


[25/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Fix two bugs in the global balancing phase

* Nodes with 0 shards were being ignored because of a wrong datatype.
* The limit was being ignored because max was used instead of min.

BugzID: 24680


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/51838ca2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/51838ca2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/51838ca2

Branch: refs/heads/master
Commit: 51838ca25bd44ca287676e98d2c907205300eb58
Parents: 0b0a7e7
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 11:05:41 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/51838ca2/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 7e0826b..af050f6 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -138,8 +138,8 @@ global_expand(TargetNodes0, LocalOps, Limit) ->
         ({Node0, Count}, Acc) ->
             Node = list_to_existing_atom(binary_to_list(Node0)),
             % Compute the max number of shards to donate.
-            DC0 = erlang:max(Count - TargetLevel, Limit - length(Acc)),
-            InternalAcc0 = {Node, TargetNodes, Acc, DC0},
+            DC0 = erlang:min(Count - TargetLevel, Limit - length(Acc)),
+            InternalAcc0 = {Node, TargetNodes0, Acc, DC0},
             try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
                 {_, _, Moves, _} ->
                     Moves
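
A worked sketch of the fix with illustrative numbers: a donor holding
Count = 10 shards against TargetLevel = 4, with only Limit = 2 moves left and
an empty accumulator, should donate at most 2 shards, which only min/2
guarantees.

    Count = 10, TargetLevel = 4, Limit = 2, Acc = [],
    erlang:max(Count - TargetLevel, Limit - length(Acc)),  % 6: ignores the limit
    erlang:min(Count - TargetLevel, Limit - length(Acc)).  % 2: respects it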


[39/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Include replication history on checkpoint docs

This changes how and what we store on internal replication checkpoint
documents. The two major changes are that we are now identifying
checkpoint documents by the database UUIDs (instead of the node that
hosted them) and we're storing a history of checkpoint information to
allow us to replace dead shards.

The history is a list of checkpoint entries stored with exponentially
decreasing granularity. This allows us to store ~30 checkpoints covering
ranges into the billions of update sequences, which means we won't need
to worry about truncations or other issues for the time being.

There's also a new mem3_rep:find_source_seq/4 helper function that will
find a local update_seq replacement provided information for a remote
shard copy. This logic is a bit subtle and should be reused rather than
reimplemented.

BugzId: 21973
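
A small sketch of the thinning arithmetic (not part of the commit itself): the
delta between the newest source_seq and an older entry selects a bucket of
exponentially increasing width, and each bucket keeps only its smallest and
largest entries, which is what bounds the history to roughly 30 checkpoints.

    BucketTag = fun(0) -> 0;
                   (N) when N > 0 -> trunc(math:log(N) / math:log(2))
                end,
    [BucketTag(D) || D <- [0, 1, 3, 8, 100, 1000000000]].
    %% => [0, 0, 1, 3, 6, 29]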


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/bf07a7c2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/bf07a7c2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/bf07a7c2

Branch: refs/heads/master
Commit: bf07a7c2987fe3dea2088f25be373f05135e3f8a
Parents: 36a1e63
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 12:08:07 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:50:19 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 259 ++++++++++++++++++++++-----
 src/mem3_rpc.erl | 476 +++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 685 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/bf07a7c2/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 4284518..bdc6fa4 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -16,7 +16,8 @@
 -export([
     go/2,
     go/3,
-    make_local_id/2
+    make_local_id/2,
+    find_source_seq/4
 ]).
 
 -export([
@@ -36,12 +37,13 @@
     batch_count,
     revcount = 0,
     infos = [],
-    seq,
+    seq = 0,
     localid,
     source,
     target,
     filter,
-    db
+    db,
+    history = {[]}
 }).
 
 
@@ -65,11 +67,9 @@ go(#shard{} = Source, #shard{} = Target, Opts) ->
         _ -> 1
     end,
     Filter = proplists:get_value(filter, Opts),
-    LocalId = make_local_id(Source, Target, Filter),
     Acc = #acc{
         batch_size = BatchSize,
         batch_count = BatchCount,
-        localid = LocalId,
         source = Source,
         target = Target,
         filter = Filter
@@ -101,13 +101,17 @@ go(#acc{source=Source, batch_count=BC}=Acc0) ->
     end.
 
 
-make_local_id(#shard{}=Source, #shard{}=Target) ->
+make_local_id(Source, Target) ->
     make_local_id(Source, Target, undefined).
 
 
 make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
-    S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceNode))),
-    T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetNode))),
+    make_local_id(SourceNode, TargetNode, Filter);
+
+
+make_local_id(SourceThing, TargetThing, Filter) ->
+    S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SourceThing))),
+    T = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(TargetThing))),
     F = case is_function(Filter) of
         true ->
             {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
@@ -119,30 +123,102 @@ make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
 
 
-repl(#db{name=DbName, seq_tree=Bt}=Db, #acc{localid=LocalId}=Acc0) ->
+%% @doc Find and return the largest update_seq in SourceDb
+%% that the client has seen from TargetNode.
+%%
+%% When reasoning about this function it is very important to
+%% understand the direction of replication for this comparison.
+%% We're only interested in internal replications initiated
+%% by this node to the node being replaced. When doing a
+%% replacement the most important thing is that the client doesn't
+%% miss any updates. This means we can only fast-forward as far
+%% as they've seen updates on this node. We can detect that by
+%% looking for our push replication history and choosing the
+%% largest source_seq that has a target_seq =< TgtSeq.
+find_source_seq(SrcDb, TgtNode, TgtUUID, TgtSeq) ->
+    SrcNode = atom_to_binary(node(), utf8),
+    SrcUUID = couch_db:get_uuid(SrcDb),
+    DocId = make_local_id(SrcUUID, TgtUUID),
+    case couch_db:open_doc(SrcDb, DocId, []) of
+    {ok, Doc} ->
+        find_source_seq_int(Doc, SrcNode, TgtNode, TgtUUID, TgtSeq);
+    {not_found, _} ->
+        0
+    end.
+
+
+find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
+    SrcNode = case is_atom(SrcNode0) of
+        true -> atom_to_binary(SrcNode0, utf8);
+        false -> SrcNode0
+    end,
+    TgtNode = case is_atom(TgtNode0) of
+        true -> atom_to_binary(TgtNode0, utf8);
+        false -> TgtNode0
+    end,
+    % This is split off purely for the ability to run unit tests
+    % against this bit of code without requiring all sorts of mocks.
+    {History} = couch_util:get_value(<<"history">>, Props, {[]}),
+    SrcHistory = couch_util:get_value(SrcNode, History, []),
+    UseableHistory = lists:filter(fun({Entry}) ->
+        couch_util:get_value(<<"target_node">>, Entry) =:= TgtNode andalso
+        couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID andalso
+        couch_util:get_value(<<"target_seq">>,  Entry) =<  TgtSeq
+    end, SrcHistory),
+
+    % This relies on SrcHistory being ordered descending by source
+    % sequence.
+    case UseableHistory of
+        [{Entry} | _] ->
+            couch_util:get_value(<<"source_seq">>, Entry);
+        [] ->
+            0
+    end.
+
+
+repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) ->
     erlang:put(io_priority, {internal_repl, DbName}),
-    Seq = calculate_start_seq(Db, Acc0#acc.target, LocalId),
-    Acc1 = Acc0#acc{source=Db, seq=Seq},
+    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
     Fun = fun ?MODULE:changes_enumerator/3,
     {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
     {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
     {ok, couch_db:count_changes_since(Db, LastSeq)}.
 
 
-calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
-    case couch_db:open_doc(Db, LocalId, [ejson_body]) of
-    {ok, #doc{body = {SProps}}} ->
-        Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
-        try mem3_rpc:load_checkpoint(Node, Name, LocalId, Opts) of
-        #doc{body = {TProps}} ->
+calculate_start_seq(Acc) ->
+    #acc{
+        source = Db,
+        target = #shard{node=Node, name=Name}
+    } = Acc,
+    %% Give the target our UUID and ask it to return the checkpoint doc
+    UUID = couch_db:get_uuid(Db),
+    {NewDocId, Doc} = mem3_rpc:load_checkpoint(Node, Name, node(), UUID),
+    #doc{id=FoundId, body={TProps}} = Doc,
+    Acc1 = Acc#acc{localid = NewDocId},
+    % NewDocId and FoundId may be different the first time
+    % this code runs to save our newly named internal replication
+    % checkpoints. We store NewDocId to use when saving checkpoints
+    % but use FoundId to reuse the same docid that the target used.
+    case couch_db:open_doc(Db, FoundId, [ejson_body]) of
+        {ok, #doc{body = {SProps}}} ->
             SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
             TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
-            erlang:min(SourceSeq, TargetSeq)
-        catch error:{not_found, _} ->
-            0
-        end;
-    {not_found, _} ->
-        0
+            % We resume from the lower update seq stored in the two
+            % shard copies. We also need to be sure and use the
+            % corresponding history. A difference here could result
+            % from either a write failure on one of the nodes or if
+            % either shard was truncated by an operator.
+            case SourceSeq =< TargetSeq of
+                true ->
+                    Seq = SourceSeq,
+                    History = couch_util:get_value(<<"history">>, SProps, {[]});
+                false ->
+                    Seq = TargetSeq,
+                    History = couch_util:get_value(<<"history">>, TProps, {[]})
+            end,
+            Acc1#acc{seq = Seq, history = History};
+        {not_found, _} ->
+            Acc1
     end.
 
 
@@ -173,7 +249,8 @@ replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
     [] ->
         ok;
     Missing ->
-        ok = save_on_target(Node, Name, open_docs(Acc, Missing))
+        Docs = open_docs(Acc, Missing),
+        ok = save_on_target(Node, Name, Docs)
     end,
     update_locals(Acc),
     {ok, Acc#acc{revcount=0, infos=[]}}.
@@ -191,20 +268,10 @@ find_missing_revs(Acc) ->
     ]).
 
 
-save_on_target(Node, Name, Docs) ->
-    mem3_rpc:update_docs(Node, Name, Docs, [
-        replicated_changes,
-        full_commit,
-        {user_ctx, ?CTX},
-        {io_priority, {internal_repl, Name}}
-    ]),
-    ok.
-
-
 open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     lists:flatmap(fun({Id, Revs, _}) ->
         FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
-        RevTree = FDI#full_doc_info.rev_tree,
+        #full_doc_info{rev_tree=RevTree} = FDI,
         {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
         lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
             couch_db:make_doc(Source, Id, IsDel, SummaryPtr, FoundRevPath)
@@ -212,19 +279,27 @@ open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     end, Missing).
 
 
+save_on_target(Node, Name, Docs) ->
+    mem3_rpc:update_docs(Node, Name, Docs, [
+        replicated_changes,
+        full_commit,
+        {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
+
+
 update_locals(Acc) ->
-    #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
+    #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
-    Doc = #doc{id = Id, body = {[
-        {<<"seq">>, Seq},
-        {<<"node">>, list_to_binary(atom_to_list(Node))},
+    NewEntry = [
+        {<<"source_node">>, atom_to_binary(node(), utf8)},
+        {<<"source_uuid">>, couch_db:get_uuid(Db)},
+        {<<"source_seq">>, Seq},
         {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
-    ]}},
-    {ok, _} = couch_db:update_doc(Db, Doc, []),
-    mem3_rpc:save_checkpoint(Node, Name, Doc, [
-        {user_ctx, ?CTX},
-        {io_priority, {internal_repl, Name}}
-    ]).
+    ],
+    NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
+    {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
 filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
@@ -243,3 +318,97 @@ iso8601_timestamp() ->
     {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
     Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
     io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+find_source_seq_unknown_node_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bing">>, <<"bar_uuid">>, 10),
+        0
+    ).
+
+
+find_source_seq_unknown_uuid_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"teapot">>, 10),
+        0
+    ).
+
+
+find_source_seq_ok_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 100),
+        100
+    ).
+
+
+find_source_seq_old_ok_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 84),
+        50
+    ).
+
+
+find_source_seq_different_node_test() ->
+    ?assertEqual(
+        find_source_seq_int(doc_(), <<"foo2">>, <<"bar">>, <<"bar_uuid">>, 92),
+        31
+    ).
+
+
+-define(SNODE, <<"source_node">>).
+-define(SUUID, <<"source_uuid">>).
+-define(SSEQ, <<"source_seq">>).
+-define(TNODE, <<"target_node">>).
+-define(TUUID, <<"target_uuid">>).
+-define(TSEQ, <<"target_seq">>).
+
+doc_() ->
+    Foo_Bar = [
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 90},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 85}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 50},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 51}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 40},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 45}
+        ]},
+        {[
+            {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 2},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 2}
+        ]}
+    ],
+    Foo2_Bar = [
+        {[
+            {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
+        ]},
+        {[
+            {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 92},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 93}
+        ]},
+        {[
+            {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 31},
+            {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 30}
+        ]}
+    ],
+    History = {[
+        {<<"foo">>, Foo_Bar},
+        {<<"foo2">>, Foo2_Bar}
+    ]},
+    #doc{
+        body={[{<<"history">>, History}]}
+    }.
+
+-endif.
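
A hypothetical sketch of how find_source_seq/4 is meant to be used when
replacing a dead copy (the shard name, node and checkpoint values are made up):
open the surviving local shard, ask how far the node being replaced had already
been pushed, and resume internal replication from there instead of sequence 0.

    ShardName = <<"shards/00000000-1fffffff/mydb.1400000000">>,
    {ok, Db} = couch_db:open_int(ShardName, []),
    StartSeq = mem3_rep:find_source_seq(Db, 'db3@host', TgtUUID, TgtSeq),
    %% TgtUUID and TgtSeq would come from the replacement target's checkpoint.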

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/bf07a7c2/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index d71cc93..1e77b57 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -19,7 +19,13 @@
     get_missing_revs/4,
     update_docs/4,
     load_checkpoint/4,
-    save_checkpoint/4
+    save_checkpoint/6
+]).
+
+% Private RPC callbacks
+-export([
+    load_checkpoint_rpc/3,
+    save_checkpoint_rpc/5
 ]).
 
 
@@ -38,12 +44,165 @@ update_docs(Node, DbName, Docs, Options) ->
     rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
 
 
-load_checkpoint(Node, DbName, DocId, Opts) ->
-    rexi_call(Node, {fabric_rpc, open_doc, [DbName, DocId, Opts]}).
+load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
+    Args = [DbName, SourceNode, SourceUUID],
+    rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
+
+
+save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
+    Args = [DbName, DocId, Seq, Entry, History],
+    rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
+
+
+load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case couch_db:open_int(DbName, [{user_ctx, ?CTX}]) of
+    {ok, Db} ->
+        TargetUUID = couch_db:get_uuid(Db),
+        NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID),
+        case couch_db:open_doc(Db, NewId, []) of
+        {ok, Doc} ->
+            rexi:reply({ok, {NewId, Doc}});
+        {not_found, _} ->
+            OldId = mem3_rep:make_local_id(SourceNode, node()),
+            case couch_db:open_doc(Db, OldId, []) of
+            {ok, Doc} ->
+                rexi:reply({ok, {NewId, Doc}});
+            {not_found, _} ->
+                rexi:reply({ok, {NewId, #doc{id = NewId}}})
+            end
+        end;
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case couch_db:open_int(DbName, [{user_ctx, ?CTX}]) of
+        {ok, #db{update_seq = TargetSeq} = Db} ->
+            NewEntry = {[
+                {<<"target_node">>, atom_to_binary(node(), utf8)},
+                {<<"target_uuid">>, couch_db:get_uuid(Db)},
+                {<<"target_seq">>, TargetSeq}
+            ] ++ NewEntry0},
+            Body = {[
+                {<<"seq">>, SourceSeq},
+                {<<"history">>, add_checkpoint(NewEntry, History0)}
+            ]},
+            Doc = #doc{id = Id, body = Body},
+            rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+                {ok, _} ->
+                    {ok, Body};
+                Else ->
+                    {error, Else}
+            catch
+                Exception ->
+                    Exception;
+                error:Reason ->
+                    {error, Reason}
+            end);
+        Error ->
+            rexi:reply(Error)
+    end.
+
 
+%% @doc This adds a new update sequence checkpoint to the replication
+%%      history. Checkpoints are keyed by the source node so that we
+%%      aren't mixing history between source shard moves.
+add_checkpoint({Props}, {History}) ->
+    % Extract the source and target seqs for reference
+    SourceSeq = couch_util:get_value(<<"source_seq">>, Props),
+    TargetSeq = couch_util:get_value(<<"target_seq">>, Props),
 
-save_checkpoint(Node, DbName, Doc, Options) ->
-    rexi_call(Node, {fabric_rpc, update_docs, [DbName, [Doc], Options]}).
+    % Get the history relevant to the source node.
+    SourceNode = couch_util:get_value(<<"source_node">>, Props),
+    SourceHistory = couch_util:get_value(SourceNode, History, []),
+
+    % If either the source or target shard has been truncated
+    % we need to filter out any history that was stored for
+    % any larger update seq than we're currently recording.
+    FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),
+
+    % Insert the new entry into the history and trim the history
+    % to keep an exponentially increasing delta between checkpoints.
+    % We do this by defining logical buckets of exponentially
+    % increasing size and then keep the smallest and largest values
+    % in each bucket. We keep both min and max points so that
+    % we don't end up with empty buckets as new points are added.
+    %
+    % NB: We're guaranteed to keep the newest entry passed to this
+    % function because we filter out all larger update sequences
+    % which means it is guaranteed to be the smallest value in the
+    % first bucket with a delta of 0.
+    WithNewEntry = [{Props} | FilteredHistory],
+
+    % Tag each entry with the bucket id
+    BucketTagged = lists:map(fun({Entry}) ->
+        EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
+        BucketTag = case SourceSeq - EntrySourceSeq of
+            0 ->
+                0;
+            N when N > 0 ->
+                % This is int(log2(SourceSeq - EntrySourceSeq))
+                trunc(math:log(N) / math:log(2))
+        end,
+        {BucketTag, SourceSeq - EntrySourceSeq, {Entry}}
+    end, WithNewEntry),
+
+    % Find the min/max entries for each bucket
+    Buckets = lists:foldl(fun({Bucket, Delta, Entry}, BucketAcc) ->
+        {MinEntry, MaxEntry} = case dict:find(Bucket, BucketAcc) of
+            {ok, Value} -> Value;
+            error -> {nil, nil}
+        end,
+        NewMin = case MinEntry of
+            {MinDelta, _} when Delta < MinDelta ->
+                {Delta, Entry};
+            nil ->
+                {Delta, Entry};
+            _ ->
+                MinEntry
+        end,
+        NewMax = case MaxEntry of
+            {MaxDelta, _} when Delta > MaxDelta ->
+                {Delta, Entry};
+            nil ->
+                {Delta, Entry};
+            _ ->
+                MaxEntry
+        end,
+        dict:store(Bucket, {NewMin, NewMax}, BucketAcc)
+    end, dict:new(), BucketTagged),
+
+    % Turn our bucket dict back into a list sorted by increasing
+    % deltas (which corresponds to decreasing source_seq values).
+    NewSourceHistory = lists:flatmap(fun({_Bucket, {Min, Max}}) ->
+        % If there's a single point in a bucket it's both the min
+        % and max entry so we account for that here.
+        if Min == Max ->
+            [element(2, Min)];
+        true ->
+            [element(2, Min), element(2, Max)]
+        end
+    end, lists:sort(dict:to_list(Buckets))),
+
+    % Finally update the source node history and we're done.
+    NodeRemoved = lists:keydelete(SourceNode, 1, History),
+    {[{SourceNode, NewSourceHistory} | NodeRemoved]}.
+
+
+filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
+    SourceFilter = fun({Entry}) ->
+        SourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
+        SourceSeq < SourceSeqThresh
+    end,
+    TargetFilter = fun({Entry}) ->
+        TargetSeq = couch_util:get_value(<<"target_seq">>, Entry),
+        TargetSeq < TargetSeqThresh
+    end,
+    SourceFiltered = lists:filter(SourceFilter, History),
+    lists:filter(TargetFilter, SourceFiltered).
 
 
 rexi_call(Node, MFA) ->
@@ -62,3 +221,310 @@ rexi_call(Node, MFA) ->
     after
         rexi_monitor:stop(Mon)
     end.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+-define(SNODE, <<"src@localhost">>).
+-define(TNODE, <<"tgt@localhost">>).
+-define(SNODE_KV, {<<"source_node">>, ?SNODE}).
+-define(TNODE_KV, {<<"target_node">>, ?TNODE}).
+-define(SSEQ, <<"source_seq">>).
+-define(TSEQ, <<"target_seq">>).
+-define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}).
+
+
+filter_history_data() ->
+    [
+        ?ENTRY(13, 15),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ].
+
+
+filter_history_remove_none_test() ->
+    ?assertEqual(filter_history(20, 20, filter_history_data()), [
+        ?ENTRY(13, 15),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_all_test() ->
+    ?assertEqual(filter_history(1, 1, filter_history_data()), []).
+
+
+filter_history_remove_equal_test() ->
+    ?assertEqual(filter_history(10, 10, filter_history_data()), [
+        ?ENTRY(2, 3)
+    ]),
+    ?assertEqual(filter_history(11, 9, filter_history_data()), [
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_for_source_and_target_test() ->
+    ?assertEqual(filter_history(11, 20, filter_history_data()), [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]),
+    ?assertEqual(filter_history(14, 14, filter_history_data()), [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_for_both_test() ->
+    ?assertEqual(filter_history(11, 11, filter_history_data()), [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]).
+
+
+filter_history_remove_for_both_again_test() ->
+    ?assertEqual(filter_history(3, 4, filter_history_data()), [
+        ?ENTRY(2, 3)
+    ]).
+
+
+add_first_checkpoint_test() ->
+    History = {[]},
+    ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
+        {?SNODE, [
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_first_checkpoint_to_empty_test() ->
+    History = {[{?SNODE, []}]},
+    ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
+        {?SNODE, [
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_second_checkpoint_test() ->
+    History = {[{?SNODE, [?ENTRY(2, 3)]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(10, 9), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_third_checkpoint_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(11, 10), History), {[
+        {?SNODE, [
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_fourth_checkpoint_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(11, 10),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(12, 13), History), {[
+        {?SNODE, [
+            ?ENTRY(12, 13),
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+add_checkpoint_with_replacement_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(12, 13),
+        ?ENTRY(11, 10),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    % Picking a source_seq of 16 to force 10, 11, and 12
+    % into the same bucket to show we drop the 11 entry.
+    ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
+        {?SNODE, [
+            ?ENTRY(16, 16),
+            ?ENTRY(12, 13),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+add_checkpoint_drops_redundant_checkpoints_test() ->
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(15, 15), % Bucket 0
+        ?ENTRY(14, 14), % Bucket 1
+        ?ENTRY(13, 13), % Bucket 1
+        ?ENTRY(12, 12), % Bucket 2
+        ?ENTRY(11, 11), % Bucket 2
+        ?ENTRY(10, 10), % Bucket 2
+        ?ENTRY(9, 9),   % Bucket 2
+        ?ENTRY(8, 8),   % Bucket 3
+        ?ENTRY(7, 7),   % Bucket 3
+        ?ENTRY(6, 6),   % Bucket 3
+        ?ENTRY(5, 5),   % Bucket 3
+        ?ENTRY(4, 4),   % Bucket 3
+        ?ENTRY(3, 3),   % Bucket 3
+        ?ENTRY(2, 2),   % Bucket 3
+        ?ENTRY(1, 1)    % Bucket 3
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
+        {?SNODE, [
+            ?ENTRY(16, 16), % Bucket 0
+            ?ENTRY(15, 15), % Bucket 0
+            ?ENTRY(14, 14), % Bucket 1
+            ?ENTRY(13, 13), % Bucket 1
+            ?ENTRY(12, 12), % Bucket 2
+            ?ENTRY(9, 9),   % Bucket 2
+            ?ENTRY(8, 8),   % Bucket 3
+            ?ENTRY(1, 1)    % Bucket 3
+        ]}
+    ]}).
+
+
+add_checkpoint_show_not_always_a_drop_test() ->
+    % Depending on the edge conditions of buckets we
+    % may not always drop values when adding new
+    % checkpoints. In this case 12 stays because there's
+    % no longer a value for 10 or 11.
+    %
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16), % Bucket 0
+        ?ENTRY(15, 15), % Bucket 1
+        ?ENTRY(14, 14), % Bucket 1
+        ?ENTRY(13, 13), % Bucket 2
+        ?ENTRY(12, 12), % Bucket 2
+        ?ENTRY(9, 9),   % Bucket 3
+        ?ENTRY(8, 8),   % Bucket 3
+        ?ENTRY(1, 1)    % Bucket 4
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(17, 17), History), {[
+        {?SNODE, [
+            ?ENTRY(17, 17), % Bucket 0
+            ?ENTRY(16, 16), % Bucket 0
+            ?ENTRY(15, 15), % Bucket 1
+            ?ENTRY(14, 14), % Bucket 1
+            ?ENTRY(13, 13), % Bucket 2
+            ?ENTRY(12, 12), % Bucket 2
+            ?ENTRY(9, 9),   % Bucket 3
+            ?ENTRY(8, 8),   % Bucket 3
+            ?ENTRY(1, 1)    % Bucket 4
+        ]}
+    ]}).
+
+
+add_checkpoint_big_jump_show_lots_drop_test() ->
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16), % Bucket 4
+        ?ENTRY(15, 15), % Bucket 4
+        ?ENTRY(14, 14), % Bucket 4
+        ?ENTRY(13, 13), % Bucket 4
+        ?ENTRY(12, 12), % Bucket 4
+        ?ENTRY(9, 9),   % Bucket 4
+        ?ENTRY(8, 8),   % Bucket 4
+        ?ENTRY(1, 1)    % Bucket 4
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(32, 32), History), {[
+        {?SNODE, [
+            ?ENTRY(32, 32), % Bucket 0
+            ?ENTRY(16, 16), % Bucket 4
+            ?ENTRY(1, 1)    % Bucket 4
+        ]}
+    ]}).
+
+
+add_checkpoint_show_filter_history_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16),
+        ?ENTRY(15, 15),
+        ?ENTRY(14, 14),
+        ?ENTRY(13, 13),
+        ?ENTRY(12, 12),
+        ?ENTRY(9, 9),
+        ?ENTRY(8, 8),
+        ?ENTRY(1, 1)
+    ]}]},
+    % Drop for both
+    ?assertEqual(add_checkpoint(?ENTRY(10, 10), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 10),
+            ?ENTRY(9, 9),
+            ?ENTRY(8, 8),
+            ?ENTRY(1, 1)
+        ]}
+    ]}),
+    % Drop for source
+    ?assertEqual(add_checkpoint(?ENTRY(10, 200), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 200),
+            ?ENTRY(9, 9),
+            ?ENTRY(8, 8),
+            ?ENTRY(1, 1)
+        ]}
+    ]}),
+    % Drop for target. Obviously a source_seq of 200
+    % will end up dropping the 8 entry.
+    ?assertEqual(add_checkpoint(?ENTRY(200, 10), History), {[
+        {?SNODE, [
+            ?ENTRY(200, 10),
+            ?ENTRY(9, 9),
+            ?ENTRY(1, 1)
+        ]}
+    ]}).
+
+
+add_checkpoint_from_other_node_test() ->
+    History = {[{<<"not_the_source">>, [
+        ?ENTRY(12, 13),
+        ?ENTRY(11, 10),
+        ?ENTRY(10, 9),
+        ?ENTRY(2, 3)
+    ]}]},
+    % No filtering
+    ?assertEqual(add_checkpoint(?ENTRY(1, 1), History), {[
+        {?SNODE, [
+            ?ENTRY(1, 1)
+        ]},
+        {<<"not_the_source">>, [
+            ?ENTRY(12, 13),
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}),
+    % No dropping
+    ?assertEqual(add_checkpoint(?ENTRY(200, 200), History), {[
+        {?SNODE, [
+            ?ENTRY(200, 200)
+        ]},
+        {<<"not_the_source">>, [
+            ?ENTRY(12, 13),
+            ?ENTRY(11, 10),
+            ?ENTRY(10, 9),
+            ?ENTRY(2, 3)
+        ]}
+    ]}).
+
+
+-endif.


[28/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Move rotate_list to mem3_util


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/4b6f001d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/4b6f001d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/4b6f001d

Branch: refs/heads/master
Commit: 4b6f001df3a5724d3d679dd1d8cc4af502a4108a
Parents: e30659b
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Jul 31 11:14:15 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl      |  8 +-------
 src/mem3_util.erl | 10 +++++++++-
 2 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/4b6f001d/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index af17d5c..02e929f 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -262,16 +262,10 @@ group_by_proximity(Shards, ZoneMap) ->
 
 choose_ushards(DbName, Shards) ->
     Groups0 = group_by_range(Shards),
-    Groups1 = [rotate_list(term_to_binary({DbName, R}), order_shards(G))
+    Groups1 = [mem3_util:rotate_list({DbName, R}, order_shards(G))
                || {R, G} <- Groups0],
     [hd(G) || G <- Groups1].
 
-rotate_list(_Key, []) ->
-    [];
-rotate_list(Key, List) ->
-    {H, T} = lists:split(erlang:crc32(Key) rem length(List), List),
-    T ++ H.
-
 order_shards([#ordered_shard{}|_]=OrderedShards) ->
     lists:keysort(#ordered_shard.order, OrderedShards);
 order_shards(UnorderedShards) ->

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/4b6f001d/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 5c8c989..8e17393 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -15,7 +15,7 @@
 -export([hash/1, name_shard/2, create_partition_map/5, build_shards/2,
     n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
     shard_info/1, ensure_exists/1, open_db_doc/1]).
--export([owner/2, is_deleted/1]).
+-export([owner/2, is_deleted/1, rotate_list/2]).
 
 %% do not use outside mem3.
 -export([build_ordered_shards/2, downcast/1]).
@@ -238,6 +238,14 @@ is_deleted(Change) ->
         Else
     end.
 
+rotate_list(_Key, []) ->
+    [];
+rotate_list(Key, List) when not is_binary(Key) ->
+    rotate_list(term_to_binary(Key), List);
+rotate_list(Key, List) ->
+    {H, T} = lists:split(erlang:crc32(Key) rem length(List), List),
+    T ++ H.
+
 downcast(#shard{}=S) ->
     S;
 downcast(#ordered_shard{}=S) ->
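
For illustration, a minimal sketch of the relocated helper (node names and
the key below are made up): rotate_list/2 hashes the key with crc32 and
rotates the list by that amount, so every caller computes the same ordering.

    Nodes = ['node1@host', 'node2@host', 'node3@host'],
    mem3_util:rotate_list({<<"mydb">>, [0, 16#1fffffff]}, Nodes).
    %% non-binary keys go through term_to_binary/1 first, so the result is
    %% the same rotation of Nodes whichever node evaluates it.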


[31/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Add a new mem3_rpc module for replication RPCs

This is intended to make the local/remote code execution contexts a
lot clearer.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/fcbc821b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/fcbc821b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/fcbc821b

Branch: refs/heads/master
Commit: fcbc821b4f9cd3fa66124860ebce921a8fe428f0
Parents: ade6ab1
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:55:47 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 41 ++++++++++++---------------------
 src/mem3_rpc.erl | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/fcbc821b/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 9904965..d01eaf3 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -133,7 +133,7 @@ calculate_start_seq(Db, #shard{node=Node, name=Name}, LocalId) ->
     case couch_db:open_doc(Db, LocalId, [ejson_body]) of
     {ok, #doc{body = {SProps}}} ->
         Opts = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
-        try rexi_call(Node, {fabric_rpc, open_doc, [Name, LocalId, Opts]}) of
+        try mem3_rpc:load_checkpoint(Node, Name, LocalId, Opts) of
         #doc{body = {TProps}} ->
             SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
             TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
@@ -185,14 +185,19 @@ find_missing_revs(Acc) ->
         #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
         {Id, [R || #rev_info{rev=R} <- RevInfos]}
     end, Infos),
-    Options = [{io_priority, {internal_repl, Name}}, {user_ctx, ?CTX}],
-    rexi_call(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs, Options]}).
+    mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [
+        {io_priority, {internal_repl, Name}},
+        {user_ctx, ?CTX}
+    ]).
 
 
 save_on_target(Node, Name, Docs) ->
-    Options = [replicated_changes, full_commit, {user_ctx, ?CTX},
-        {io_priority, {internal_repl, Name}}],
-    rexi_call(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+    mem3_rpc:update_docs(Node, Name, Docs, [
+        replicated_changes,
+        full_commit,
+        {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}
+    ]),
     ok.
 
 
@@ -219,26 +224,10 @@ update_locals(Acc) ->
         {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
     ]}},
     {ok, _} = couch_db:update_doc(Db, Doc, []),
-    Options = [{user_ctx, ?CTX}, {io_priority, {internal_repl, Name}}],
-    rexi_call(Node, {fabric_rpc, update_docs, [Name, [Doc], Options]}).
-
-
-rexi_call(Node, MFA) ->
-    Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
-    Ref = rexi:cast(Node, self(), MFA, [sync]),
-    try
-        receive {Ref, {ok, Reply}} ->
-            Reply;
-        {Ref, Error} ->
-            erlang:error(Error);
-        {rexi_DOWN, Mon, _, Reason} ->
-            erlang:error({rexi_DOWN, {Node, Reason}})
-        after 600000 ->
-            erlang:error(timeout)
-        end
-    after
-        rexi_monitor:stop(Mon)
-    end.
+    mem3_rpc:save_checkpoint(Node, Name, Doc, [
+        {user_ctx, ?CTX},
+        {io_priority, {internal_repl, Name}}
+    ]).
 
 
 filter_doc(Filter, FullDocInfo) when is_function(Filter) ->

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/fcbc821b/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
new file mode 100644
index 0000000..d71cc93
--- /dev/null
+++ b/src/mem3_rpc.erl
@@ -0,0 +1,64 @@
+% Copyright 2013 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_rpc).
+
+
+-export([
+    get_missing_revs/4,
+    update_docs/4,
+    load_checkpoint/4,
+    save_checkpoint/4
+]).
+
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(CTX, #user_ctx{roles = [<<"_admin">>]}).
+
+
+get_missing_revs(Node, DbName, IdsRevs, Options) ->
+    rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
+
+
+update_docs(Node, DbName, Docs, Options) ->
+    rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+
+
+load_checkpoint(Node, DbName, DocId, Opts) ->
+    rexi_call(Node, {fabric_rpc, open_doc, [DbName, DocId, Opts]}).
+
+
+save_checkpoint(Node, DbName, Doc, Options) ->
+    rexi_call(Node, {fabric_rpc, update_docs, [DbName, [Doc], Options]}).
+
+
+rexi_call(Node, MFA) ->
+    Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
+    Ref = rexi:cast(Node, self(), MFA, [sync]),
+    try
+        receive {Ref, {ok, Reply}} ->
+            Reply;
+        {Ref, Error} ->
+            erlang:error(Error);
+        {rexi_DOWN, Mon, _, Reason} ->
+            erlang:error({rexi_DOWN, {Node, Reason}})
+        after 600000 ->
+            erlang:error(timeout)
+        end
+    after
+        rexi_monitor:stop(Mon)
+    end.
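
For clarity, a hedged sketch of the calling convention: each mem3_rpc
function runs on the local node and names the remote node explicitly as its
first argument (the node, shard name and checkpoint id are placeholders).

    Opts = [{io_priority, {internal_repl, ShardName}}],
    TargetDoc = mem3_rpc:load_checkpoint('db2@127.0.0.1', ShardName,
                                         <<"_local/checkpoint-id">>, Opts).
    %% runs locally, executes fabric_rpc:open_doc on 'db2@127.0.0.1' via
    %% rexi_call/2, and raises a timeout error after 600 seconds if no reply.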


[19/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Address comments from PR


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/94fc7633
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/94fc7633
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/94fc7633

Branch: refs/heads/master
Commit: 94fc76333f924a0f41aca9063521836cca68b24f
Parents: 6d9983f
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Mon Aug 19 09:40:38 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/94fc7633/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 28cd32e..08605be 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -27,17 +27,11 @@ rebalance(Shards, TargetNodes) when is_list(Shards) ->
     {OK, MoveThese} = lists:partition(fun(#shard{node=Node}) ->
         lists:member(Node, TargetNodes)
     end, Shards),
-    ShardsByTargetNode0 = lists:foldl(fun(Shard, Acc) ->
+    % ensure every target node is present in the orddict
+    ShardsByTargetNode0 = orddict:from_list([{N,[]} || N <- TargetNodes]),
+    ShardsByTargetNode = lists:foldl(fun(Shard, Acc) ->
         orddict:append(Shard#shard.node, Shard, Acc)
-    end, orddict:new(), OK),
-    ShardsByTargetNode = lists:sort(lists:foldl(fun(Node, Acc) ->
-        case orddict:is_key(Node, ShardsByTargetNode0) of
-            true ->
-                Acc;
-            false ->
-                [{Node, []} | Acc]
-        end
-    end, ShardsByTargetNode0, TargetNodes)),
+    end, ShardsByTargetNode0, OK),
     Moves = find_replacements(MoveThese, ShardsByTargetNode, []),
     Moved = [Shard#shard{node = Node} || {Shard, Node} <- Moves],
     TargetLevel = length(Shards) div length(TargetNodes),
@@ -107,7 +101,7 @@ largest_first({_, A}, {_, B}) ->
     length(A) >= length(B).
 
 smallest_first({_, A}, {_, B}) ->
-    length(A) < length(B).
+    length(A) =< length(B).
 
 replace(A, B, List) ->
     replace(A, B, List, []).
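
The behavioural point of the change, sketched in isolation (the atoms a, b
and c stand in for node names):

    Seeded = orddict:from_list([{N, []} || N <- [a, b, c]]),
    orddict:append(a, shard1, Seeded).
    %% => [{a,[shard1]},{b,[]},{c,[]}]
    %% Nodes that hold no shards yet still appear as candidate targets,
    %% which the previous fold-then-backfill version needed two passes for.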


[45/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Allow mem3_shards:local to take a list or binary

BugzId: 29571


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/7044f6c5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/7044f6c5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/7044f6c5

Branch: refs/heads/master
Commit: 7044f6c543ac6bbdbe0f4bb3f9b7a5d2ec0f206a
Parents: 8665201
Author: Russell Branca <ch...@gmail.com>
Authored: Tue Apr 29 15:56:59 2014 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:51:52 2014 +0100

----------------------------------------------------------------------
 src/mem3_shards.erl | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/7044f6c5/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 397fb74..4557e1c 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -107,6 +107,8 @@ get(DbName, Node, Range) ->
         [_|_] -> {error, duplicates}
     end.
 
+local(DbName) when is_list(DbName) ->
+    local(list_to_binary(DbName));
 local(DbName) ->
     Pred = fun(#shard{node=Node}) when Node == node() -> true; (_) -> false end,
     lists:filter(Pred, for_db(DbName)).
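
With this clause in place, both call forms below should behave identically
(the database name is illustrative):

    1> mem3_shards:local("accounts").      % list is converted to a binary
    2> mem3_shards:local(<<"accounts">>).  % binary path, unchanged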


[24/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Refuse to place shards on decom:true nodes

BugzID: 24420


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/45b040b4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/45b040b4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/45b040b4

Branch: refs/heads/master
Commit: 45b040b4c6bb554a4ee496735ec6701c6b5f7c42
Parents: d7a4f26
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 23 10:08:51 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/45b040b4/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 864cddb..cc76454 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -157,7 +157,7 @@ choose_shards(DbName, Options) when is_list(DbName) ->
 choose_shards(DbName, Options) ->
     try shards(DbName)
     catch error:E when E==database_does_not_exist; E==badarg ->
-        Nodes = mem3:nodes(),
+        Nodes = allowed_nodes(),
         case get_placement(Options) of
             undefined ->
                 choose_shards(DbName, Nodes, Options);
@@ -237,6 +237,9 @@ range(<<"shards/", Start:8/binary, "-", End:8/binary, "/", _/binary>>) ->
     [httpd_util:hexlist_to_integer(binary_to_list(Start)),
      httpd_util:hexlist_to_integer(binary_to_list(End))].
 
+allowed_nodes() ->
+    [Node || Node <- mem3:nodes(), mem3:node_info(Node, <<"decom">>) =/= true].
+
 nodes_in_zone(Nodes, Zone) ->
     [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].
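
A hedged illustration, assuming the usual setup where node metadata is kept
as documents in the nodes database:

    %% illustrative node doc: {"_id": "db3@example.com", "decom": true}
    true = mem3:node_info('db3@example.com', <<"decom">>),
    %% so db3 is dropped by allowed_nodes/0 although mem3:nodes() lists it.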
 


[07/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Fix load_shards_from_disk/2

load_shards_from_disk/2 did not expect #ordered_shard records to be
returned from load_shards_from_disk/1. Since it uses a list
comprehension, the mismatch is silently squashed, resulting in an empty
list.

In production this manifests as the occasional failure, where 'n' is
calculated as 0, causing quorum reads to fail. The very next call
succeeds as it reads the cached versions and correctly downcasts.

BugzID: 20629


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/c9292bb3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/c9292bb3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/c9292bb3

Branch: refs/heads/master
Commit: c9292bb3a204bd14ada2f28d5f7b3d7d96c88152
Parents: 1d50774
Author: Robert Newson <ro...@cloudant.com>
Authored: Thu Jun 27 19:19:24 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3_shards.erl | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/c9292bb3/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 2d21db2..df8cbac 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -278,7 +278,11 @@ load_shards_from_db(#db{} = ShardDb, DbName) ->
 load_shards_from_disk(DbName, DocId)->
     Shards = load_shards_from_disk(DbName),
     HashKey = mem3_util:hash(DocId),
-    [S || #shard{range = [B,E]} = S <- Shards, B =< HashKey, HashKey =< E].
+    [S || S <- Shards, in_range(S, HashKey)].
+
+in_range(Shard, HashKey) ->
+    [B, E] = mem3:range(Shard),
+    B =< HashKey andalso HashKey =< E.
 
 create_if_missing(Name) ->
     DbDir = config:get("couchdb", "database_dir"),
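
The failure mode is easy to reproduce in isolation: a generator pattern that
does not match skips the element instead of raising (the record definitions
below are simplified stand-ins for the real ones in mem3.hrl):

    -record(shard, {name, node, dbname, range, ref}).
    -record(ordered_shard, {name, node, dbname, range, ref, order}).

    silent_drop_demo() ->
        Shards = [#ordered_shard{range = [0, 100]}],
        [S || #shard{range = [B, E]} = S <- Shards, B =< 50, 50 =< E].
        %% => [] - the #ordered_shard is skipped, so n is computed as 0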


[17/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Rewrite rebalancing plan generator

This patch splits the functionality of the module out into three
classes of work:

* Fixing zoning and replica level violations
* Contracting a cluster
* Rebalancing shards across a cluster

The implementations of the first two features are pretty similar - find
the shards that need to be moved, then choose an optimal home for each
of them.  By default the contraction code will remove shards from nodes
in the "decom" zone, and the rebalancing code will ignore that zone
entirely. An optimal home (sketched below) is a node that

a) is in the correct zone, and
b) has the fewest # of shards for the DB among nodes in the zone, and
c) has the fewest total # of shards among nodes satisfying a) and b)
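
As a rough sketch only (not the module's actual code), that rule reads as a
two-level sort; DbCounts and TotalCounts are assumed per-node shard counts:

    optimal_home(Zone, Nodes, DbCounts, TotalCounts) ->
        InZone = [N || N <- Nodes, mem3:node_info(N, <<"zone">>) =:= Zone],
        Keyed = [{orddict:fetch(N, DbCounts), orddict:fetch(N, TotalCounts), N}
                 || N <- InZone],
        {_, _, Best} = lists:min(Keyed),
        Best.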

The implementation of rebalancing is a bit more complicated.  The
rebalancing algorithm looks roughly like this

For DB in all_dbs:
    Ensure all nodes have at least (N*Q) div length(Nodes) shards
    Ensure no node has more than (N*Q) div length(Nodes) + 1 shards
For node in nodes:
    If node has more than TotalShards div length(Nodes) + 1 shards:
        Donate shard to another node

The net result is that each database is balanced across the cluster and
the cluster as a whole is globally balanced.

The current version of the module prints out shard move and copy
operations in a clou-friendly format via io:format.  It also returns a
list of {Op, #shard{}, node()} tuples representing the operations.

The rebalancer will stop after generating 1000 operations by default.
The limit can be customized by using the 1-arity versions of expand,
contract and fix_zoning, but note that the performance of the rebalancer
degrades as the number of pending operations increases.
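
A hedged usage sketch (the limit and database name are arbitrary):

    1> Ops = mem3_rebalance:expand(200).        % plan at most 200 operations
    2> mem3_rebalance:print(Ops).               % reprint the clou commands
    3> mem3_rebalance:fix_zoning(<<"dbname">>). % single-database repair plan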

BugzID: 23690
BugzID: 20770


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/aef58662
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/aef58662
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/aef58662

Branch: refs/heads/master
Commit: aef58662a13dae7f9c8a6aec99beb32de3dd1f50
Parents: 2f62e2e
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Sep 25 14:25:44 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl           |   1 +
 src/mem3_rebalance.erl | 463 ++++++++++++++++++++++++++++++++++++++------
 2 files changed, 404 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/aef58662/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 8bbbbae..864cddb 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -20,6 +20,7 @@
 -export([quorum/1, group_by_proximity/1]).
 -export([live_shards/2]).
 -export([belongs/2]).
+-export([get_placement/1]).
 
 %% For mem3 use only.
 -export([name/1, node/1, range/1]).

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/aef58662/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 123bf47..4d8d069 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -14,46 +14,234 @@
 
 -module(mem3_rebalance).
 
--export([rebalance/1, rebalance/2]).
+-export([
+    contract/0,
+    contract/1,
+    contract/3,
+    expand/0,
+    expand/1,
+    expand/3,
+    fix_zoning/0,
+    fix_zoning/1,
+    fix_zoning/2,
+    print/1
+]).
+
+% Exposed for debugging purposes
+-export([
+    shard_count_by_node/1,
+    shard_count_view/0
+]).
+
 -include("mem3.hrl").
 
-rebalance(DbName) ->
-    rebalance(DbName, mem3:nodes()).
+%% @equiv expand(1000)
+-spec expand() -> [{atom(), #shard{}, node()}].
+expand() ->
+    expand(1000).
+
+%% @doc Expands all databases in the cluster, stopping at Limit operations.
+-spec expand(integer()) -> [{atom(), #shard{}, node()}].
+expand(Limit) when is_integer(Limit), Limit > 0 ->
+    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    LocalBalanceFun = fun(Db, Moves) -> expand(Db, TargetNodes, Moves) end,
+    LocalBalanceOps = apply_to_cluster(LocalBalanceFun, Limit),
+    % Now apply additional operations as needed to achieve global balance.
+    global_expand(TargetNodes, LocalBalanceOps, Limit);
+
+expand(DbName) when is_binary(DbName); is_list(DbName) ->
+    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    expand(DbName, TargetNodes, []).
+
+%% @doc Computes a plan to balance the shards across the target nodes.
+-spec expand(DbName::iolist(), [node()], [{atom(), #shard{}, node()}]) ->
+        [{atom(), #shard{}, node()}].
+expand(DbName, Nodes, PrevMoves) ->
+    Shards = mem3:shards(DbName),
+    Floor = length(Shards) div length(Nodes),
+    % Ensure every target node reaches the floor
+    {NewShards, Moves0} = rebalance2(Floor, Shards, Nodes, Nodes, PrevMoves),
+    % Now look for any nodes with more than floor+1 shards
+    {_, Moves} = rebalance2(Floor+1, NewShards, Nodes, Nodes, Moves0),
+    Moves.
+
+%% @equiv contract(1000)
+-spec contract() -> [{atom(), #shard{}, node()}].
+contract() ->
+    contract(1000).
+
+%% @doc Computes a plan to remove up to Limit shards from nodes in "decom" zone.
+-spec contract(integer()) -> [{atom(), #shard{}, node()}].
+contract(Limit) when is_integer(Limit), Limit > 0 ->
+    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    apply_to_cluster(fun(Db, Moves) -> contract(Db, TargetNodes, Moves) end, Limit);
 
-rebalance(DbName, TargetNodes) when is_binary(DbName) ->
-    rebalance(mem3:shards(DbName), TargetNodes);
-rebalance(Shards, TargetNodes) when is_list(Shards) ->
-    % First migrate all shards off of non-target nodes
+contract(DbName) when is_binary(DbName); is_list(DbName) ->
+    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    contract(DbName, TargetNodes, []).
+
+%% @doc Computes a plan to consolidate shards from a single database onto the
+%%      supplied set of nodes.
+-spec contract(DbName::iolist(), [node()], [{atom(), #shard{}, node()}]) ->
+        [{atom(), #shard{}, node()}].
+contract(DbName, TargetNodes, PrevMoves) ->
     {OK, MoveThese} = lists:partition(fun(#shard{node=Node}) ->
         lists:member(Node, TargetNodes)
-    end, Shards),
-    % Ensure every target node is present in the orddict
-    ShardsByTargetNode0 = orddict:from_list([{N,[]} || N <- TargetNodes]),
-    ShardsByTargetNode = lists:foldl(fun(Shard, Acc) ->
-        orddict:append(Shard#shard.node, Shard, Acc)
-    end, ShardsByTargetNode0, OK),
-    Moves = find_replacements(MoveThese, ShardsByTargetNode, []),
-    Moved = [Shard#shard{node = Node} || {Shard, Node} <- Moves],
-    TargetLevel = length(Shards) div length(TargetNodes),
-    rebalance2(TargetLevel, OK ++ Moved, TargetNodes, TargetNodes, Moves).
-
-find_replacements([], _ShardsByTargetNode, Result) ->
-    Result;
-find_replacements([Shard | Rest], ShardsByNode, Acc) ->
-    Zone = mem3:node_info(Shard#shard.node, <<"zone">>),
-    % Find a node in the same zone
-    InZone = [{Node, Shards} || {Node, Shards} <- ShardsByNode,
-        mem3:node_info(Node, <<"zone">>) =:= Zone],
-    % Prefer a node with the fewest number of shards
-    if InZone =:= [] ->
-        erlang:error({empty_zone, Zone, Shard});
-    true ->
-        ok
+    end, mem3:shards(DbName)),
+    find_homes(MoveThese, shards_by_node(OK, TargetNodes), PrevMoves).
+
+%% @equiv fix_zoning(1000)
+-spec fix_zoning() -> [{atom(), #shard{}, node()}].
+fix_zoning() ->
+    fix_zoning(1000).
+
+%% @doc Computes a plan containing up to Limit operations to repair replica
+%%      levels and improper zoning.
+-spec fix_zoning(integer()) -> [{atom(), #shard{}, node()}].
+fix_zoning(Limit) when is_integer(Limit), Limit > 0 ->
+    apply_to_cluster(fun fix_zoning/2, Limit);
+
+fix_zoning(DbName) when is_binary(DbName); is_list(DbName) ->
+    fix_zoning(DbName, []).
+
+%% @doc Computes a plan to repair replica levels and improper zoning for a
+%%      single database.
+-spec fix_zoning(DbName::iolist(), [{atom(), #shard{}, node()}]) ->
+        [{atom(), #shard{}, node()}].
+fix_zoning(DbName, PrevMoves) ->
+    IdealZoning = orddict:from_list(mem3:get_placement([])),
+    ByRange = shards_by_range(mem3:shards(DbName)),
+    orddict:fold(fun(_Range, Shards, Acc) ->
+        compute_moves(IdealZoning, computed_zoning(Shards), Shards, Acc)
+    end, PrevMoves, ByRange).
+
+%% Internal functions.
+
+global_expand(TargetNodes0, LocalOps, Limit) ->
+    TargetNodes = [couch_util:to_binary(Node) || Node <- TargetNodes0],
+    CountByNode = lists:filter(fun({Node, _Count}) ->
+        lists:member(Node, TargetNodes)
+    end, shard_count_by_node(LocalOps)),
+    TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode),
+    TargetLevel = (TotalCount div length(TargetNodes)) + 1,
+    FoldFun = fun
+        (_, Acc) when length(Acc) >= Limit ->
+            % We've already accumulated the max number of shard ops.
+            Acc;
+        ({_Node, Count}, Acc) when Count =< TargetLevel ->
+            % This node is not a donor.
+            Acc;
+        ({Node0, Count}, Acc) ->
+            Node = list_to_existing_atom(binary_to_list(Node0)),
+            % Compute the max number of shards to donate.
+            DC0 = erlang:max(Count - TargetLevel, Limit - length(Acc)),
+            InternalAcc0 = {Node, TargetNodes, Acc, DC0},
+            try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
+                {_, _, Moves, _} ->
+                    Moves
+            catch
+                {complete, Moves} ->
+                    Moves
+            end
     end,
-    [{TargetNode, _} | _] = lists:sort(fun smallest_first/2, InZone),
-    TargetShard = Shard#shard{node = TargetNode},
-    find_replacements(Rest, orddict:append(TargetNode, TargetShard, ShardsByNode),
-        [{Shard, TargetNode} | Acc]).
+    lists:foldl(FoldFun, LocalOps, CountByNode).
+
+donate_fold(_Shard, {_, _, Moves, 0}) ->
+    throw({complete, Moves});
+donate_fold(#shard{node = Node} = Shard, {Node, Nodes, Moves, DC}) ->
+    Zone = mem3:node_info(Node, <<"zone">>),
+    Shards = apply_shard_moves(mem3:shards(Shard#shard.dbname), Moves),
+    InZone = filter_map_by_zone(shards_by_node(Shards, Nodes), Zone),
+    SortedByCount = lists:sort(smallest_first(Moves), InZone),
+    Candidates = lists:dropwhile(fun({_Node, OwnShards}) ->
+        lists:keymember(Shard#shard.range, #shard.range, OwnShards)
+    end, SortedByCount),
+    case {lists:member(Shard, Shards), Candidates} of
+        {false, _} ->
+            {Node, Nodes, Moves, DC};
+        {true, []} ->
+            {Node, Nodes, Moves, DC};
+        {true, [{Node, _} | _]} ->
+            {Node, Nodes, Moves, DC};
+        {true, [{Target, _} | _]} ->
+            % Execute the move only if the target has fewer shards for this DB
+            % than the source. Otherwise we'd generate a local imbalance.
+            SourceCount = get_shard_count(Node, SortedByCount),
+            TargetCount = get_shard_count(Target, SortedByCount),
+            if TargetCount < SourceCount ->
+                print({move, Shard, Target}),
+                {Node, Nodes, [{move, Shard, Target} | Moves], DC - 1};
+            true ->
+                {Node, Nodes, Moves, DC}
+            end
+    end;
+donate_fold(_Shard, Acc) ->
+    Acc.
+
+get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) ->
+    length(couch_util:get_value(AtomKey, ShardsByNode, [])).
+
+compute_moves(IdealZoning, IdealZoning, _Copies, OtherMoves) ->
+    OtherMoves;
+compute_moves(IdealZoning, ActualZoning, Copies, OtherMoves) ->
+    {Donor, Recipient} = find_donor_and_recipient(IdealZoning, ActualZoning),
+    pair_up(Donor, Recipient, Copies, OtherMoves).
+
+find_donor_and_recipient(IdealZoning, ActualZoning) ->
+    lists:foldl(fun({Zone, IdealCopies}, {D,R}) ->
+        case couch_util:get_value(Zone, ActualZoning, 0) of
+            Actual when Actual < IdealCopies ->
+                {D, Zone};
+            Actual when Actual > IdealCopies ->
+                {Zone, R};
+            _ ->
+                {D, R}
+        end
+    end, {nil, nil}, IdealZoning).
+
+pair_up(_, nil, _Copies, Moves) ->
+    Moves;
+pair_up(nil, Recipient, Copies, Moves) ->
+    % We've got an insufficient replica level -- a recipient but no donor
+    Candidate = hd(Copies),
+    TargetNode = choose_node_in_target_zone(Candidate, Recipient, Moves),
+    print({copy, Candidate, TargetNode}),
+    [{copy, Candidate, TargetNode}|Moves];
+pair_up(Donor, Recipient, Copies, Moves) ->
+    Candidate = hd(lists:filter(fun(#shard{node = Node}) ->
+        mem3:node_info(Node, <<"zone">>) =:= Donor
+    end, Copies)),
+    TargetNode = choose_node_in_target_zone(Candidate, Recipient, Moves),
+    print({move, Candidate, TargetNode}),
+    [{move, Candidate, TargetNode}|Moves].
+
+choose_node_in_target_zone(#shard{dbname = DbName} = Candidate, Take, Moves) ->
+    TargetNodes = allowed_nodes(fun(Zone) -> Zone =:= Take end),
+    CurrentShards = apply_shard_moves(mem3:shards(DbName), Moves),
+    ByTargetNode = shards_by_node(CurrentShards, TargetNodes),
+    InZone = filter_map_by_zone(ByTargetNode, Take),
+    {TargetNode, _} = find_home(Candidate, InZone, Moves),
+    TargetNode.
+
+-spec find_homes([#shard{}], [{node(), [#shard{}]}], [{atom(), #shard{}, node()}]) ->
+        [{atom(), #shard{}, node()}].
+find_homes([], _ShardsByTargetNode, Result) ->
+    Result;
+find_homes([#shard{node = Node0} = Shard | Rest], ShardsByNode, PrevMoves) ->
+    InZone = filter_map_by_zone(ShardsByNode, mem3:node_info(Node0, <<"zone">>)),
+    {TargetNode, NewMap} = find_home(Shard, InZone, PrevMoves),
+    print({move, Shard, TargetNode}),
+    MergedMap = orddict:merge(fun(_, V1, _) -> V1 end, NewMap, ShardsByNode),
+    find_homes(Rest, MergedMap, [{move, Shard, TargetNode} | PrevMoves]).
+
+find_home(Shard, ShardsByNode, PrevMoves) ->
+    SortedByCount = lists:sort(smallest_first(PrevMoves), ShardsByNode),
+    % Ensure that the target node is not already an owner of this range
+    [{TargetNode, _} | _] = lists:dropwhile(fun({_Node, Shards}) ->
+        lists:keymember(Shard#shard.range, #shard.range, Shards)
+    end, SortedByCount),
+    NewMap = orddict:append(TargetNode, Shard#shard{node=TargetNode}, ShardsByNode),
+    {TargetNode, NewMap}.
 
 rebalance2(_TargetLevel, Shards, _Nodes, [], Moves) ->
     {Shards, Moves};
@@ -62,11 +250,12 @@ rebalance2(TargetLevel, Shards, Nodes, [Node | Rest], Moves) ->
     CurrentLevel = length(ShardsForNode),
     case CurrentLevel < TargetLevel of
         true ->
-            case victim(TargetLevel, Shards, Nodes, Node) of
+            case victim(TargetLevel, Shards, Nodes, Node, Moves) of
                 {ok, Victim} ->
+                    print({move, Victim, Node}),
                     rebalance2(TargetLevel,
                              replace(Victim, Victim#shard{node=Node}, Shards),
-                             Nodes, [Node|Rest], [{Victim, Node}|Moves]);
+                             Nodes, [Node|Rest], [{move, Victim, Node}|Moves]);
                 false ->
                     rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
             end;
@@ -74,34 +263,50 @@ rebalance2(TargetLevel, Shards, Nodes, [Node | Rest], Moves) ->
             rebalance2(TargetLevel, Shards, Nodes, Rest, Moves)
     end.
 
-victim(TargetLevel, Shards, Nodes, TargetNode) ->
+victim(TargetLevel, Shards, Nodes, TargetNode, Moves) ->
+    % Build a map of shards owned by nodes in the target zone.
     TargetZone = mem3:node_info(TargetNode, <<"zone">>),
-    CandidateNodes = lists:usort([Node || Node <- mem3:nodes(),
-                                     Node =/= TargetNode,
-                                     mem3:node_info(Node, <<"zone">>) =:= TargetZone]),
-    %% make {Node, ShardsInNode} list
-    GroupedByNode0 = [{Node, [S || S <- Shards, S#shard.node =:= Node]} || Node <- CandidateNodes],
-    %% don't take from a balancing node below target level
-    GroupedByNode1 = [{N, SS} || {N, SS} <- GroupedByNode0,
-        (length(SS) > TargetLevel) orelse (not lists:member(N, Nodes))],
-    %% prefer to take from a node with more shards than others
-    GroupedByNode2 = lists:sort(fun largest_first/2, GroupedByNode1),
-    %% don't take a shard for a range the target already has
-    TargetRanges = lists:usort([S#shard.range || S <- Shards, S#shard.node =:= TargetNode]),
-    GroupedByNode3 = [{N, lists:filter(fun(S) -> not lists:member(S#shard.range, TargetRanges) end, SS)}
-                      || {N, SS} <- GroupedByNode2],
-    %% remove nodes with no candidates shards
-    GroupedByNode4 = [{N, SS} || {N, SS} <- GroupedByNode3, SS =/= []],
-    case GroupedByNode4 of
-        [{_, [Victim|_]} | _] -> {ok, Victim};
-        [] -> false
+    ShardsByNode0 = filter_map_by_zone(shards_by_node(Shards, Nodes), TargetZone),
+    % Filter nodes that would drop below target level (including TargetNode).
+    ShardsByNode1 = [{N, SS} || {N, SS} <- ShardsByNode0, length(SS) > TargetLevel],
+    % Prefer to take from a node with more shards than others.
+    ShardsByNode2 = lists:sort(largest_first(Moves), ShardsByNode1),
+    % Don't take a shard for a range already hosted by the target.
+    TargetRanges = [S#shard.range || S <- Shards, S#shard.node =:= TargetNode],
+    ShardsByNode3 = lists:map(fun({N, SS}) ->
+        {N, [S || S <- SS, not lists:member(S#shard.range, TargetRanges)]}
+    end, ShardsByNode2),
+    % Find the first node that still owns a candidate shard.
+    case lists:dropwhile(fun({_, SS}) -> SS =:= [] end, ShardsByNode3) of
+        [] ->
+            false;
+        [{_SourceNode, [Victim | _OtherShards]} | _] ->
+            {ok, Victim}
     end.
 
-largest_first({_, A}, {_, B}) ->
-    length(A) >= length(B).
+largest_first(PrevMoves) ->
+    % use the global shard count on each node to break the tie
+    Global = shard_count_by_node(PrevMoves),
+    fun(A, B) -> sort_by_count(A, B, Global) >= 0 end.
+
+smallest_first(PrevMoves) ->
+    % use the global shard count on each node to break the tie
+    Global = shard_count_by_node(PrevMoves),
+    fun(A, B) -> sort_by_count(A, B, Global) =< 0 end.
 
-smallest_first({_, A}, {_, B}) ->
-    length(A) =< length(B).
+sort_by_count({NodeA, SA}, {NodeB, SB}, Global) when length(SA) =:= length(SB) ->
+    CountA = couch_util:get_value(couch_util:to_binary(NodeA), Global, 0),
+    CountB = couch_util:get_value(couch_util:to_binary(NodeB), Global, 0),
+    cmp(CountA, CountB);
+sort_by_count({_, A}, {_, B}, _) ->
+    cmp(length(A), length(B)).
+
+cmp(A, B) when A < B ->
+    -1;
+cmp(A, B) when A > B ->
+    1;
+cmp(_, _) ->
+    0.
 
 replace(A, B, List) ->
     replace(A, B, List, []).
@@ -112,3 +317,141 @@ replace(A, B, [A | Rest], Acc) ->
     replace(A, B, Rest, [B | Acc]);
 replace(A, B, [C | Rest], Acc) ->
     replace(A, B, Rest, [C | Acc]).
+
+%% @doc Takes a list of copy/move operations and applies them to the current
+%%      set of shards.  Any moves that reference a shard not in the current set
+%%      will be ignored.
+apply_shard_moves(Shards, []) ->
+    Shards;
+apply_shard_moves(Shards, [{move, Shard, Node}| Rest]) ->
+    NewShards = replace(Shard, Shard#shard{node = Node}, Shards, []),
+    apply_shard_moves(NewShards, Rest);
+apply_shard_moves(Shards, [{copy, Shard, Node}| Rest]) ->
+    case lists:member(Shard, Shards) of
+        true ->
+            apply_shard_moves([Shard#shard{node = Node} | Shards], Rest);
+        false ->
+            apply_shard_moves(Shards, Rest)
+    end.
+
+allowed_nodes(Fun) ->
+    lists:filter(fun(Node) ->
+        Fun(mem3:node_info(Node, <<"zone">>))
+    end, mem3:nodes()).
+
+shards_by_node(Shards, Nodes) ->
+    % Ensure every target node is present in the orddict
+    ShardsByNode0 = orddict:from_list([{N,[]} || N <- Nodes]),
+    lists:foldl(fun(#shard{node = Node} = Shard, Acc) ->
+        orddict:append(Node, Shard, Acc)
+    end, ShardsByNode0, Shards).
+
+filter_map_by_zone(ShardsByNode, Zone) ->
+    Result = orddict:filter(fun(Node, _Shards) ->
+        mem3:node_info(Node, <<"zone">>) =:= Zone
+    end, ShardsByNode),
+    if Result =:= [] ->
+        erlang:error({empty_zone, Zone});
+    true ->
+        Result
+    end.
+
+shards_by_range(Shards) ->
+    lists:foldl(fun(#shard{range = Range} = Shard, OD) ->
+        orddict:append(Range, Shard, OD)
+    end, orddict:new(), Shards).
+
+computed_zoning(Shards) ->
+    lists:foldl(fun(#shard{node = Node}, OD) ->
+        orddict:update_counter(mem3:node_info(Node, <<"zone">>), 1, OD)
+    end, orddict:new(), Shards).
+
+shard_count_by_node(PrevMoves) ->
+    Map0 = case erlang:get(shard_count_by_node) of
+        undefined ->
+            try shard_count_view() catch _:_ -> [] end;
+        {T0, Map} ->
+            case timer:now_diff(os:timestamp(), T0) div 1000 of
+                Delta when Delta < 5000 ->
+                    Map;
+                _Else ->
+                    try shard_count_view() catch _:_ -> [] end
+            end
+    end,
+    % Incorporate the operations we've already scheduled into the total counts
+    lists:foldl(fun
+        ({copy, _, TargetNode}, OD0) ->
+            orddict:update_counter(couch_util:to_binary(TargetNode), 1, OD0);
+        ({move, #shard{node = SourceNode}, TargetNode}, OD0) ->
+            OD1 = orddict:update_counter(couch_util:to_binary(SourceNode), -1, OD0),
+            orddict:update_counter(couch_util:to_binary(TargetNode), 1, OD1)
+    end, orddict:from_list(Map0), PrevMoves).
+
+shard_count_view() ->
+    %% TODO rewrite CouchDB's internal view API.  Wow!
+    {ok, Db} = couch_db:open(<<"dbs">>, []),
+    {ok, DDoc} = couch_db:open_doc(Db, <<"_design/rebalance">>, []),
+    Group0 = couch_view_group:design_doc_to_view_group(DDoc),
+    {ok, Pid} = gen_server:call(couch_view, {get_group_server, <<"dbs">>, Group0}),
+    {ok, Group} = couch_view_group:request_group(Pid, 0),
+    Lang = couch_view_group:get_language(Group),
+    Views = couch_view_group:get_views(Group),
+    Ref = erlang:monitor(process, couch_view_group:get_fd(Group)),
+    {IRed, View} = fabric_view:extract_view(Pid, <<"count_by_node">>, Views, reduce),
+    ReduceView = {reduce, IRed, Lang, View},
+    Options = [{key_group_level, exact}],
+    Fold = fun(Node, Count, Acc) -> {ok, [{Node, Count} | Acc]} end,
+    %% Workaround for problems where we hold onto bad collators in the shell
+    erlang:erase(couch_drv_port),
+    {ok, Map} = couch_view:fold_reduce(ReduceView, Fold, [], Options),
+    erlang:put(shard_count_by_node, {os:timestamp(), Map}),
+    erlang:demonitor(Ref),
+    Map.
+
+print({Op, Shard, TargetNode} = Operation) ->
+    {match, [SourceId, Cluster]} = re:run(
+        atom_to_list(Shard#shard.node),
+        "dbcore@db(?<node>[0-9]+)\.(?<cluster>[a-z0-9]+)\.cloudant.net",
+        [{capture, all_but_first, binary}]
+    ),
+    {match, [TargetId, Cluster]} = re:run(
+        atom_to_list(TargetNode),
+        "dbcore@db(?<node>[0-9]+)\.(?<cluster>[a-z0-9]+)\.cloudant.net",
+        [{capture, all_but_first, binary}]
+    ),
+    {match, [Range, Account, DbName]} = re:run(
+        Shard#shard.name,
+        "shards/(?<range>[0-9a-f\-]+)/(?<account>.+)/(?<dbname>[a-z][a-z0-9\\_\\$()\\+\\-\\/]+)\.[0-9]{8}",
+        [{capture, all_but_first, binary}]
+    ),
+    OpName = case Op of move -> move2; _ -> Op end,
+    io:format("clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName, Cluster, Account, DbName,
+         Range, SourceId, TargetId]),
+    Operation;
+
+print(Operations) when is_list(Operations) ->
+    [print(Operation) || Operation <- Operations].
+
+apply_to_cluster(UserFun, Limit) ->
+    try mem3_shards:fold(cluster_fold_fun(UserFun, Limit), {nil, []}) of
+        {_LastDb, Moves} ->
+            Moves
+    catch
+        {complete, Moves} ->
+            Moves
+    end.
+
+cluster_fold_fun(UserFun, Limit) ->
+    fun
+        (#shard{dbname = DbName}, {DbName, PrevMoves}) ->
+            {DbName, PrevMoves};
+        (#shard{dbname = DbName}, {_PrevName, PrevMoves}) ->
+            Moves = UserFun(DbName, PrevMoves),
+            check_limit(Moves, Limit),
+            {DbName, Moves}
+    end.
+
+check_limit(Moves, Limit) when length(Moves) >= Limit ->
+    throw({complete, Moves});
+check_limit(_, _) ->
+    ok.


[22/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Ensure that the owner of a doc is also a host

BugzID: 24395


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/19c3ee2c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/19c3ee2c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/19c3ee2c

Branch: refs/heads/master
Commit: 19c3ee2cd9ea4941505db7f54fbf257e740586ae
Parents: aef5866
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Tue Oct 22 15:29:00 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_util.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/19c3ee2c/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 0927493..be7302d 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -224,7 +224,7 @@ ensure_exists(DbName) ->
 
 
 owner(DbName, DocId) ->
-    Nodes = lists:sort([node()|nodes()]),
+    Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(DbName, DocId)]),
     node() =:= hd(rotate_list({DbName, DocId}, Nodes)).
 
 is_deleted(Change) ->
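
For context, a hedged sketch of why this matters: the rotation is
deterministic, so every node hosting the document elects the same owner, and
that owner now necessarily holds a copy (node names are placeholders):

    Hosts = lists:sort(['db1@x', 'db2@x', 'db3@x']),
    Owner = hd(mem3_util:rotate_list({<<"db">>, <<"docid">>}, Hosts)).
    %% owner/2 returns true only on Owner, which is one of the shard hosts;
    %% previously the rotation ran over every connected node, host or not.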


[38/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Remove old code_change, set module version to 1


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/88a6491b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/88a6491b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/88a6491b

Branch: refs/heads/master
Commit: 88a6491bba5c7a12f271e3feed6ff8f14b3a840f
Parents: 781dc89
Author: Robert Newson <ro...@cloudant.com>
Authored: Fri Nov 22 16:50:15 2013 +0000
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_nodes.erl      | 7 ++-----
 src/mem3_shards.erl     | 5 +++--
 src/mem3_sync.erl       | 1 +
 src/mem3_sync_event.erl | 1 +
 src/mem3_sync_nodes.erl | 1 +
 5 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/88a6491b/src/mem3_nodes.erl
----------------------------------------------------------------------
diff --git a/src/mem3_nodes.erl b/src/mem3_nodes.erl
index 7e97438..a8c1448 100644
--- a/src/mem3_nodes.erl
+++ b/src/mem3_nodes.erl
@@ -12,6 +12,7 @@
 
 -module(mem3_nodes).
 -behaviour(gen_server).
+-vsn(1).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
     code_change/3]).
 
@@ -83,11 +84,7 @@ handle_info(_Info, State) ->
 terminate(_Reason, _State) ->
     ok.
 
-code_change(_OldVsn, {state, ChangesPid, UpdateSeq, _}, _Extra) ->
-    ets:new(?MODULE, [named_table, {read_concurrency, true}]),
-    initialize_nodelist(),
-    {ok, #state{changes_pid = ChangesPid, update_seq = UpdateSeq}};
-code_change(_OldVsn, State, _Extra) ->
+code_change(_OldVsn, #state{}=State, _Extra) ->
     {ok, State}.
 
 %% internal functions

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/88a6491b/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index df8cbac..397fb74 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -12,6 +12,7 @@
 
 -module(mem3_shards).
 -behaviour(gen_server).
+-vsn(1).
 -behaviour(config_listener).
 
 -export([init/1, terminate/2, code_change/3]).
@@ -192,8 +193,8 @@ terminate(_Reason, #st{changes_pid=Pid}) ->
     exit(Pid, kill),
     ok.
 
-code_change(_OldVsn, St, _Extra) ->
-    {ok, cache_clear(St)}.
+code_change(_OldVsn, #st{}=St, _Extra) ->
+    {ok, St}.
 
 %% internal functions
 

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/88a6491b/src/mem3_sync.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index 3d975fc..1e5b505 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -12,6 +12,7 @@
 
 -module(mem3_sync).
 -behaviour(gen_server).
+-vsn(1).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
     code_change/3]).
 

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/88a6491b/src/mem3_sync_event.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync_event.erl b/src/mem3_sync_event.erl
index 2cc86d9..7bca230 100644
--- a/src/mem3_sync_event.erl
+++ b/src/mem3_sync_event.erl
@@ -12,6 +12,7 @@
 
 -module(mem3_sync_event).
 -behaviour(gen_event).
+-vsn(1).
 
 -export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
     code_change/3]).

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/88a6491b/src/mem3_sync_nodes.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync_nodes.erl b/src/mem3_sync_nodes.erl
index e07fd44..0a4bffc 100644
--- a/src/mem3_sync_nodes.erl
+++ b/src/mem3_sync_nodes.erl
@@ -12,6 +12,7 @@
 
 -module(mem3_sync_nodes).
 -behaviour(gen_server).
+-vsn(1).
 
 
 -export([start_link/0]).


[42/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Refactor mem3_rpc:add_checkpoint/2

This is based on Adam Kocoloski's original add_checkpoint/2 but uses a
body recursive function to avoid the final reverse/filter steps.

BugzId: 21973


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e64dd028
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e64dd028
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e64dd028

Branch: refs/heads/master
Commit: e64dd0281f1d9b9b22511b3625c1fb1f97a42042
Parents: e147621
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Dec 9 14:04:39 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:51:10 2014 +0100

----------------------------------------------------------------------
 src/mem3_rpc.erl | 133 +++++++++++++++++++++++++++-----------------------
 1 file changed, 71 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e64dd028/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 8d8c832..10294a7 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -125,68 +125,11 @@ add_checkpoint({Props}, {History}) ->
     % any larger update seq than we're currently recording.
     FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),
 
-    % Insert the new entry into the history and trim the history
-    % to keep an exponentially increasing delta between checkpoints.
-    % We do this by defining logical buckets of exponentially
-    % increasing size and then keep the smallest and largest values
-    % in each bucket. We keep both min and max points so that
-    % we don't end up with empty buckets as new points are added.
-    %
-    % NB: We're guaranteed to keep the newest entry passed to this
-    % function because we filter out all larger update sequences
-    % which means it is guaranteed to be the smallest value in the
-    % first bucket with a delta of 0.
-    WithNewEntry = [{Props} | FilteredHistory],
-
-    % Tag each entry with the bucket id
-    BucketTagged = lists:map(fun({Entry}) ->
-        EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
-        BucketTag = case SourceSeq - EntrySourceSeq of
-            0 ->
-                0;
-            N when N > 0 ->
-                % This is int(log2(SourceSeq - EntrySourceSeq))
-                trunc(math:log(N) / math:log(2))
-        end,
-        {BucketTag, SourceSeq - EntrySourceSeq, {Entry}}
-    end, WithNewEntry),
-
-    % Find the min/max entries for each bucket
-    Buckets = lists:foldl(fun({Bucket, Delta, Entry}, BucketAcc) ->
-        {MinEntry, MaxEntry} = case dict:find(Bucket, BucketAcc) of
-            {ok, Value} -> Value;
-            error -> {nil, nil}
-        end,
-        NewMin = case MinEntry of
-            {MinDelta, _} when Delta < MinDelta ->
-                {Delta, Entry};
-            nil ->
-                {Delta, Entry};
-            _ ->
-                MinEntry
-        end,
-        NewMax = case MaxEntry of
-            {MaxDelta, _} when Delta > MaxDelta ->
-                {Delta, Entry};
-            nil ->
-                {Delta, Entry};
-            _ ->
-                MaxEntry
-        end,
-        dict:store(Bucket, {NewMin, NewMax}, BucketAcc)
-    end, dict:new(), BucketTagged),
-
-    % Turn our bucket dict back into a list sorted by increasing
-    % deltas (which corresponds to decreasing source_seq values).
-    NewSourceHistory = lists:flatmap(fun({_Bucket, {Min, Max}}) ->
-        % If there's a single point in a bucket its both the min
-        % and max entry so we account for that here.
-        if Min == Max ->
-            [element(2, Min)];
-        true ->
-            [element(2, Min), element(2, Max)]
-        end
-    end, lists:sort(dict:to_list(Buckets))),
+    % Re-bucket our history based on the most recent source
+    % sequence. This is where we drop old checkpoints to
+    % maintain the exponential distribution.
+    {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0),
+    NewSourceHistory = [{Props} | RebucketedHistory],
 
     % Finally update the source node history and we're done.
     NodeRemoved = lists:keydelete(SourceNode, 1, History),
@@ -206,6 +149,72 @@ filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
     lists:filter(TargetFilter, SourceFiltered).
 
 
+%% @doc This function adjusts our history to maintain a
+%% history of checkpoints that follow an exponentially
+%% increasing age from the most recent checkpoint.
+%%
+%% The terms newest and oldest used in these comments
+%% refer to the (NewSeq - CurSeq) difference where smaller
+%% values are considered newer.
+%%
+%% It works by assigning each entry to a bucket and keeping
+%% the newest and oldest entry in each bucket. Keeping
+%% both the newest and oldest means that we won't end up
+%% with empty buckets as checkpoints are promoted to new
+%% buckets.
+%%
+%% The return value of this function is a two-tuple of the
+%% form `{BucketId, History}` where BucketId is the id of
+%% the bucket for the first entry in History. This is used
+%% when recursing to detect the oldest value in a given
+%% bucket.
+%%
+%% This function expects the provided history to be sorted
+%% in descending order of source_seq values.
+rebucket([], _NewSeq, Bucket) ->
+    {Bucket+1, []};
+rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
+    CurSeq = couch_util:get_value(<<"source_seq">>, Entry),
+    case find_bucket(NewSeq, CurSeq, Bucket) of
+        Bucket ->
+            % This entry is in an existing bucket which means
+            % we will only keep it if it's the oldest value
+            % in the bucket. To detect this we rebucket the
+            % rest of the list and only include Entry if the
+            % rest of the list is in a bigger bucket.
+            case rebucket(RestHistory, NewSeq, Bucket) of
+                {Bucket, NewHistory} ->
+                    % There's another entry in this bucket so we drop the
+                    % current entry.
+                    {Bucket, NewHistory};
+                {NextBucket, NewHistory} when NextBucket > Bucket ->
+                    % The rest of the history was rebucketed into a larger
+                    % bucket so this is the oldest entry in the current
+                    % bucket.
+                    {Bucket, [{Entry} | NewHistory]}
+            end;
+        NextBucket when NextBucket > Bucket ->
+            % This entry is the newest in NextBucket so we add it
+            % to our history and continue rebucketing.
+            {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket),
+            {NextBucket, [{Entry} | NewHistory]}
+    end.
+
+
+%% @doc Find the bucket id for the given sequence pair.
+find_bucket(NewSeq, CurSeq, Bucket) ->
+    % The +1 constant in this comparison is a bit subtle. The
+    % reason for it is to make sure that the first entry in
+    % the history is guaranteed to have a BucketId of 1. This
+    % also relies on never having a duplicated update
+    % sequence so adding 1 here guarantees a difference >= 2.
+    if (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
+        find_bucket(NewSeq, CurSeq, Bucket+1);
+    true ->
+        Bucket
+    end.
+
+
 rexi_call(Node, MFA) ->
     Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
     Ref = rexi:cast(Node, self(), MFA, [sync]),
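
A worked example of find_bucket/3, evaluated by hand with NewSeq = 16 (it
matches the bucket annotations in the tests added earlier):

    [{Seq, find_bucket(16, Seq, 0)} || Seq <- [16, 15, 14, 13, 12, 9, 8, 1]].
    %% => [{16,0},{15,0},{14,1},{13,1},{12,2},{9,2},{8,3},{1,3}]
    %% deltas 0-1 fall in bucket 0, 2-3 in bucket 1, 4-7 in bucket 2,
    %% 8-15 in bucket 3; each bucket spans twice the previous one.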


[43/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Avoid decom:true nodes when fixing zoning

This patch prevents mem3_rebalance:fix_zoning from suggesting moves
onto nodes that are flagged with "decom":true.

BugzID: 26362


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/fd3c7b59
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/fd3c7b59
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/fd3c7b59

Branch: refs/heads/master
Commit: fd3c7b59f2f79519b8a1f04b6931adde47aaa659
Parents: e64dd02
Author: Mike Wallace <mi...@googlemail.com>
Authored: Fri Dec 20 14:15:43 2013 +0000
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:51:29 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/fd3c7b59/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 54dcd1c..3972d89 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -400,7 +400,7 @@ apply_shard_moves(Shards, [{copy, Shard, Node}| Rest]) ->
 allowed_nodes(Fun) ->
     lists:filter(fun(Node) ->
         Fun(mem3:node_info(Node, <<"zone">>))
-    end, mem3:nodes()).
+    end, surviving_nodes()).
 
 surviving_nodes() ->
     lists:filter(fun(Node) ->

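A minimal sketch of the intent, with invented node names; the exact decom
check lives in surviving_nodes/0 (its body is truncated in the hunk above),
but presumably amounts to something like:

    %% Assumed shape only -- the real predicate may differ in detail.
    surviving_nodes() ->
        lists:filter(fun(Node) ->
            mem3:node_info(Node, <<"decom">>) =/= true
        end, mem3:nodes()).

so a node whose node document carries "decom": true remains a cluster
member but is never proposed as a target when fix_zoning suggests moves.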

[26/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Ensure all shards are moved off non-target nodes

BugzID: 20742


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/6d9983fc
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/6d9983fc
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/6d9983fc

Branch: refs/heads/master
Commit: 6d9983fce97f9395049061a9e051eaac4a95ffe0
Parents: 720049b
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Aug 16 13:40:50 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 40 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/6d9983fc/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 8f5872a..28cd32e 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -23,8 +23,43 @@ rebalance(DbName) ->
 rebalance(DbName, TargetNodes) when is_binary(DbName) ->
     rebalance(mem3:shards(DbName), TargetNodes);
 rebalance(Shards, TargetNodes) when is_list(Shards) ->
+    %% first migrate shards off of non-target nodes
+    {OK, MoveThese} = lists:partition(fun(#shard{node=Node}) ->
+        lists:member(Node, TargetNodes)
+    end, Shards),
+    ShardsByTargetNode0 = lists:foldl(fun(Shard, Acc) ->
+        orddict:append(Shard#shard.node, Shard, Acc)
+    end, orddict:new(), OK),
+    ShardsByTargetNode = lists:sort(lists:foldl(fun(Node, Acc) ->
+        case orddict:is_key(Node, ShardsByTargetNode0) of
+            true ->
+                Acc;
+            false ->
+                [{Node, []} | Acc]
+        end
+    end, ShardsByTargetNode0, TargetNodes)),
+    Moves = find_replacements(MoveThese, ShardsByTargetNode, []),
+    Moved = [Shard#shard{node = Node} || {Shard, Node} <- Moves],
     TargetLevel = length(Shards) div length(TargetNodes),
-    rebalance2(TargetLevel, Shards, TargetNodes, TargetNodes, []).
+    rebalance2(TargetLevel, OK ++ Moved, TargetNodes, TargetNodes, Moves).
+
+find_replacements([], _ShardsByTargetNode, Result) ->
+    Result;
+find_replacements([Shard | Rest], ShardsByNode, Acc) ->
+    Zone = mem3:node_info(Shard#shard.node, <<"zone">>),
+    % Find a node in the same zone
+    InZone = [{Node, Shards} || {Node, Shards} <- ShardsByNode,
+        mem3:node_info(Node, <<"zone">>) =:= Zone],
+    % Prefer a node with the fewest number of shards
+    if InZone =:= [] ->
+        erlang:error({empty_zone, Zone, Shard});
+    true ->
+        ok
+    end,
+    [{TargetNode, _} | _] = lists:sort(fun smallest_first/2, InZone),
+    TargetShard = Shard#shard{node = TargetNode},
+    find_replacements(Rest, orddict:append(TargetNode, TargetShard, ShardsByNode),
+        [{Shard, TargetNode} | Acc]).
 
 rebalance2(_TargetLevel, Shards, _Nodes, [], Moves) ->
     {Shards, Moves};
@@ -71,6 +106,9 @@ victim(TargetLevel, Shards, Nodes, TargetNode) ->
 largest_first({_, A}, {_, B}) ->
     length(A) >= length(B).
 
+smallest_first({_, A}, {_, B}) ->
+    length(A) < length(B).
+
 replace(A, B, List) ->
     replace(A, B, List, []).
 


[14/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Preserve key and incorporate range into rotation key


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/1d50774d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/1d50774d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/1d50774d

Branch: refs/heads/master
Commit: 1d50774d269f749aa893518eb40b787241a0fa8c
Parents: 0efb85e
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Jun 25 12:37:33 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/1d50774d/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 0046fd4..af17d5c 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -262,7 +262,8 @@ group_by_proximity(Shards, ZoneMap) ->
 
 choose_ushards(DbName, Shards) ->
     Groups0 = group_by_range(Shards),
-    Groups1 = [rotate_list(DbName, order_shards(G)) || G <- Groups0],
+    Groups1 = [rotate_list(term_to_binary({DbName, R}), order_shards(G))
+               || {R, G} <- Groups0],
     [hd(G) || G <- Groups1].
 
 rotate_list(_Key, []) ->
@@ -277,10 +278,8 @@ order_shards(UnorderedShards) ->
     UnorderedShards.
 
 group_by_range(Shards) ->
-    Groups0 = lists:foldl(fun(Shard, Dict) ->
-        orddict:append(mem3:range(Shard), Shard, Dict) end, orddict:new(), Shards),
-    {_, Groups} = lists:unzip(Groups0),
-    Groups.
+    lists:foldl(fun(Shard, Dict) ->
+        orddict:append(mem3:range(Shard), Shard, Dict) end, orddict:new(), Shards).
 
 % quorum functions
 

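For illustration (database name and shard placeholders are invented): the
rotation key now includes the range, so different ranges of the same
database generally rotate by different offsets. A standalone sketch that
mirrors rotate_list/2 above:

    %% n1, n2, n3 stand in for the copies of one shard range.
    Rotate = fun(Key, List) ->
        {H, T} = lists:split(erlang:crc32(Key) rem length(List), List),
        T ++ H
    end,
    R1 = Rotate(term_to_binary({<<"mydb">>, [0, 16#7fffffff]}), [n1, n2, n3]),
    R2 = Rotate(term_to_binary({<<"mydb">>, [16#80000000, 16#ffffffff]}), [n1, n2, n3]).
    %% R1 and R2 are usually different rotations of [n1, n2, n3], so the
    %% preferred copy no longer sits at the same list position for every range.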

[12/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Use a private record for event listener state


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/43cd763d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/43cd763d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/43cd763d

Branch: refs/heads/master
Commit: 43cd763dcbda4889d65f0e0b0e2fea234e6be92c
Parents: 7706134
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri May 24 15:03:54 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3_sync.erl | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/43cd763d/src/mem3_sync.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index 9c82c0e..3d975fc 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -25,6 +25,12 @@
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
+-record(event_listener, {
+    nodes,
+    shards,
+    users
+}).
+
 -record(state, {
     active = [],
     count = 0,
@@ -265,18 +271,22 @@ sync_push(ShardName, N) ->
     gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).
 
 start_event_listener() ->
-    State = {nodes_db(), shards_db(), users_db()},
+    State = #event_listener{
+        nodes = nodes_db(),
+        shards = shards_db(),
+        users = users_db()
+    },
     couch_event:link_listener(?MODULE, handle_db_event, State, [all_dbs]).
 
-handle_db_event(NodesDb, updated, {NodesDb, _, _}=St) ->
+handle_db_event(NodesDb, updated, #event_listener{nodes = NodesDb} = St) ->
     Nodes = mem3:nodes(),
     Live = nodes(),
     [?MODULE:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
     {ok, St};
-handle_db_event(ShardsDb, updated, {_, ShardsDb, _}=St) ->
+handle_db_event(ShardsDb, updated, #event_listener{shards = ShardsDb} = St) ->
     ?MODULE:push(ShardsDb, find_next_node()),
     {ok, St};
-handle_db_event(UsersDb, updated, {_, _, UsersDb}=St) ->
+handle_db_event(UsersDb, updated, #event_listener{users = UsersDb} = St) ->
     ?MODULE:push(UsersDb, find_next_node()),
     {ok, St};
 handle_db_event(<<"shards/", _/binary>> = ShardName, updated, St) ->


[44/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Fast-forward internal repl. between file copies

In the case where two files have the same UUID, we can analyze epoch
information to determine the safe start sequence.

BugzID: 27753


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/86652018
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/86652018
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/86652018

Branch: refs/heads/master
Commit: 866520183220a90f29c74319c7bc548fc9f39d48
Parents: fd3c7b5
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Tue Feb 4 15:44:14 2014 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:51:40 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 11 ++++++++++-
 src/mem3_rpc.erl | 45 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/86652018/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 2186fa3..339bd66 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -216,9 +216,18 @@ calculate_start_seq(Acc) ->
             end,
             Acc1#acc{seq = Seq, history = History};
         {not_found, _} ->
-            Acc1
+            compare_epochs(Acc1)
     end.
 
+compare_epochs(Acc) ->
+    #acc{
+        source = Db,
+        target = #shard{node=Node, name=Name}
+    } = Acc,
+    UUID = couch_db:get_uuid(Db),
+    Epochs = couch_db:get_epochs(Db),
+    Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
+    Acc#acc{seq = Seq, history = {[]}}.
 
 changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
     {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/86652018/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 10294a7..9507f5b 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -16,6 +16,7 @@
 
 
 -export([
+    find_common_seq/4,
     get_missing_revs/4,
     update_docs/4,
     load_checkpoint/4,
@@ -24,6 +25,7 @@
 
 % Private RPC callbacks
 -export([
+    find_common_seq_rpc/3,
     load_checkpoint_rpc/3,
     save_checkpoint_rpc/5
 ]).
@@ -54,6 +56,11 @@ save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
     rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
 
 
+find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
+    Args = [DbName, SourceUUID, SourceEpochs],
+    rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
+
+
 load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case couch_db:open_int(DbName, [{user_ctx, ?CTX}]) of
@@ -107,6 +114,44 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
             rexi:reply(Error)
     end.
 
+find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case couch_db:open_int(DbName, [{user_ctx, ?CTX}]) of
+    {ok, Db} ->
+        case couch_db:get_uuid(Db) of
+        SourceUUID ->
+            TargetEpochs = couch_db:get_epochs(Db),
+            Seq = compare_epochs(SourceEpochs, TargetEpochs),
+            rexi:reply({ok, Seq});
+        _Else ->
+            rexi:reply({ok, 0})
+        end;
+    Error ->
+        rexi:reply(Error)
+    end.
+
+
+%% @doc Return the sequence where two files with the same UUID diverged.
+compare_epochs(SourceEpochs, TargetEpochs) ->
+    compare_rev_epochs(
+        lists:reverse(SourceEpochs),
+        lists:reverse(TargetEpochs)
+    ).
+
+
+compare_rev_epochs([{Node, Seq} | SourceRest], [{Node, Seq} | TargetRest]) ->
+    % Common history, fast-forward
+    compare_epochs(SourceRest, TargetRest);
+compare_rev_epochs([], [{_, TargetSeq} | _]) ->
+    % Source has not moved, start from seq just before the target took over
+    TargetSeq - 1;
+compare_rev_epochs([{_, SourceSeq} | _], []) ->
+    % Target has not moved, start from seq where source diverged
+    SourceSeq;
+compare_rev_epochs([{_, SourceSeq} | _], [{_, TargetSeq} | _]) ->
+    % The source was moved to a new location independently, take the minimum
+    erlang:min(SourceSeq, TargetSeq) - 1.
+
 
 %% @doc This adds a new update sequence checkpoint to the replication
 %%      history. Checkpoints are keyed by the source node so that we

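A hand-worked sketch of the common file-copy case (node names and sequence
numbers are invented, and get_epochs/1 is assumed to return newest-first,
which is what the reversal in compare_epochs/2 implies): a shard file is
copied to node2 at sequence 100, so both files share a UUID but the copy
gains a new epoch entry.

    SourceEpochs = [{'node1@host', 0}],
    TargetEpochs = [{'node2@host', 100}, {'node1@host', 0}],
    99 = compare_epochs(SourceEpochs, TargetEpochs).

Reversed, both lists share the {'node1@host', 0} prefix; the source has not
moved, so the second compare_rev_epochs/2 clause returns TargetSeq - 1 and
replication fast-forwards to 99 rather than starting from sequence 0.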

[32/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Stop donating once the target level is achieved

Also switched to a record accumulator for clarity.

BugzID: 24466


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/5ec7d044
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/5ec7d044
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/5ec7d044

Branch: refs/heads/master
Commit: 5ec7d0440ea9b4c63e7b35e365380109a108b0c8
Parents: 51838ca
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 11:53:22 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 48 +++++++++++++++++++++++++++++++++------------
 1 file changed, 36 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5ec7d044/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index af050f6..234cad2 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -35,6 +35,14 @@
 
 -include("mem3.hrl").
 
+-record (gacc, {
+    node,
+    targets,
+    moves,
+    limit,
+    target_level
+}).
+
 %% @equiv expand(1000)
 -spec expand() -> [{atom(), #shard{}, node()}].
 expand() ->
@@ -137,11 +145,15 @@ global_expand(TargetNodes0, LocalOps, Limit) ->
             Acc;
         ({Node0, Count}, Acc) ->
             Node = list_to_existing_atom(binary_to_list(Node0)),
-            % Compute the max number of shards to donate.
-            DC0 = erlang:min(Count - TargetLevel, Limit - length(Acc)),
-            InternalAcc0 = {Node, TargetNodes0, Acc, DC0},
+            InternalAcc0 = #gacc{
+                node = Node,
+                targets = TargetNodes0,
+                moves = Acc,
+                limit = erlang:min(Count - TargetLevel, Limit - length(Acc)),
+                target_level = TargetLevel
+            },
             try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
-                {_, _, Moves, _} ->
+                #gacc{moves = Moves} ->
                     Moves
             catch
                 {complete, Moves} ->
@@ -150,9 +162,15 @@ global_expand(TargetNodes0, LocalOps, Limit) ->
     end,
     lists:foldl(FoldFun, LocalOps, CountByNode).
 
-donate_fold(_Shard, {_, _, Moves, 0}) ->
+donate_fold(_Shard, #gacc{limit = 0, moves = Moves}) ->
     throw({complete, Moves});
-donate_fold(#shard{node = Node} = Shard, {Node, Nodes, Moves, DC}) ->
+donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
+     #gacc{
+        targets = Nodes,
+        moves = Moves,
+        limit = DC,
+        target_level = TargetLevel
+    } = Acc0,
     Zone = mem3:node_info(Node, <<"zone">>),
     Shards = apply_shard_moves(mem3:shards(Shard#shard.dbname), Moves),
     InZone = filter_map_by_zone(shards_by_node(Shards, Nodes), Zone),
@@ -162,21 +180,27 @@ donate_fold(#shard{node = Node} = Shard, {Node, Nodes, Moves, DC}) ->
     end, SortedByCount),
     case {lists:member(Shard, Shards), Candidates} of
         {false, _} ->
-            {Node, Nodes, Moves, DC};
+            Acc0;
         {true, []} ->
-            {Node, Nodes, Moves, DC};
+            Acc0;
         {true, [{Node, _} | _]} ->
-            {Node, Nodes, Moves, DC};
+            Acc0;
         {true, [{Target, _} | _]} ->
             % Execute the move only if the target has fewer shards for this DB
             % than the source. Otherwise we'd generate a local imbalance.
             SourceCount = get_shard_count(Node, SortedByCount),
             TargetCount = get_shard_count(Target, SortedByCount),
-            if TargetCount < SourceCount ->
+            % Execute the move only if the target needs shards.
+            NodeKey = couch_util:to_binary(Target),
+            Total = couch_util:get_value(NodeKey, shard_count_by_node(Moves)),
+            if (TargetCount < SourceCount), (Total < TargetLevel) ->
                 print({move, Shard, Target}),
-                {Node, Nodes, [{move, Shard, Target} | Moves], DC - 1};
+                Acc0#gacc{
+                    moves = [{move, Shard, Target} | Moves],
+                    limit = DC - 1
+                };
             true ->
-                {Node, Nodes, Moves, DC}
+                Acc0
             end
     end;
 donate_fold(_Shard, Acc) ->


[05/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Choose ushards according to persistent record

The order of nodes in the by_range section of "dbs" documents is now
promoted to the principal order for ushards. Ushards still accounts
for liveness, selecting the first live replica, and still supports
spread by rotating this list using the CRC32 of the database name
(since many databases will have the same layout).

If by_range and by_node are not symmetrical then by_node is used and
order is undefined to match existing behavior.

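An illustrative fragment of a "dbs" document (node names are invented)
showing both sections; build_shards_by_range/2 below takes each node's
position in the by_range list as its order, so node2 is listed first for
the first range and becomes the principal choice there, subject to the
liveness check and CRC32 rotation described above:

    {
      "_id": "mydb",
      "by_node": {
        "node1@host": ["00000000-7fffffff", "80000000-ffffffff"],
        "node2@host": ["00000000-7fffffff", "80000000-ffffffff"]
      },
      "by_range": {
        "00000000-7fffffff": ["node2@host", "node1@host"],
        "80000000-ffffffff": ["node1@host", "node2@host"]
      }
    }

Here by_node and by_range describe the same placement, so they are
symmetrical and the ordered variant is used; if the two sections disagreed,
the by_node section would win and the order would be undefined.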

Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/f9c22769
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/f9c22769
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/f9c22769

Branch: refs/heads/master
Commit: f9c22769ea089812e401554f06626f7d0d1dac4b
Parents: a0d7430
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Apr 23 22:54:48 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:19 2014 +0100

----------------------------------------------------------------------
 include/mem3.hrl    | 10 +++++++
 src/mem3.erl        | 78 +++++++++++++++++++++++++++++++++++-------------
 src/mem3_shards.erl | 45 +++++++++++++++++++++-------
 src/mem3_util.erl   | 66 +++++++++++++++++++++++++++++++++++++---
 4 files changed, 164 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/include/mem3.hrl
----------------------------------------------------------------------
diff --git a/include/mem3.hrl b/include/mem3.hrl
index cb39e78..7c20061 100644
--- a/include/mem3.hrl
+++ b/include/mem3.hrl
@@ -19,6 +19,16 @@
     ref :: reference() | 'undefined' | '_'
 }).
 
+%% Do not reference outside of mem3.
+-record(ordered_shard, {
+    name :: binary() | '_',
+    node :: node() | '_',
+    dbname :: binary(),
+    range :: [non_neg_integer() | '$1' | '$2'],
+    ref :: reference() | 'undefined' | '_',
+    order :: non_neg_integer() | 'undefined' | '_'
+}).
+
 %% types
 -type join_type() :: init | join | replace | leave.
 -type join_order() :: non_neg_integer().

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 36ff87b..8f8808b 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -21,6 +21,9 @@
 -export([live_shards/2]).
 -export([belongs/2]).
 
+%% For mem3 use only.
+-export([name/1, node/1, range/1]).
+
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
@@ -74,12 +77,25 @@ node_info(Node, Key) ->
     mem3_nodes:get_node_info(Node, Key).
 
 -spec shards(DbName::iodata()) -> [#shard{}].
-shards(DbName) when is_list(DbName) ->
-    shards(list_to_binary(DbName));
 shards(DbName) ->
+    shards_int(DbName, []).
+
+shards_int(DbName, Options) when is_list(DbName) ->
+    shards_int(list_to_binary(DbName), Options);
+shards_int(DbName, Options) ->
+    Ordered = lists:member(ordered, Options),
     ShardDbName =
         list_to_binary(config:get("mem3", "shard_db", "dbs")),
     case DbName of
+    ShardDbName when Ordered ->
+        %% shard_db is treated as a single sharded db to support calls to db_info
+        %% and view_all_docs
+        [#ordered_shard{
+            node = node(),
+            name = ShardDbName,
+            dbname = ShardDbName,
+            range = [0, 2 bsl 31],
+            order = undefined}];
     ShardDbName ->
         %% shard_db is treated as a single sharded db to support calls to db_info
         %% and view_all_docs
@@ -89,22 +105,27 @@ shards(DbName) ->
             dbname = ShardDbName,
             range = [0, 2 bsl 31]}];
     _ ->
-        mem3_shards:for_db(DbName)
+        mem3_shards:for_db(DbName, Options)
     end.
 
 -spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
-shards(DbName, DocId) when is_list(DbName) ->
-    shards(list_to_binary(DbName), DocId);
-shards(DbName, DocId) when is_list(DocId) ->
-    shards(DbName, list_to_binary(DocId));
 shards(DbName, DocId) ->
-    mem3_shards:for_docid(DbName, DocId).
+    shards_int(DbName, DocId, []).
+
+shards_int(DbName, DocId, Options) when is_list(DbName) ->
+    shards_int(list_to_binary(DbName), DocId, Options);
+shards_int(DbName, DocId, Options) when is_list(DocId) ->
+    shards_int(DbName, list_to_binary(DocId), Options);
+shards_int(DbName, DocId, Options) ->
+    mem3_shards:for_docid(DbName, DocId, Options).
+
 
 -spec ushards(DbName::iodata()) -> [#shard{}].
 ushards(DbName) ->
     Nodes = [node()|erlang:nodes()],
     ZoneMap = zone_map(Nodes),
-    ushards(DbName, live_shards(DbName, Nodes), ZoneMap).
+    Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
+    mem3_util:downcast(Shards).
 
 ushards(DbName, Shards0, ZoneMap) ->
     {L,S,D} = group_by_proximity(Shards0, ZoneMap),
@@ -209,6 +230,8 @@ belongs(Begin, End, DocId) ->
 
 range(#shard{range = Range}) ->
     Range;
+range(#ordered_shard{range = Range}) ->
+    Range;
 range(<<"shards/", Start:8/binary, "-", End:8/binary, "/", _/binary>>) ->
     [httpd_util:hexlist_to_integer(binary_to_list(Start)),
      httpd_util:hexlist_to_integer(binary_to_list(End))].
@@ -217,29 +240,29 @@ nodes_in_zone(Nodes, Zone) ->
     [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].
 
 live_shards(DbName, Nodes) ->
-    [S || #shard{node=Node} = S <- shards(DbName), lists:member(Node, Nodes)].
+    live_shards(DbName, Nodes, []).
+
+live_shards(DbName, Nodes, Options) ->
+    [S || S <- shards_int(DbName, Options), lists:member(mem3:node(S), Nodes)].
 
 zone_map(Nodes) ->
     [{Node, node_info(Node, <<"zone">>)} || Node <- Nodes].
 
 group_by_proximity(Shards) ->
-    Nodes = [N || #shard{node=N} <- lists:ukeysort(#shard.node, Shards)],
+    Nodes = [mem3:node(S) || S <- lists:ukeysort(#shard.node, Shards)],
     group_by_proximity(Shards, zone_map(Nodes)).
 
 group_by_proximity(Shards, ZoneMap) ->
-    {Local, Remote} = lists:partition(fun(S) -> S#shard.node =:= node() end,
+    {Local, Remote} = lists:partition(fun(S) -> mem3:node(S) =:= node() end,
         Shards),
     LocalZone = proplists:get_value(node(), ZoneMap),
-    Fun = fun(S) -> proplists:get_value(S#shard.node, ZoneMap) =:= LocalZone end,
+    Fun = fun(S) -> proplists:get_value(mem3:node(S), ZoneMap) =:= LocalZone end,
     {SameZone, DifferentZone} = lists:partition(Fun, Remote),
     {Local, SameZone, DifferentZone}.
 
 choose_ushards(DbName, Shards) ->
-    Groups = group_by_range(rotate_list(DbName, lists:sort(Shards))),
-    Fun = fun(Group, {N, Acc}) ->
-        {N+1, [lists:nth(1 + N rem length(Group), Group) | Acc]} end,
-    {_, Result} = lists:foldl(Fun, {0, []}, Groups),
-    Result.
+    [hd(G) || G <- [rotate_list(DbName, order_shards(G)) ||
+        G <- group_by_range(Shards)]].
 
 rotate_list(_DbName, []) ->
     [];
@@ -247,9 +270,14 @@ rotate_list(DbName, List) ->
     {H, T} = lists:split(erlang:crc32(DbName) rem length(List), List),
     T ++ H.
 
+order_shards([#ordered_shard{}|_]=OrderedShards) ->
+    lists:keysort(#ordered_shard.order, OrderedShards);
+order_shards(UnorderedShards) ->
+    UnorderedShards.
+
 group_by_range(Shards) ->
-    Groups0 = lists:foldl(fun(#shard{range=Range}=Shard, Dict) ->
-        orddict:append(Range, Shard, Dict) end, orddict:new(), Shards),
+    Groups0 = lists:foldl(fun(Shard, Dict) ->
+        orddict:append(mem3:range(Shard), Shard, Dict) end, orddict:new(), Shards),
     {_, Groups} = lists:unzip(Groups0),
     Groups.
 
@@ -259,3 +287,13 @@ quorum(#db{name=DbName}) ->
     quorum(DbName);
 quorum(DbName) ->
     n(DbName) div 2 + 1.
+
+node(#shard{node=Node}) ->
+    Node;
+node(#ordered_shard{node=Node}) ->
+    Node.
+
+name(#shard{name=Name}) ->
+    Name;
+name(#ordered_shard{name=Name}) ->
+    Name.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 9315c5c..6127ff6 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -19,7 +19,7 @@
 -export([handle_config_change/5]).
 
 -export([start_link/0]).
--export([for_db/1, for_docid/2, get/3, local/1, fold/2]).
+-export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]).
 -export([set_max_size/1]).
 
 -record(st, {
@@ -39,7 +39,10 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 for_db(DbName) ->
-    try ets:lookup(?SHARDS, DbName) of
+    for_db(DbName, []).
+
+for_db(DbName, Options) ->
+    Shards = try ets:lookup(?SHARDS, DbName) of
         [] ->
             load_shards_from_disk(DbName);
         Else ->
@@ -47,26 +50,47 @@ for_db(DbName) ->
             Else
     catch error:badarg ->
         load_shards_from_disk(DbName)
+    end,
+    case lists:member(ordered, Options) of
+        true  -> Shards;
+        false -> mem3_util:downcast(Shards)
     end.
 
 for_docid(DbName, DocId) ->
+    for_docid(DbName, DocId, []).
+
+for_docid(DbName, DocId, Options) ->
     HashKey = mem3_util:hash(DocId),
-    Head = #shard{
+    ShardHead = #shard{
         name = '_',
         node = '_',
         dbname = DbName,
         range = ['$1','$2'],
         ref = '_'
     },
+    OrderedShardHead = #ordered_shard{
+        name = '_',
+        node = '_',
+        dbname = DbName,
+        range = ['$1','$2'],
+        ref = '_',
+        order = '_'
+    },
     Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}],
-    try ets:select(?SHARDS, [{Head, Conditions, ['$_']}]) of
+    ShardSpec = {ShardHead, Conditions, ['$_']},
+    OrderedShardSpec = {OrderedShardHead, Conditions, ['$_']},
+    Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of
         [] ->
             load_shards_from_disk(DbName, DocId);
-        Shards ->
+        Else ->
             gen_server:cast(?MODULE, {cache_hit, DbName}),
-            Shards
+            Else
     catch error:badarg ->
         load_shards_from_disk(DbName, DocId)
+    end,
+    case lists:member(ordered, Options) of
+        true  -> Shards;
+        false -> mem3_util:downcast(Shards)
     end.
 
 get(DbName, Node, Range) ->
@@ -221,10 +245,10 @@ changes_callback({change, {Change}, _}, _) ->
                 couch_log:error("missing partition table for ~s: ~p",
                     [DbName, Reason]);
             {Doc} ->
-                Shards = mem3_util:build_shards(DbName, Doc),
+                Shards = mem3_util:build_ordered_shards(DbName, Doc),
                 gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
-                [create_if_missing(Name) || #shard{name=Name, node=Node}
-                    <- Shards, Node =:= node()]
+                [create_if_missing(mem3:name(S)) || S
+                    <- Shards, mem3:node(S) =:= node()]
             end
         end
     end,
@@ -244,7 +268,7 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
 load_shards_from_db(#db{} = ShardDb, DbName) ->
     case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
     {ok, #doc{body = {Props}}} ->
-        Shards = mem3_util:build_shards(DbName, Props),
+        Shards = mem3_util:build_ordered_shards(DbName, Props),
         gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
         Shards;
     {not_found, _} ->
@@ -326,4 +350,3 @@ cache_clear(St) ->
     true = ets:delete_all_objects(?SHARDS),
     true = ets:delete_all_objects(?ATIMES),
     St#st{cur_size=0}.
-

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 6ed8924..a63d9a0 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -17,6 +17,9 @@
     shard_info/1, ensure_exists/1, open_db_doc/1]).
 -export([owner/2, is_deleted/1]).
 
+%% do not use outside mem3.
+-export([build_ordered_shards/2, downcast/1]).
+
 -export([create_partition_map/4, name_shard/1]).
 -deprecated({create_partition_map, 4, eventually}).
 -deprecated({name_shard, 1, eventually}).
@@ -34,10 +37,17 @@ hash(Item) ->
 name_shard(Shard) ->
     name_shard(Shard, "").
 
-name_shard(#shard{dbname = DbName, range=[B,E]} = Shard, Suffix) ->
-    Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
-        couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix],
-    Shard#shard{name = ?l2b(Name)}.
+name_shard(#shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+    Name = make_name(DbName, Range, Suffix),
+    Shard#shard{name = ?l2b(Name)};
+
+name_shard(#ordered_shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+    Name = make_name(DbName, Range, Suffix),
+    Shard#ordered_shard{name = ?l2b(Name)}.
+
+make_name(DbName, [B,E], Suffix) ->
+    ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+     couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix].
 
 create_partition_map(DbName, N, Q, Nodes) ->
     create_partition_map(DbName, N, Q, Nodes, "").
@@ -122,7 +132,25 @@ delete_db_doc(DbName, DocId, ShouldMutate) ->
         couch_db:close(Db)
     end.
 
+%% Always returns original #shard records.
+-spec build_shards(binary(), list()) -> [#shard{}].
 build_shards(DbName, DocProps) ->
+    build_shards_by_node(DbName, DocProps).
+
+%% Will return #ordered_shard records if by_node and by_range
+%% are symmetrical, #shard records otherwise.
+-spec build_ordered_shards(binary(), list()) ->
+    [#shard{}] | [#ordered_shard{}].
+build_ordered_shards(DbName, DocProps) ->
+    ByNode = build_shards_by_node(DbName, DocProps),
+    ByRange = build_shards_by_range(DbName, DocProps),
+    Symmetrical = lists:sort(ByNode) =:= lists:sort(downcast(ByRange)),
+    case Symmetrical of
+        true  -> ByRange;
+        false -> ByNode
+    end.
+
+build_shards_by_node(DbName, DocProps) ->
     {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
     Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
     lists:flatmap(fun({Node, Ranges}) ->
@@ -138,6 +166,23 @@ build_shards(DbName, DocProps) ->
         end, Ranges)
     end, ByNode).
 
+build_shards_by_range(DbName, DocProps) ->
+    {ByRange} = couch_util:get_value(<<"by_range">>, DocProps, {[]}),
+    Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+    lists:flatmap(fun({Range, Nodes}) ->
+        lists:map(fun({Node, Order}) ->
+            [B,E] = string:tokens(?b2l(Range), "-"),
+            Beg = httpd_util:hexlist_to_integer(B),
+            End = httpd_util:hexlist_to_integer(E),
+            name_shard(#ordered_shard{
+                dbname = DbName,
+                node = to_atom(Node),
+                range = [Beg, End],
+                order = Order
+            }, Suffix)
+        end, lists:zip(Nodes, lists:seq(1, length(Nodes))))
+    end, ByRange).
+
 to_atom(Node) when is_binary(Node) ->
     list_to_atom(binary_to_list(Node));
 to_atom(Node) when is_atom(Node) ->
@@ -194,3 +239,16 @@ is_deleted(Change) ->
     Else ->
         Else
     end.
+
+downcast(#shard{}=S) ->
+    S;
+downcast(#ordered_shard{}=S) ->
+    #shard{
+       name = S#ordered_shard.name,
+       node = S#ordered_shard.node,
+       dbname = S#ordered_shard.dbname,
+       range = S#ordered_shard.range,
+       ref = S#ordered_shard.ref
+      };
+downcast(Shards) when is_list(Shards) ->
+    [downcast(Shard) || Shard <- Shards].


[35/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Refactor global candidate selection

The old approach was getting unwieldy; hopefully this makes the tests
more explicit.

BugzID: 24466


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/5c3c9c93
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/5c3c9c93
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/5c3c9c93

Branch: refs/heads/master
Commit: 5c3c9c93ec19dfa50530f79dc6bb743bcb26ce81
Parents: 5ec7d04
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 13:10:42 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 51 ++++++++++++++++++++++++++-------------------
 1 file changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5c3c9c93/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 234cad2..b7d161c 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -175,33 +175,40 @@ donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
     Shards = apply_shard_moves(mem3:shards(Shard#shard.dbname), Moves),
     InZone = filter_map_by_zone(shards_by_node(Shards, Nodes), Zone),
     SortedByCount = lists:sort(smallest_first(Moves), InZone),
-    Candidates = lists:dropwhile(fun({_Node, OwnShards}) ->
-        lists:keymember(Shard#shard.range, #shard.range, OwnShards)
-    end, SortedByCount),
-    case {lists:member(Shard, Shards), Candidates} of
+    SourceCount = get_shard_count(Node, SortedByCount),
+    Fun = fun({CandidateNode, OwnShards}) ->
+        HasRange = lists:keymember(Shard#shard.range, #shard.range, OwnShards),
+        TargetCount = get_shard_count(CandidateNode, SortedByCount),
+        NodeKey = couch_util:to_binary(CandidateNode),
+        Total = couch_util:get_value(NodeKey, shard_count_by_node(Moves)),
+        if
+            CandidateNode =:= Node ->
+                % Can't move a shard to ourselves
+                true;
+            HasRange ->
+                % The candidate already has this shard
+                true;
+            TargetCount >= SourceCount ->
+                % Executing this move would create a local imbalance in the DB
+                true;
+            Total >= TargetLevel ->
+                % The candidate has already achieved the target level
+                true;
+            true ->
+                false
+        end
+    end,
+    case {lists:member(Shard, Shards), lists:dropwhile(Fun, SortedByCount)} of
         {false, _} ->
             Acc0;
         {true, []} ->
             Acc0;
-        {true, [{Node, _} | _]} ->
-            Acc0;
         {true, [{Target, _} | _]} ->
-            % Execute the move only if the target has fewer shards for this DB
-            % than the source. Otherwise we'd generate a local imbalance.
-            SourceCount = get_shard_count(Node, SortedByCount),
-            TargetCount = get_shard_count(Target, SortedByCount),
-            % Execute the move only if the target needs shards.
-            NodeKey = couch_util:to_binary(Target),
-            Total = couch_util:get_value(NodeKey, shard_count_by_node(Moves)),
-            if (TargetCount < SourceCount), (Total < TargetLevel) ->
-                print({move, Shard, Target}),
-                Acc0#gacc{
-                    moves = [{move, Shard, Target} | Moves],
-                    limit = DC - 1
-                };
-            true ->
-                Acc0
-            end
+            print({move, Shard, Target}),
+            Acc0#gacc{
+                moves = [{move, Shard, Target} | Moves],
+                limit = DC - 1
+            }
     end;
 donate_fold(_Shard, Acc) ->
     Acc.


[18/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Use a consistent commenting syntax


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/a29e6b75
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/a29e6b75
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/a29e6b75

Branch: refs/heads/master
Commit: a29e6b7556862f82ce416f93ac5cf847f36a276e
Parents: 94fc763
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Mon Aug 19 09:43:08 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/a29e6b75/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 08605be..123bf47 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -23,11 +23,11 @@ rebalance(DbName) ->
 rebalance(DbName, TargetNodes) when is_binary(DbName) ->
     rebalance(mem3:shards(DbName), TargetNodes);
 rebalance(Shards, TargetNodes) when is_list(Shards) ->
-    %% first migrate shards off of non-target nodes
+    % First migrate all shards off of non-target nodes
     {OK, MoveThese} = lists:partition(fun(#shard{node=Node}) ->
         lists:member(Node, TargetNodes)
     end, Shards),
-    % ensure every target node is present in the orddict
+    % Ensure every target node is present in the orddict
     ShardsByTargetNode0 = orddict:from_list([{N,[]} || N <- TargetNodes]),
     ShardsByTargetNode = lists:foldl(fun(Shard, Acc) ->
         orddict:append(Shard#shard.node, Shard, Acc)


[49/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Replace twig with couch_log


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/5668712f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/5668712f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/5668712f

Branch: refs/heads/master
Commit: 5668712fab6f55ad3edac7a86b30b90d8393433e
Parents: b3ddfca
Author: Paul J. Davis <pa...@gmail.com>
Authored: Sun Aug 17 16:18:30 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Sun Aug 17 16:18:30 2014 -0500

----------------------------------------------------------------------
 src/mem3_rep.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5668712f/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index 339bd66..1213884 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -336,7 +336,7 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
         {ok, _, not_found} ->
             {not_found, missing};
         Else ->
-            twig:log(err, "Error finding replication doc: ~w", [Else]),
+            couch_log:error("Error finding replication doc: ~w", [Else]),
             {not_found, missing}
     end.
 


[08/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Zero out shard caches on upgrade

The mix of #shard and #ordered_shard records breaks ushards.  Different
nodes can start returning different results.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/5aa38c6f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/5aa38c6f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/5aa38c6f

Branch: refs/heads/master
Commit: 5aa38c6f15eb08b98b53c00e398b2f486a825cd6
Parents: d2171e9
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Jun 21 00:14:42 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3_shards.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/5aa38c6f/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 6127ff6..2d21db2 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -193,7 +193,7 @@ terminate(_Reason, #st{changes_pid=Pid}) ->
     ok.
 
 code_change(_OldVsn, St, _Extra) ->
-    {ok, St}.
+    {ok, cache_clear(St)}.
 
 %% internal functions
 


[03/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Moving shard maps _membership endpoint to _shards db handler


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/a77e95f6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/a77e95f6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/a77e95f6

Branch: refs/heads/master
Commit: a77e95f64cce417710897d9dee0c62d5169a44a3
Parents: d4e7748
Author: Russell Branca <ch...@gmail.com>
Authored: Fri Apr 12 16:48:14 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:12:09 2014 +0100

----------------------------------------------------------------------
 src/mem3_httpd.erl | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/a77e95f6/src/mem3_httpd.erl
----------------------------------------------------------------------
diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl
index dc42924..2331e95 100644
--- a/src/mem3_httpd.erl
+++ b/src/mem3_httpd.erl
@@ -12,7 +12,7 @@
 
 -module(mem3_httpd).
 
--export([handle_membership_req/1]).
+-export([handle_membership_req/1, handle_shards_req/2]).
 
 %% includes
 -include_lib("mem3/include/mem3.hrl").
@@ -26,27 +26,25 @@ handle_membership_req(#httpd{method='GET',
     couch_httpd:send_json(Req, {[
         {all_nodes, lists:sort([node()|nodes()])},
         {cluster_nodes, lists:sort(ClusterNodes)}
-    ]});
-handle_membership_req(#httpd{method='GET',
-        path_parts=[<<"_membership">>, DbName]} = Req) ->
-    ClusterNodes = try mem3:nodes()
-    catch _:_ -> {ok,[]} end,
+    ]}).
+
+handle_shards_req(#httpd{method='GET',
+        path_parts=[_DbName, <<"_shards">>]} = Req, Db) ->
+    DbName = mem3:dbname(Db#db.name),
     Shards = mem3:shards(DbName),
     JsonShards = json_shards(Shards, dict:new()),
     couch_httpd:send_json(Req, {[
-        {all_nodes, lists:sort([node()|nodes()])},
-        {cluster_nodes, lists:sort(ClusterNodes)},
-        {partitions, JsonShards}
+        {shards, JsonShards}
     ]});
-handle_membership_req(#httpd{method='GET',
-        path_parts=[<<"_membership">>, DbName, DocId]} = Req) ->
+handle_shards_req(#httpd{method='GET',
+        path_parts=[_DbName, <<"_shards">>, DocId]} = Req, Db) ->
+    DbName = mem3:dbname(Db#db.name),
     Shards = mem3:shards(DbName, DocId),
     {[{Shard, Dbs}]} = json_shards(Shards, dict:new()),
     couch_httpd:send_json(Req, {[
         {range, Shard},
         {nodes, Dbs}
     ]}).
-
 %%
 %% internal
 %%

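Illustrative requests against the new handler (database name, document id,
and node names are invented; the response shapes follow the json_shards/2
output used above):

    GET /mydb/_shards
    {"shards": {"00000000-7fffffff": ["node1@host", "node2@host"],
                "80000000-ffffffff": ["node1@host", "node2@host"]}}

    GET /mydb/_shards/some-doc-id
    {"range": "00000000-7fffffff", "nodes": ["node1@host", "node2@host"]}

The per-document form looks up only the shard range that the document id
hashes to, which is why it returns a single range rather than the full map.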

[33/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Allow targets to exceed floor, add another check

Sometimes we want to transfer a shard to a target even though it's
already at the floor.  We add another check to make sure we're not
wasting effort -- the difference in shard counts between the source and
the target must be 2 or greater.

We also refactor the global shard count code to avoid future atom /
binary problems.

BugzID: 24466


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/a6ca5c6f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/a6ca5c6f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/a6ca5c6f

Branch: refs/heads/master
Commit: a6ca5c6fa36f8b49bf7785a932b30aef8693b286
Parents: 5c3c9c9
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 14:17:03 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/a6ca5c6f/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index b7d161c..5589e45 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -135,7 +135,7 @@ global_expand(TargetNodes0, LocalOps, Limit) ->
         lists:member(Node, TargetNodes)
     end, shard_count_by_node(LocalOps)),
     TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode),
-    TargetLevel = (TotalCount div length(TargetNodes)) + 1,
+    TargetLevel = TotalCount div length(TargetNodes),
     FoldFun = fun
         (_, Acc) when length(Acc) >= Limit ->
             % We've already accumulated the max number of shard ops.
@@ -176,11 +176,12 @@ donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
     InZone = filter_map_by_zone(shards_by_node(Shards, Nodes), Zone),
     SortedByCount = lists:sort(smallest_first(Moves), InZone),
     SourceCount = get_shard_count(Node, SortedByCount),
+    GlobalShardCounts = shard_count_by_node(Moves),
+    TotalSource = get_global_shard_count(Node, GlobalShardCounts),
     Fun = fun({CandidateNode, OwnShards}) ->
         HasRange = lists:keymember(Shard#shard.range, #shard.range, OwnShards),
         TargetCount = get_shard_count(CandidateNode, SortedByCount),
-        NodeKey = couch_util:to_binary(CandidateNode),
-        Total = couch_util:get_value(NodeKey, shard_count_by_node(Moves)),
+        TotalTarget = get_global_shard_count(CandidateNode, GlobalShardCounts),
         if
             CandidateNode =:= Node ->
                 % Can't move a shard to ourselves
@@ -191,8 +192,11 @@ donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
             TargetCount >= SourceCount ->
                 % Executing this move would create a local imbalance in the DB
                 true;
-            Total >= TargetLevel ->
-                % The candidate has already achieved the target level
+            TotalTarget > TargetLevel ->
+                % The candidate has already exceeded the target level
+                true;
+            (TotalSource - TotalTarget) < 2 ->
+                % Donating here is wasted work
                 true;
             true ->
                 false
@@ -216,6 +220,11 @@ donate_fold(_Shard, Acc) ->
 get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) ->
     length(couch_util:get_value(AtomKey, ShardsByNode, [])).
 
+get_global_shard_count(Node, Counts) when is_atom(Node) ->
+    get_global_shard_count(couch_util:to_binary(Node), Counts);
+get_global_shard_count(Node, Counts) when is_binary(Node) ->
+    couch_util:get_value(Node, Counts, 0).
+
 compute_moves(IdealZoning, IdealZoning, _Copies, OtherMoves) ->
     OtherMoves;
 compute_moves(IdealZoning, ActualZoning, Copies, OtherMoves) ->
@@ -330,8 +339,8 @@ smallest_first(PrevMoves) ->
     fun(A, B) -> sort_by_count(A, B, Global) =< 0 end.
 
 sort_by_count({NodeA, SA}, {NodeB, SB}, Global) when length(SA) =:= length(SB) ->
-    CountA = couch_util:get_value(couch_util:to_binary(NodeA), Global, 0),
-    CountB = couch_util:get_value(couch_util:to_binary(NodeB), Global, 0),
+    CountA = get_global_shard_count(NodeA, Global),
+    CountB = get_global_shard_count(NodeB, Global),
     cmp(CountA, CountB);
 sort_by_count({_, A}, {_, B}, _) ->
     cmp(length(A), length(B)).


[04/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
If two shards differ we need to sync

There's no way to know which security object is correct if two shards
return different answers, but the disagreement gives us enough of a signal
to trigger a full-on synchronization.

BugzId: 18955


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/a0d74306
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/a0d74306
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/a0d74306

Branch: refs/heads/master
Commit: a0d743062007113c5aa81526f4f1285c083b8daa
Parents: a77e95f
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Apr 16 17:12:38 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:12:19 2014 +0100

----------------------------------------------------------------------
 src/mem3_sync_security.erl | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/a0d74306/src/mem3_sync_security.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync_security.erl b/src/mem3_sync_security.erl
index 901b70d..9edd0ec 100644
--- a/src/mem3_sync_security.erl
+++ b/src/mem3_sync_security.erl
@@ -35,6 +35,8 @@ maybe_sync_int(#shard{name=Name}=Src, Dst) ->
                 1 -> ok;
                 2 -> go(DbName)
             end;
+        {error, no_majority} ->
+            go(DbName);
         Else ->
             Args = [DbName, Else],
             couch_log:error("Error checking security objects for ~s :: ~p", Args)


[06/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Support balancing across a subset of nodes

mem3_rebalance implicitly assumed that the set of nodes over which the DB
is hosted was expanding.  We need to make a couple of small changes to
handle the case of cluster contraction.

BugzID: 20742


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e30659b6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e30659b6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e30659b6

Branch: refs/heads/master
Commit: e30659b6fd5a788b17297f4312740d46ef63e204
Parents: c9292bb
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Tue Jul 2 14:30:09 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e30659b6/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index ca3c4a7..8f5872a 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -47,13 +47,14 @@ rebalance2(TargetLevel, Shards, Nodes, [Node | Rest], Moves) ->
 
 victim(TargetLevel, Shards, Nodes, TargetNode) ->
     TargetZone = mem3:node_info(TargetNode, <<"zone">>),
-    CandidateNodes = lists:usort([Node || Node <- Nodes,
+    CandidateNodes = lists:usort([Node || Node <- mem3:nodes(),
                                      Node =/= TargetNode,
                                      mem3:node_info(Node, <<"zone">>) =:= TargetZone]),
     %% make {Node, ShardsInNode} list
     GroupedByNode0 = [{Node, [S || S <- Shards, S#shard.node =:= Node]} || Node <- CandidateNodes],
-    %% don't take from a node below target level
-    GroupedByNode1 = [{N, SS} || {N, SS} <- GroupedByNode0, length(SS) > TargetLevel],
+    %% don't take from a balancing node below target level
+    GroupedByNode1 = [{N, SS} || {N, SS} <- GroupedByNode0,
+        (length(SS) > TargetLevel) orelse (not lists:member(N, Nodes))],
     %% prefer to take from a node with more shards than others
     GroupedByNode2 = lists:sort(fun largest_first/2, GroupedByNode1),
     %% don't take a shard for a range the target already has


[15/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
we're not rotating by DbName any more


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/0efb85e2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/0efb85e2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/0efb85e2

Branch: refs/heads/master
Commit: 0efb85e2d0f96004758a637b5ba764e1fc0e07a5
Parents: f2edc41
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Jun 25 12:36:59 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/0efb85e2/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 96e058e..0046fd4 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -265,10 +265,10 @@ choose_ushards(DbName, Shards) ->
     Groups1 = [rotate_list(DbName, order_shards(G)) || G <- Groups0],
     [hd(G) || G <- Groups1].
 
-rotate_list(_DbName, []) ->
+rotate_list(_Key, []) ->
     [];
-rotate_list(DbName, List) ->
-    {H, T} = lists:split(erlang:crc32(DbName) rem length(List), List),
+rotate_list(Key, List) ->
+    {H, T} = lists:split(erlang:crc32(Key) rem length(List), List),
     T ++ H.
 
 order_shards([#ordered_shard{}|_]=OrderedShards) ->


[50/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Update to use couch_stats


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/64c0c747
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/64c0c747
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/64c0c747

Branch: refs/heads/master
Commit: 64c0c7475ccf2ea9b4f87b6d287850d7b230c089
Parents: 5668712
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Aug 21 01:29:28 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Aug 21 01:29:28 2014 -0500

----------------------------------------------------------------------
 priv/stats_descriptions.cfg | 12 ++++++++++++
 src/mem3.app.src            |  3 ++-
 src/mem3_shards.erl         |  3 +++
 3 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/64c0c747/priv/stats_descriptions.cfg
----------------------------------------------------------------------
diff --git a/priv/stats_descriptions.cfg b/priv/stats_descriptions.cfg
new file mode 100644
index 0000000..a564dd9
--- /dev/null
+++ b/priv/stats_descriptions.cfg
@@ -0,0 +1,12 @@
+{[dbcore, mem3, shard_cache, eviction], [
+    {type, counter},
+    {desc, <<"number of shard cache evictions">>}
+]}.
+{[dbcore, mem3, shard_cache, hit], [
+    {type, counter},
+    {desc, <<"number of shard cache hits">>}
+]}.
+{[dbcore, mem3, shard_cache, miss], [
+    {type, counter},
+    {desc, <<"number of shard cache misses">>}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/64c0c747/src/mem3.app.src
----------------------------------------------------------------------
diff --git a/src/mem3.app.src b/src/mem3.app.src
index 79f2119..87eda0d 100644
--- a/src/mem3.app.src
+++ b/src/mem3.app.src
@@ -46,6 +46,7 @@
         couch,
         rexi,
         couch_log,
-        couch_event
+        couch_event,
+        couch_stats
     ]}
 ]}.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/64c0c747/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 4557e1c..b0d22ac 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -158,11 +158,14 @@ handle_call(_Call, _From, St) ->
     {noreply, St}.
 
 handle_cast({cache_hit, DbName}, St) ->
+    couch_stats:increment_counter([dbcore, mem3, shard_cache, hit]),
     cache_hit(DbName),
     {noreply, St};
 handle_cast({cache_insert, DbName, Shards}, St) ->
+    couch_stats:increment_counter([dbcore, mem3, shard_cache, miss]),
     {noreply, cache_free(cache_insert(St, DbName, Shards))};
 handle_cast({cache_remove, DbName}, St) ->
+    couch_stats:increment_counter([dbcore, mem3, shard_cache, eviction]),
     {noreply, cache_remove(St, DbName)};
 handle_cast(_Msg, St) ->
     {noreply, St}.


[23/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Allow skip straight to global phase

When rebalancing a DB-per-user cluster with small Q values it's typical
that

a) the local phase takes a loooong time, and
b) the local phase doesn't suggest any moves

While the local phase should still run at least once, we'll expose a
flag to skip straight to the global phase, since we'll need to run the
plan generator many times and can't afford to wait.

BugzID: 24680


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/0b0a7e79
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/0b0a7e79
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/0b0a7e79

Branch: refs/heads/master
Commit: 0b0a7e7935d3e43f378908157a05ac2c74c7a5f5
Parents: 45b040b
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 10:00:08 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/0b0a7e79/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 3c06596..7e0826b 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -40,8 +40,12 @@
 expand() ->
     expand(1000).
 
+%% @doc Expands a cluster without requiring each DB to be optimally balanced.
+-spec expand(integer() | global) -> [{atom(), #shard{}, node()}].
+expand(global) ->
+    global_expand(surviving_nodes(), [], 1000);
+
 %% @doc Expands all databases in the cluster, stopping at Limit operations.
--spec expand(integer()) -> [{atom(), #shard{}, node()}].
 expand(Limit) when is_integer(Limit), Limit > 0 ->
     TargetNodes = surviving_nodes(),
     LocalBalanceFun = fun(Db, Moves) -> expand(Db, TargetNodes, Moves) end,
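
A minimal usage sketch of the new entry point, assuming a remote shell on a
cluster node; the arity and the 1000-operation cap follow the diff above:

    %% Skip the per-database local phase and compute only global moves;
    %% returns a list of {move, #shard{}, TargetNode} operations.
    Moves = mem3_rebalance:expand(global).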


[29/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Inline open_doc_revs into open_docs

This function was trivial and never reused. It was more confusing to
keep it as a separate function than to simply inline it where it's
used.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/36a1e63a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/36a1e63a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/36a1e63a

Branch: refs/heads/master
Commit: 36a1e63ad120dc1c3d1382119395aee42b555100
Parents: fcbc821
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 11:59:45 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/36a1e63a/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index d01eaf3..4284518 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -204,17 +204,14 @@ save_on_target(Node, Name, Docs) ->
 open_docs(#acc{source=Source, infos=Infos}, Missing) ->
     lists:flatmap(fun({Id, Revs, _}) ->
         FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
-        open_doc_revs(Source, FDI, Revs)
+        RevTree = FDI#full_doc_info.rev_tree,
+        {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
+        lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
+            couch_db:make_doc(Source, Id, IsDel, SummaryPtr, FoundRevPath)
+        end, FoundRevs)
     end, Missing).
 
 
-open_doc_revs(Db, #full_doc_info{id=Id, rev_tree=RevTree}, Revs) ->
-    {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
-    lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
-                  couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath)
-    end, FoundRevs).
-
-
 update_locals(Acc) ->
     #acc{seq=Seq, source=Db, target=Target, localid=Id} = Acc,
     #shard{name=Name, node=Node} = Target,


[40/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Allow target_uuid prefixes in find_source_seq

Since sequence values only contain UUID prefixes, we need to account
for that when locating the replication checkpoints.

BugId: 21973


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/c63fc057
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/c63fc057
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/c63fc057

Branch: refs/heads/master
Commit: c63fc057078544290825172790b3c5c871f75037
Parents: bf07a7c
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 13:52:28 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:50:53 2014 +0100

----------------------------------------------------------------------
 src/mem3_rep.erl | 46 ++++++++++++++++++++++++++++++++++++++++------
 src/mem3_rpc.erl |  1 +
 2 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/c63fc057/src/mem3_rep.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rep.erl b/src/mem3_rep.erl
index bdc6fa4..2186fa3 100644
--- a/src/mem3_rep.erl
+++ b/src/mem3_rep.erl
@@ -135,12 +135,10 @@ make_local_id(SourceThing, TargetThing, Filter) ->
 %% as they've seen updates on this node. We can detect that by
 %% looking for our push replication history and choosing the
 %% largest source_seq that has a target_seq =< TgtSeq.
-find_source_seq(SrcDb, TgtNode, TgtUUID, TgtSeq) ->
-    SrcNode = atom_to_binary(node(), utf8),
-    SrcUUID = couch_db:get_uuid(SrcDb),
-    DocId = make_local_id(SrcUUID, TgtUUID),
-    case couch_db:open_doc(SrcDb, DocId, []) of
-    {ok, Doc} ->
+find_source_seq(SrcDb, TgtNode, TgtUUIDPrefix, TgtSeq) ->
+    case find_repl_doc(SrcDb, TgtUUIDPrefix) of
+    {ok, TgtUUID, Doc} ->
+        SrcNode = atom_to_binary(node(), utf8),
         find_source_seq_int(Doc, SrcNode, TgtNode, TgtUUID, TgtSeq);
     {not_found, _} ->
         0
@@ -302,6 +300,42 @@ update_locals(Acc) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
 
 
+find_repl_doc(SrcDb, TgtUUIDPrefix) ->
+    SrcUUID = couch_db:get_uuid(SrcDb),
+    S = couch_util:encodeBase64Url(couch_util:md5(term_to_binary(SrcUUID))),
+    DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>,
+    FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _, _) ->
+        TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>),
+        case is_prefix(DocIdPrefix, DocId) of
+            true ->
+                case is_prefix(TgtUUIDPrefix, TgtUUID) of
+                    true ->
+                        Rev = list_to_binary(integer_to_list(Rev0)),
+                        Doc = #doc{id=DocId, revs={0, [Rev]}, body={BodyProps}},
+                        {stop, {TgtUUID, Doc}};
+                    false ->
+                        {ok, not_found}
+                end;
+            _ ->
+                {stop, not_found}
+        end
+    end,
+    Options = [{start_key, DocIdPrefix}],
+    case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of
+        {ok, _, {TgtUUID, Doc}} ->
+            {ok, TgtUUID, Doc};
+        {ok, _, not_found} ->
+            {not_found, missing};
+        Else ->
+            twig:log(err, "Error finding replication doc: ~w", [Else]),
+            {not_found, missing}
+    end.
+
+
+is_prefix(Prefix, Subject) ->
+    binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
+
+
 filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
     try Filter(FullDocInfo) of
         discard -> discard;

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/c63fc057/src/mem3_rpc.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rpc.erl b/src/mem3_rpc.erl
index 1e77b57..8d8c832 100644
--- a/src/mem3_rpc.erl
+++ b/src/mem3_rpc.erl
@@ -88,6 +88,7 @@ save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
             ] ++ NewEntry0},
             Body = {[
                 {<<"seq">>, SourceSeq},
+                {<<"target_uuid">>, couch_db:get_uuid(Db)},
                 {<<"history">>, add_checkpoint(NewEntry, History0)}
             ]},
             Doc = #doc{id = Id, body = Body},


[36/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Suggest moves from all donor nodes in parallel

Previously the generator would suggest all moves from the first node
before moving on to the second one. In the case where the quantum of
jobs is much smaller than the number of moves per node, this results in
the other donors being neglected for long periods.

BugzID: 24612


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/f9c06d62
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/f9c06d62
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/f9c06d62

Branch: refs/heads/master
Commit: f9c06d629525873f2130caac81992a0cb42ab01f
Parents: a6ca5c6
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 17:40:20 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 87 +++++++++++++++++++++++++--------------------
 1 file changed, 49 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c06d62/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 5589e45..f00201a 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -36,7 +36,7 @@
 -include("mem3.hrl").
 
 -record (gacc, {
-    node,
+    donors,
     targets,
     moves,
     limit,
@@ -136,36 +136,28 @@ global_expand(TargetNodes0, LocalOps, Limit) ->
     end, shard_count_by_node(LocalOps)),
     TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode),
     TargetLevel = TotalCount div length(TargetNodes),
-    FoldFun = fun
-        (_, Acc) when length(Acc) >= Limit ->
-            % We've already accumulated the max number of shard ops.
-            Acc;
-        ({_Node, Count}, Acc) when Count =< TargetLevel ->
-            % This node is not a donor.
-            Acc;
-        ({Node0, Count}, Acc) ->
-            Node = list_to_existing_atom(binary_to_list(Node0)),
-            InternalAcc0 = #gacc{
-                node = Node,
-                targets = TargetNodes0,
-                moves = Acc,
-                limit = erlang:min(Count - TargetLevel, Limit - length(Acc)),
-                target_level = TargetLevel
-            },
-            try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
-                #gacc{moves = Moves} ->
-                    Moves
-            catch
-                {complete, Moves} ->
-                    Moves
-            end
-    end,
-    lists:foldl(FoldFun, LocalOps, CountByNode).
+    Donors = [{list_to_existing_atom(binary_to_list(N)), C - TargetLevel} ||
+        {N, C} <- CountByNode, C > TargetLevel],
+    InternalAcc0 = #gacc{
+        donors = orddict:from_list(Donors),
+        targets = TargetNodes0,
+        moves = LocalOps,
+        limit = Limit - length(LocalOps),
+        target_level = TargetLevel
+    },
+    try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
+        #gacc{moves = Moves} ->
+            Moves
+    catch
+        {complete, Moves} ->
+            Moves
+    end.
 
 donate_fold(_Shard, #gacc{limit = 0, moves = Moves}) ->
     throw({complete, Moves});
-donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
+donate_fold(#shard{node = Node} = Shard, Acc0) ->
      #gacc{
+        donors = Donors,
         targets = Nodes,
         moves = Moves,
         limit = DC,
@@ -202,21 +194,40 @@ donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
                 false
         end
     end,
-    case {lists:member(Shard, Shards), lists:dropwhile(Fun, SortedByCount)} of
-        {false, _} ->
-            Acc0;
-        {true, []} ->
-            Acc0;
-        {true, [{Target, _} | _]} ->
-            print({move, Shard, Target}),
-            Acc0#gacc{
-                moves = [{move, Shard, Target} | Moves],
-                limit = DC - 1
-            }
+    case {lists:member(Shard, Shards), lists:keymember(Node, 1, Donors)} of
+        {true, true} ->
+            case lists:dropwhile(Fun, SortedByCount) of
+                [{Target, _} | _] ->
+                    NewMoves = [{move, Shard, Target} | Moves],
+                    print({move, Shard, Target}),
+                    Acc0#gacc{
+                        moves = NewMoves,
+                        limit = DC - 1,
+                        donors = update_donors(Node, Donors, NewMoves)
+                    };
+                [] ->
+                    Acc0
+            end;
+        _ ->
+            Acc0
     end;
 donate_fold(_Shard, Acc) ->
     Acc.
 
+update_donors(Node, Donors, Moves) ->
+    NewDonors = case orddict:fetch(Node, Donors) of
+        1 ->
+            orddict:erase(Node, Donors);
+        X ->
+            orddict:store(Node, X-1, Donors)
+    end,
+    case orddict:size(NewDonors) of
+        0 ->
+            throw({complete, Moves});
+        _ ->
+            NewDonors
+    end.
+
 get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) ->
     length(couch_util:get_value(AtomKey, ShardsByNode, [])).
 


[46/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Get the shard suffix for a given database

This grabs the shards for the given database name, pulls out the first
shard, and extracts the suffix. mem3:shards is ETS-backed, so in the
general case this should be fast.

BugzId: 29571


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/282e5057
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/282e5057
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/282e5057

Branch: refs/heads/master
Commit: 282e5057afe90e53ec36f5a789a7e26cbd76863b
Parents: 7044f6c
Author: Russell Branca <ch...@gmail.com>
Authored: Tue Apr 29 16:36:06 2014 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:52:00 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/282e5057/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index cc76454..c713161 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -14,7 +14,7 @@
 
 -export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
     choose_shards/2, n/1, dbname/1, ushards/1]).
--export([get_shard/3, local_shards/1, fold_shards/2]).
+-export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
 -export([sync_security/0, sync_security/1]).
 -export([compare_nodelists/0, compare_shards/1]).
 -export([quorum/1, group_by_proximity/1]).
@@ -142,6 +142,14 @@ get_shard(DbName, Node, Range) ->
 local_shards(DbName) ->
     mem3_shards:local(DbName).
 
+shard_suffix(#db{name=DbName}) ->
+    shard_suffix(DbName);
+shard_suffix(DbName0) ->
+    Shard = hd(shards(DbName0)),
+    <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> =
+        Shard#shard.name,
+    filename:extension(binary_to_list(DbName)).
+
 fold_shards(Fun, Acc) ->
     mem3_shards:fold(Fun, Acc).
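
A hedged shell example of the new call; the shard name and timestamp suffix
are invented:

    %% If the first shard of the DB is named
    %% <<"shards/00000000-7fffffff/mydb.14012345">>, shard_suffix/1
    %% returns the timestamp extension of the shard file name:
    1> mem3:shard_suffix(<<"mydb">>).
    ".14012345"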
 


[21/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Rely on decom:true attribute to filter decom nodes

BugzID: 24420


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/d7a4f26d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/d7a4f26d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/d7a4f26d

Branch: refs/heads/master
Commit: d7a4f26d4e8aa56a2a38ebfbdd46f13efde27a22
Parents: 19c3ee2
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 23 10:02:33 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/d7a4f26d/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 4d8d069..3c06596 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -43,14 +43,14 @@ expand() ->
 %% @doc Expands all databases in the cluster, stopping at Limit operations.
 -spec expand(integer()) -> [{atom(), #shard{}, node()}].
 expand(Limit) when is_integer(Limit), Limit > 0 ->
-    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    TargetNodes = surviving_nodes(),
     LocalBalanceFun = fun(Db, Moves) -> expand(Db, TargetNodes, Moves) end,
     LocalBalanceOps = apply_to_cluster(LocalBalanceFun, Limit),
     % Now apply additional operations as needed to achieve global balance.
     global_expand(TargetNodes, LocalBalanceOps, Limit);
 
 expand(DbName) when is_binary(DbName); is_list(DbName) ->
-    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    TargetNodes = surviving_nodes(),
     expand(DbName, TargetNodes, []).
 
 %% @doc Computes a plan to balance the shards across the target nodes.
@@ -73,11 +73,11 @@ contract() ->
 %% @doc Computes a plan to remove up to Limit shards from nodes in "decom" zone.
 -spec contract(integer()) -> [{atom(), #shard{}, node()}].
 contract(Limit) when is_integer(Limit), Limit > 0 ->
-    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    TargetNodes = surviving_nodes(),
     apply_to_cluster(fun(Db, Moves) -> contract(Db, TargetNodes, Moves) end, Limit);
 
 contract(DbName) when is_binary(DbName); is_list(DbName) ->
-    TargetNodes = allowed_nodes(fun(Zone) -> Zone =/= <<"decom">> end),
+    TargetNodes = surviving_nodes(),
     contract(DbName, TargetNodes, []).
 
 %% @doc Computes a plan to consolidate shards from a single database onto the
@@ -339,6 +339,11 @@ allowed_nodes(Fun) ->
         Fun(mem3:node_info(Node, <<"zone">>))
     end, mem3:nodes()).
 
+surviving_nodes() ->
+    lists:filter(fun(Node) ->
+        mem3:node_info(Node, <<"decom">>) =/= true
+    end, mem3:nodes()).
+
 shards_by_node(Shards, Nodes) ->
     % Ensure every target node is present in the orddict
     ShardsByNode0 = orddict:from_list([{N,[]} || N <- Nodes]),
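
A hedged shell check of the attribute the new filter relies on (the node
name is invented): a node whose document in the nodes DB carries
"decom": true reports it through mem3:node_info/2 and is excluded by
surviving_nodes/0:

    1> mem3:node_info('db3@node3.example.com', <<"decom">>).
    true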


[20/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Fix latent single-shard range hack

We had an off-by-one error when faking #shard{} records for node-local
databases. This fixes the issue. The bug was noticeable when attempting
to pass these shards to `fabric_view:is_progress_possible/1`.

BugzId: 22809


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/2f62e2e5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/2f62e2e5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/2f62e2e5

Branch: refs/heads/master
Commit: 2f62e2e5586978bd39d59bad060d5168659e6dc5
Parents: a29e6b7
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 5 14:08:48 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:26 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/2f62e2e5/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 02e929f..8bbbbae 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -94,7 +94,7 @@ shards_int(DbName, Options) ->
             node = node(),
             name = ShardDbName,
             dbname = ShardDbName,
-            range = [0, 2 bsl 31],
+            range = [0, (2 bsl 31)-1],
             order = undefined}];
     ShardDbName ->
         %% shard_db is treated as a single sharded db to support calls to db_info
@@ -103,7 +103,7 @@ shards_int(DbName, Options) ->
             node = node(),
             name = ShardDbName,
             dbname = ShardDbName,
-            range = [0, 2 bsl 31]}];
+            range = [0, (2 bsl 31)-1]}];
     _ ->
         mem3_shards:for_db(DbName, Options)
     end.
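
The arithmetic behind the fix, shown in a shell: 2 bsl 31 is 2^32, one past
the last key in the 32-bit hash space, so the inclusive upper bound of a
full range is (2 bsl 31) - 1:

    1> 2 bsl 31.
    4294967296
    2> (2 bsl 31) - 1.
    4294967295
    3> integer_to_list((2 bsl 31) - 1, 16).
    "FFFFFFFF"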


[10/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Update to use the new couch_event application


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/8f9f58f8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/8f9f58f8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/8f9f58f8

Branch: refs/heads/master
Commit: 8f9f58f87678f589be884f42c7ec92762534583e
Parents: f9c2276
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Apr 23 17:26:30 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3.app.src  |  3 ++-
 src/mem3_sync.erl | 70 ++++++++++++++++++++++++++++----------------------
 2 files changed, 41 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8f9f58f8/src/mem3.app.src
----------------------------------------------------------------------
diff --git a/src/mem3.app.src b/src/mem3.app.src
index ee8ba56..79f2119 100644
--- a/src/mem3.app.src
+++ b/src/mem3.app.src
@@ -45,6 +45,7 @@
         mochiweb,
         couch,
         rexi,
-        couch_log
+        couch_log,
+        couch_event
     ]}
 ]}.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/8f9f58f8/src/mem3_sync.erl
----------------------------------------------------------------------
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index 579c515..d85cb2f 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -18,6 +18,8 @@
 -export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
     remove_node/1, initial_sync/1, get_backlog/0]).
 
+-export([handle_db_event/3]).
+
 -import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).
 
 -include_lib("mem3/include/mem3.hrl").
@@ -29,7 +31,7 @@
     limit,
     dict = dict:new(),
     waiting = queue:new(),
-    update_notifier
+    event_listener
 }).
 
 -record(job, {name, node, count=nil, pid=nil}).
@@ -65,9 +67,9 @@ init([]) ->
     process_flag(trap_exit, true),
     Concurrency = config:get("mem3", "sync_concurrency", "10"),
     gen_event:add_handler(mem3_events, mem3_sync_event, []),
-    {ok, Pid} = start_update_notifier(),
+    {ok, Pid} = start_event_listener(),
     initial_sync(),
-    {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+    {ok, #state{limit = list_to_integer(Concurrency), event_listener=Pid}}.
 
 handle_call({push, Job}, From, State) ->
     handle_cast({push, Job#job{pid = From}}, State);
@@ -116,9 +118,9 @@ handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
         S =:= Shard],
     {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.
 
-handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
-    {ok, NewPid} = start_update_notifier(),
-    {noreply, State#state{update_notifier=NewPid}};
+handle_info({'EXIT', Pid, _}, #state{event_listener=Pid} = State) ->
+    {ok, NewPid} = start_event_listener(),
+    {noreply, State#state{event_listener=NewPid}};
 
 handle_info({'EXIT', Active, normal}, State) ->
     handle_replication_exit(State, Active);
@@ -262,32 +264,38 @@ submit_replication_tasks(LocalNode, Live, Shards) ->
 sync_push(ShardName, N) ->
     gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).
 
-start_update_notifier() ->
-    Db1 = nodes_db(),
-    Db2 = shards_db(),
-    Db3 = users_db(),
-    couch_db_update_notifier:start_link(fun
-    ({updated, Db}) when Db == Db1 ->
-        Nodes = mem3:nodes(),
+start_event_listener() ->
+    State = {nodes_db(), shards_db(), users_db()},
+    couch_event:link_listener(?MODULE, handle_db_event, State, [all_dbs]).
+
+handle_db_event(NodesDb, updated, {NodesDb, _, _}=St) ->
+    Nodes = mem3:nodes(),
+    Live = nodes(),
+    [?MODULE:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
+    {ok, St};
+handle_db_event(ShardsDb, updated, {_, ShardsDb, _}=St) ->
+    ?MODULE:push(ShardsDb, find_next_node()),
+    {ok, St};
+handle_db_event(UsersDb, updated, {_, _, UsersDb}=St) ->
+    ?MODULE:push(UsersDb, find_next_node()),
+    {ok, St};
+handle_db_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
+    try mem3:shards(mem3:dbname(ShardName)) of
+    Shards ->
+        Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+            N =/= node(), Name =:= ShardName],
         Live = nodes(),
-        [?MODULE:push(Db1, N) || N <- Nodes, lists:member(N, Live)];
-    ({updated, Db}) when Db == Db2; Db == Db3 ->
-        ?MODULE:push(Db, find_next_node());
-    ({updated, <<"shards/", _/binary>> = ShardName}) ->
-        % TODO deal with split/merged partitions by comparing keyranges
-        try mem3:shards(mem3:dbname(ShardName)) of
-        Shards ->
-            Targets = [S || #shard{node=N, name=Name} = S <- Shards,
-                N =/= node(), Name =:= ShardName],
-            Live = nodes(),
-            [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
-                lists:member(N, Live)]
-        catch error:database_does_not_exist ->
-            ok
-        end;
-    ({deleted, <<"shards/", _:18/binary, _/binary>> = ShardName}) ->
-        gen_server:cast(?MODULE, {remove_shard, ShardName});
-    (_) -> ok end).
+        [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
+            lists:member(N, Live)]
+    catch error:database_does_not_exist ->
+        ok
+    end,
+    {ok, St};
+handle_db_event(<<"shards/", _:18/binary, _/binary>> =ShardName, deleted, St) ->
+    gen_server:cast(?MODULE, {remove_shard, ShardName}),
+    {ok, St};
+handle_db_event(_DbName, _Event, St) ->
+    {ok, St}.
 
 find_next_node() ->
     LiveNodes = [node()|nodes()],


[48/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Update mem3_rebalance to work with couch_mrview


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/b3ddfca9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/b3ddfca9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/b3ddfca9

Branch: refs/heads/master
Commit: b3ddfca9ddf12061cf9202026da3ede33d78ce07
Parents: ff02b9a
Author: Paul J. Davis <pa...@gmail.com>
Authored: Sun Aug 17 13:54:51 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Sun Aug 17 13:54:51 2014 -0500

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/b3ddfca9/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 3972d89..1c1ed64 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -458,24 +458,23 @@ shard_count_by_node(PrevMoves) ->
 shard_count_view() ->
     %% TODO rewrite CouchDB's internal view API.  Wow!
     {ok, Db} = couch_db:open(<<"dbs">>, []),
-    {ok, DDoc} = couch_db:open_doc(Db, <<"_design/rebalance">>, []),
-    Group0 = couch_view_group:design_doc_to_view_group(DDoc),
-    {ok, Pid} = gen_server:call(couch_view, {get_group_server, <<"dbs">>, Group0}),
-    {ok, Group} = couch_view_group:request_group(Pid, 0),
-    Lang = couch_view_group:get_language(Group),
-    Views = couch_view_group:get_views(Group),
-    Ref = erlang:monitor(process, couch_view_group:get_fd(Group)),
-    {IRed, View} = fabric_view:extract_view(Pid, <<"count_by_node">>, Views, reduce),
-    ReduceView = {reduce, IRed, Lang, View},
-    Options = [{key_group_level, exact}],
-    Fold = fun(Node, Count, Acc) -> {ok, [{Node, Count} | Acc]} end,
-    %% Workaround for problems where we hold onto bad collators in the shell
-    erlang:erase(couch_drv_port),
-    {ok, Map} = couch_view:fold_reduce(ReduceView, Fold, [], Options),
+    DDocId = <<"_design/rebalance">>,
+    Fold = fun view_cb/2,
+    Args = [{group_level, exact}],
+    {ok, Map} = couch_mrview:query_view(
+            Db, DDocId, <<"count_by_node">>, Fold, [], Args),
     erlang:put(shard_count_by_node, {os:timestamp(), Map}),
-    erlang:demonitor(Ref),
     Map.
 
+view_cb({meta, _}, Acc) ->
+    {ok, Acc};
+view_cb({row, Row}, Acc) ->
+    {key, Node} = lists:keyfind(key, 1, Row),
+    {value, Count} = lists:keyfind(value, 1, Row),
+    {ok, [{Node, Count} | Acc]};
+view_cb(complete, Acc) ->
+    {ok, lists:reverse(Acc)}.
+
 print({Op, Shard, TargetNode} = Operation) ->
     {match, [SourceId, Cluster]} = re:run(
         atom_to_list(Shard#shard.node),


[37/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Allow for rebalancing "special" DBs

For example, _replicator or _users.

BugzID: 24612


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/781dc89a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/781dc89a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/781dc89a

Branch: refs/heads/master
Commit: 781dc89ac7c67264d4985418e394b5b668228f60
Parents: f9c06d6
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 17:43:40 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/781dc89a/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index f00201a..d85e8c2 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -481,7 +481,7 @@ print({Op, Shard, TargetNode} = Operation) ->
     ),
     {match, [Range, Account, DbName]} = re:run(
         Shard#shard.name,
-        "shards/(?<range>[0-9a-f\-]+)/(?<account>.+)/(?<dbname>[a-z][a-z0-9\\_\\$()\\+\\-\\/]+)\.[0-9]{8}",
+        "shards/(?<range>[0-9a-f\-]+)/(?<account>.+)/(?<dbname>[a-z\\_][a-z0-9\\_\\$()\\+\\-\\/]+)\.[0-9]{8}",
         [{capture, all_but_first, binary}]
     ),
     OpName = case Op of move -> move2; _ -> Op end,
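
A quick shell check of the widened character class: the leading [a-z\_] now
admits database names that start with an underscore (the subject value is
just an example):

    1> re:run(<<"_replicator">>, "^[a-z\\_][a-z0-9\\_\\$()\\+\\-\\/]+$").
    {match,[{0,11}]}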


[16/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
refactor choose_ushards


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/f2edc418
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/f2edc418
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/f2edc418

Branch: refs/heads/master
Commit: f2edc4181127c44f1aa06cbdfdb775a3bc19bd38
Parents: 5aa38c6
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Jun 25 12:20:59 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:25 2014 +0100

----------------------------------------------------------------------
 src/mem3.erl | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f2edc418/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 8f8808b..96e058e 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -261,8 +261,9 @@ group_by_proximity(Shards, ZoneMap) ->
     {Local, SameZone, DifferentZone}.
 
 choose_ushards(DbName, Shards) ->
-    [hd(G) || G <- [rotate_list(DbName, order_shards(G)) ||
-        G <- group_by_range(Shards)]].
+    Groups0 = group_by_range(Shards),
+    Groups1 = [rotate_list(DbName, order_shards(G)) || G <- Groups0],
+    [hd(G) || G <- Groups1].
 
 rotate_list(_DbName, []) ->
     [];


[47/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Remove mem3_util:owner


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/ff02b9a1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/ff02b9a1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/ff02b9a1

Branch: refs/heads/master
Commit: ff02b9a1f29da9ab7dc1e3185d73ea16d59d744e
Parents: 282e505
Author: Robert Newson <rn...@apache.org>
Authored: Mon Jun 16 13:04:36 2014 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:52:09 2014 +0100

----------------------------------------------------------------------
 src/mem3_util.erl | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/ff02b9a1/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index be7302d..9c7449d 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -15,7 +15,7 @@
 -export([hash/1, name_shard/2, create_partition_map/5, build_shards/2,
     n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
     shard_info/1, ensure_exists/1, open_db_doc/1]).
--export([owner/2, is_deleted/1, rotate_list/2]).
+-export([is_deleted/1, rotate_list/2]).
 
 %% do not use outside mem3.
 -export([build_ordered_shards/2, downcast/1]).
@@ -223,10 +223,6 @@ ensure_exists(DbName) ->
     end.
 
 
-owner(DbName, DocId) ->
-    Nodes = lists:sort([N || #shard{node=N} <- mem3:shards(DbName, DocId)]),
-    node() =:= hd(rotate_list({DbName, DocId}, Nodes)).
-
 is_deleted(Change) ->
     case couch_util:get_value(<<"deleted">>, Change) of
     undefined ->


[41/50] mem3 commit: updated refs/heads/master to 64c0c74

Posted by rn...@apache.org.
Write plan to /tmp/rebalance_plan.txt

This was a request by @mattwhite to help with automation. I was fairly
sloppy in the implementation here; we could leave this off and do a
better job next time.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e147621c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e147621c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e147621c

Branch: refs/heads/master
Commit: e147621ce9b4703228fc548280503a64eb65750b
Parents: c63fc05
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 31 12:23:49 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:51:01 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e147621c/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index d85e8c2..54dcd1c 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -51,10 +51,14 @@ expand() ->
 %% @doc Expands a cluster without requiring each DB to be optimally balanced.
 -spec expand(integer() | global) -> [{atom(), #shard{}, node()}].
 expand(global) ->
+    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
+    erlang:put(fd, FD),
     global_expand(surviving_nodes(), [], 1000);
 
 %% @doc Expands all databases in the cluster, stopping at Limit operations.
 expand(Limit) when is_integer(Limit), Limit > 0 ->
+    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
+    erlang:put(fd, FD),
     TargetNodes = surviving_nodes(),
     LocalBalanceFun = fun(Db, Moves) -> expand(Db, TargetNodes, Moves) end,
     LocalBalanceOps = apply_to_cluster(LocalBalanceFun, Limit),
@@ -85,6 +89,8 @@ contract() ->
 %% @doc Computes a plan to remove up to Limit shards from nodes in "decom" zone.
 -spec contract(integer()) -> [{atom(), #shard{}, node()}].
 contract(Limit) when is_integer(Limit), Limit > 0 ->
+    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
+    erlang:put(fd, FD),
     TargetNodes = surviving_nodes(),
     apply_to_cluster(fun(Db, Moves) -> contract(Db, TargetNodes, Moves) end, Limit);
 
@@ -111,6 +117,8 @@ fix_zoning() ->
 %%      levels and improper zoning.
 -spec fix_zoning(integer()) -> [{atom(), #shard{}, node()}].
 fix_zoning(Limit) when is_integer(Limit), Limit > 0 ->
+    {ok, FD} = file:open("/tmp/rebalance_plan.txt", [write]),
+    erlang:put(fd, FD),
     apply_to_cluster(fun fix_zoning/2, Limit);
 
 fix_zoning(DbName) when is_binary(DbName); is_list(DbName) ->
@@ -485,8 +493,14 @@ print({Op, Shard, TargetNode} = Operation) ->
         [{capture, all_but_first, binary}]
     ),
     OpName = case Op of move -> move2; _ -> Op end,
-    io:format("clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName, Cluster, Account, DbName,
-         Range, SourceId, TargetId]),
+    case get(fd) of
+        undefined ->
+            io:format("clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName,
+                 Cluster, Account, DbName, Range, SourceId, TargetId]);
+        FD ->
+            io:format(FD, "clou shard ~s ~s ~s ~s ~s ~s ~s~n", [OpName,
+                 Cluster, Account, DbName, Range, SourceId, TargetId])
+    end,
     Operation;
 
 print(Operations) when is_list(Operations) ->