You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:10:52 UTC

[06/48] mem3 commit: updated refs/heads/windsor-merge to ff02b9a

Choose ushards according to persistent record

The order of nodes in the by_range section of "dbs" documents is now
promoted to the principal order for ushards. Ushards still accounts
for liveness, by selecting the first live replica, and still supports
spread, by rotating this list using the CRC32 of the database name
(since many databases will have the same layout).

If by_range and by_node are not symmetrical, by_node is used and the
order is undefined, matching the existing behavior.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/f9c22769
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/f9c22769
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/f9c22769

Branch: refs/heads/windsor-merge
Commit: f9c22769ea089812e401554f06626f7d0d1dac4b
Parents: a0d7430
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Apr 23 22:54:48 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:19 2014 +0100

----------------------------------------------------------------------
 include/mem3.hrl    | 10 +++++++
 src/mem3.erl        | 78 +++++++++++++++++++++++++++++++++++-------------
 src/mem3_shards.erl | 45 +++++++++++++++++++++-------
 src/mem3_util.erl   | 66 +++++++++++++++++++++++++++++++++++++---
 4 files changed, 164 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/include/mem3.hrl
----------------------------------------------------------------------
diff --git a/include/mem3.hrl b/include/mem3.hrl
index cb39e78..7c20061 100644
--- a/include/mem3.hrl
+++ b/include/mem3.hrl
@@ -19,6 +19,16 @@
     ref :: reference() | 'undefined' | '_'
 }).
 
+%% Do not reference outside of mem3.
+-record(ordered_shard, {
+    name :: binary() | '_',
+    node :: node() | '_',
+    dbname :: binary(),
+    range :: [non_neg_integer() | '$1' | '$2'],
+    ref :: reference() | 'undefined' | '_',
+    order :: non_neg_integer() | 'undefined' | '_'
+}).
+
 %% types
 -type join_type() :: init | join | replace | leave.
 -type join_order() :: non_neg_integer().

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/src/mem3.erl
----------------------------------------------------------------------
diff --git a/src/mem3.erl b/src/mem3.erl
index 36ff87b..8f8808b 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -21,6 +21,9 @@
 -export([live_shards/2]).
 -export([belongs/2]).
 
+%% For mem3 use only.
+-export([name/1, node/1, range/1]).
+
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
@@ -74,12 +77,25 @@ node_info(Node, Key) ->
     mem3_nodes:get_node_info(Node, Key).
 
 -spec shards(DbName::iodata()) -> [#shard{}].
-shards(DbName) when is_list(DbName) ->
-    shards(list_to_binary(DbName));
 shards(DbName) ->
+    shards_int(DbName, []).
+
+shards_int(DbName, Options) when is_list(DbName) ->
+    shards_int(list_to_binary(DbName), Options);
+shards_int(DbName, Options) ->
+    Ordered = lists:member(ordered, Options),
     ShardDbName =
         list_to_binary(config:get("mem3", "shard_db", "dbs")),
     case DbName of
+    ShardDbName when Ordered ->
+        %% shard_db is treated as a single sharded db to support calls to db_info
+        %% and view_all_docs
+        [#ordered_shard{
+            node = node(),
+            name = ShardDbName,
+            dbname = ShardDbName,
+            range = [0, 2 bsl 31],
+            order = undefined}];
     ShardDbName ->
         %% shard_db is treated as a single sharded db to support calls to db_info
         %% and view_all_docs
@@ -89,22 +105,27 @@ shards(DbName) ->
             dbname = ShardDbName,
             range = [0, 2 bsl 31]}];
     _ ->
-        mem3_shards:for_db(DbName)
+        mem3_shards:for_db(DbName, Options)
     end.
 
 -spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
-shards(DbName, DocId) when is_list(DbName) ->
-    shards(list_to_binary(DbName), DocId);
-shards(DbName, DocId) when is_list(DocId) ->
-    shards(DbName, list_to_binary(DocId));
 shards(DbName, DocId) ->
-    mem3_shards:for_docid(DbName, DocId).
+    shards_int(DbName, DocId, []).
+
+shards_int(DbName, DocId, Options) when is_list(DbName) ->
+    shards_int(list_to_binary(DbName), DocId, Options);
+shards_int(DbName, DocId, Options) when is_list(DocId) ->
+    shards_int(DbName, list_to_binary(DocId), Options);
+shards_int(DbName, DocId, Options) ->
+    mem3_shards:for_docid(DbName, DocId, Options).
+
 
 -spec ushards(DbName::iodata()) -> [#shard{}].
 ushards(DbName) ->
     Nodes = [node()|erlang:nodes()],
     ZoneMap = zone_map(Nodes),
-    ushards(DbName, live_shards(DbName, Nodes), ZoneMap).
+    Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
+    mem3_util:downcast(Shards).
 
 ushards(DbName, Shards0, ZoneMap) ->
     {L,S,D} = group_by_proximity(Shards0, ZoneMap),
@@ -209,6 +230,8 @@ belongs(Begin, End, DocId) ->
 
 range(#shard{range = Range}) ->
     Range;
+range(#ordered_shard{range = Range}) ->
+    Range;
 range(<<"shards/", Start:8/binary, "-", End:8/binary, "/", _/binary>>) ->
     [httpd_util:hexlist_to_integer(binary_to_list(Start)),
      httpd_util:hexlist_to_integer(binary_to_list(End))].
@@ -217,29 +240,29 @@ nodes_in_zone(Nodes, Zone) ->
     [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].
 
 live_shards(DbName, Nodes) ->
-    [S || #shard{node=Node} = S <- shards(DbName), lists:member(Node, Nodes)].
+    live_shards(DbName, Nodes, []).
+
+live_shards(DbName, Nodes, Options) ->
+    [S || S <- shards_int(DbName, Options), lists:member(mem3:node(S), Nodes)].
 
 zone_map(Nodes) ->
     [{Node, node_info(Node, <<"zone">>)} || Node <- Nodes].
 
 group_by_proximity(Shards) ->
-    Nodes = [N || #shard{node=N} <- lists:ukeysort(#shard.node, Shards)],
+    Nodes = [mem3:node(S) || S <- lists:ukeysort(#shard.node, Shards)],
     group_by_proximity(Shards, zone_map(Nodes)).
 
 group_by_proximity(Shards, ZoneMap) ->
-    {Local, Remote} = lists:partition(fun(S) -> S#shard.node =:= node() end,
+    {Local, Remote} = lists:partition(fun(S) -> mem3:node(S) =:= node() end,
         Shards),
     LocalZone = proplists:get_value(node(), ZoneMap),
-    Fun = fun(S) -> proplists:get_value(S#shard.node, ZoneMap) =:= LocalZone end,
+    Fun = fun(S) -> proplists:get_value(mem3:node(S), ZoneMap) =:= LocalZone end,
     {SameZone, DifferentZone} = lists:partition(Fun, Remote),
     {Local, SameZone, DifferentZone}.
 
 choose_ushards(DbName, Shards) ->
-    Groups = group_by_range(rotate_list(DbName, lists:sort(Shards))),
-    Fun = fun(Group, {N, Acc}) ->
-        {N+1, [lists:nth(1 + N rem length(Group), Group) | Acc]} end,
-    {_, Result} = lists:foldl(Fun, {0, []}, Groups),
-    Result.
+    [hd(G) || G <- [rotate_list(DbName, order_shards(G)) ||
+        G <- group_by_range(Shards)]].
 
 rotate_list(_DbName, []) ->
     [];
@@ -247,9 +270,14 @@ rotate_list(DbName, List) ->
     {H, T} = lists:split(erlang:crc32(DbName) rem length(List), List),
     T ++ H.
 
+order_shards([#ordered_shard{}|_]=OrderedShards) ->
+    lists:keysort(#ordered_shard.order, OrderedShards);
+order_shards(UnorderedShards) ->
+    UnorderedShards.
+
 group_by_range(Shards) ->
-    Groups0 = lists:foldl(fun(#shard{range=Range}=Shard, Dict) ->
-        orddict:append(Range, Shard, Dict) end, orddict:new(), Shards),
+    Groups0 = lists:foldl(fun(Shard, Dict) ->
+        orddict:append(mem3:range(Shard), Shard, Dict) end, orddict:new(), Shards),
     {_, Groups} = lists:unzip(Groups0),
     Groups.
 
@@ -259,3 +287,13 @@ quorum(#db{name=DbName}) ->
     quorum(DbName);
 quorum(DbName) ->
     n(DbName) div 2 + 1.
+
+node(#shard{node=Node}) ->
+    Node;
+node(#ordered_shard{node=Node}) ->
+    Node.
+
+name(#shard{name=Name}) ->
+    Name;
+name(#ordered_shard{name=Name}) ->
+    Name.

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/src/mem3_shards.erl
----------------------------------------------------------------------
diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl
index 9315c5c..6127ff6 100644
--- a/src/mem3_shards.erl
+++ b/src/mem3_shards.erl
@@ -19,7 +19,7 @@
 -export([handle_config_change/5]).
 
 -export([start_link/0]).
--export([for_db/1, for_docid/2, get/3, local/1, fold/2]).
+-export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]).
 -export([set_max_size/1]).
 
 -record(st, {
@@ -39,7 +39,10 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 for_db(DbName) ->
-    try ets:lookup(?SHARDS, DbName) of
+    for_db(DbName, []).
+
+for_db(DbName, Options) ->
+    Shards = try ets:lookup(?SHARDS, DbName) of
         [] ->
             load_shards_from_disk(DbName);
         Else ->
@@ -47,26 +50,47 @@ for_db(DbName) ->
             Else
     catch error:badarg ->
         load_shards_from_disk(DbName)
+    end,
+    case lists:member(ordered, Options) of
+        true  -> Shards;
+        false -> mem3_util:downcast(Shards)
     end.
 
 for_docid(DbName, DocId) ->
+    for_docid(DbName, DocId, []).
+
+for_docid(DbName, DocId, Options) ->
     HashKey = mem3_util:hash(DocId),
-    Head = #shard{
+    ShardHead = #shard{
         name = '_',
         node = '_',
         dbname = DbName,
         range = ['$1','$2'],
         ref = '_'
     },
+    OrderedShardHead = #ordered_shard{
+        name = '_',
+        node = '_',
+        dbname = DbName,
+        range = ['$1','$2'],
+        ref = '_',
+        order = '_'
+    },
     Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}],
-    try ets:select(?SHARDS, [{Head, Conditions, ['$_']}]) of
+    ShardSpec = {ShardHead, Conditions, ['$_']},
+    OrderedShardSpec = {OrderedShardHead, Conditions, ['$_']},
+    Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of
         [] ->
             load_shards_from_disk(DbName, DocId);
-        Shards ->
+        Else ->
             gen_server:cast(?MODULE, {cache_hit, DbName}),
-            Shards
+            Else
     catch error:badarg ->
         load_shards_from_disk(DbName, DocId)
+    end,
+    case lists:member(ordered, Options) of
+        true  -> Shards;
+        false -> mem3_util:downcast(Shards)
     end.
 
 get(DbName, Node, Range) ->
@@ -221,10 +245,10 @@ changes_callback({change, {Change}, _}, _) ->
                 couch_log:error("missing partition table for ~s: ~p",
                     [DbName, Reason]);
             {Doc} ->
-                Shards = mem3_util:build_shards(DbName, Doc),
+                Shards = mem3_util:build_ordered_shards(DbName, Doc),
                 gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
-                [create_if_missing(Name) || #shard{name=Name, node=Node}
-                    <- Shards, Node =:= node()]
+                [create_if_missing(mem3:name(S)) || S
+                    <- Shards, mem3:node(S) =:= node()]
             end
         end
     end,
@@ -244,7 +268,7 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
 load_shards_from_db(#db{} = ShardDb, DbName) ->
     case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
     {ok, #doc{body = {Props}}} ->
-        Shards = mem3_util:build_shards(DbName, Props),
+        Shards = mem3_util:build_ordered_shards(DbName, Props),
         gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
         Shards;
     {not_found, _} ->
@@ -326,4 +350,3 @@ cache_clear(St) ->
     true = ets:delete_all_objects(?SHARDS),
     true = ets:delete_all_objects(?ATIMES),
     St#st{cur_size=0}.
-

http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c22769/src/mem3_util.erl
----------------------------------------------------------------------
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 6ed8924..a63d9a0 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -17,6 +17,9 @@
     shard_info/1, ensure_exists/1, open_db_doc/1]).
 -export([owner/2, is_deleted/1]).
 
+%% do not use outside mem3.
+-export([build_ordered_shards/2, downcast/1]).
+
 -export([create_partition_map/4, name_shard/1]).
 -deprecated({create_partition_map, 4, eventually}).
 -deprecated({name_shard, 1, eventually}).
@@ -34,10 +37,17 @@ hash(Item) ->
 name_shard(Shard) ->
     name_shard(Shard, "").
 
-name_shard(#shard{dbname = DbName, range=[B,E]} = Shard, Suffix) ->
-    Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
-        couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix],
-    Shard#shard{name = ?l2b(Name)}.
+name_shard(#shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+    Name = make_name(DbName, Range, Suffix),
+    Shard#shard{name = ?l2b(Name)};
+
+name_shard(#ordered_shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+    Name = make_name(DbName, Range, Suffix),
+    Shard#ordered_shard{name = ?l2b(Name)}.
+
+make_name(DbName, [B,E], Suffix) ->
+    ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+     couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix].
 
 create_partition_map(DbName, N, Q, Nodes) ->
     create_partition_map(DbName, N, Q, Nodes, "").
@@ -122,7 +132,25 @@ delete_db_doc(DbName, DocId, ShouldMutate) ->
         couch_db:close(Db)
     end.
 
+%% Always returns original #shard records.
+-spec build_shards(binary(), list()) -> [#shard{}].
 build_shards(DbName, DocProps) ->
+    build_shards_by_node(DbName, DocProps).
+
+%% Will return #ordered_shard records if by_node and by_range
+%% are symmetrical, #shard records otherwise.
+-spec build_ordered_shards(binary(), list()) ->
+    [#shard{}] | [#ordered_shard{}].
+build_ordered_shards(DbName, DocProps) ->
+    ByNode = build_shards_by_node(DbName, DocProps),
+    ByRange = build_shards_by_range(DbName, DocProps),
+    Symmetrical = lists:sort(ByNode) =:= lists:sort(downcast(ByRange)),
+    case Symmetrical of
+        true  -> ByRange;
+        false -> ByNode
+    end.
+
+build_shards_by_node(DbName, DocProps) ->
     {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
     Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
     lists:flatmap(fun({Node, Ranges}) ->
@@ -138,6 +166,23 @@ build_shards(DbName, DocProps) ->
         end, Ranges)
     end, ByNode).
 
+build_shards_by_range(DbName, DocProps) ->
+    {ByRange} = couch_util:get_value(<<"by_range">>, DocProps, {[]}),
+    Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+    lists:flatmap(fun({Range, Nodes}) ->
+        lists:map(fun({Node, Order}) ->
+            [B,E] = string:tokens(?b2l(Range), "-"),
+            Beg = httpd_util:hexlist_to_integer(B),
+            End = httpd_util:hexlist_to_integer(E),
+            name_shard(#ordered_shard{
+                dbname = DbName,
+                node = to_atom(Node),
+                range = [Beg, End],
+                order = Order
+            }, Suffix)
+        end, lists:zip(Nodes, lists:seq(1, length(Nodes))))
+    end, ByRange).
+
 to_atom(Node) when is_binary(Node) ->
     list_to_atom(binary_to_list(Node));
 to_atom(Node) when is_atom(Node) ->
@@ -194,3 +239,16 @@ is_deleted(Change) ->
     Else ->
         Else
     end.
+
+downcast(#shard{}=S) ->
+    S;
+downcast(#ordered_shard{}=S) ->
+    #shard{
+       name = S#ordered_shard.name,
+       node = S#ordered_shard.node,
+       dbname = S#ordered_shard.dbname,
+       range = S#ordered_shard.range,
+       ref = S#ordered_shard.ref
+      };
+downcast(Shards) when is_list(Shards) ->
+    [downcast(Shard) || Shard <- Shards].