You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2018/08/06 13:46:53 UTC

[couchdb] 03/05: map documents to shards by their partition

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch user-partitioned-dbs-4
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit c8e9dfb40e1469947dae61d68a01c3c871ddde9b
Author: Robert Newson <rn...@apache.org>
AuthorDate: Wed Aug 1 18:45:52 2018 +0100

    map documents to shards by their partition
---
 src/mem3/src/mem3.erl        | 41 ++++++++++++++++++++++++++++++++++++-----
 src/mem3/src/mem3_shards.erl | 20 +++++++++++++++++---
 src/mem3/src/mem3_util.erl   | 19 +++++++++++++++++++
 3 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index 0e5eabf..99c0863 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -13,7 +13,7 @@
 -module(mem3).
 
 -export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
-    choose_shards/2, n/1, n/2, dbname/1, ushards/1]).
+    choose_shards/2, n/1, n/2, dbname/1, ushards/1, ushards/2]).
 -export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
 -export([sync_security/0, sync_security/1]).
 -export([compare_nodelists/0, compare_shards/1]).
@@ -21,6 +21,7 @@
 -export([live_shards/2]).
 -export([belongs/2, owner/3]).
 -export([get_placement/1]).
+-export([is_partitioned/1]).
 
 %% For mem3 use only.
 -export([name/1, node/1, range/1, engine/1]).
@@ -68,7 +69,7 @@ compare_shards(DbName) ->
 
 -spec n(DbName::iodata()) -> integer().
 n(DbName) ->
-    n(DbName, <<"foo">>).
+    n(DbName, <<"_design/foo">>).
 
 n(DbName, DocId) ->
     length(mem3:shards(DbName, DocId)).
@@ -133,6 +134,12 @@ ushards(DbName) ->
     Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
     mem3_util:downcast(Shards).
 
+%% Look up the shards for DocId with the [ordered] option and return
+%% only the head of that list, passed through mem3_util:downcast/1.
+-spec ushards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+ushards(DbName, DocId) ->
+    mem3_util:downcast([hd(shards_int(DbName, DocId, [ordered]))]).
+
 ushards(DbName, Shards0, ZoneMap) ->
     {L,S,D} = group_by_proximity(Shards0, ZoneMap),
     % Prefer shards in the local zone over shards in a different zone,
@@ -229,13 +236,14 @@ dbname(_) ->
     erlang:error(badarg).
 
 %% @doc Determine if DocId belongs in shard (identified by record or filename)
-belongs(#shard{}=Shard, DocId) when is_binary(DocId) ->
+%% NOTE: only supported for design documents
+belongs(#shard{}=Shard, <<"_design/", _/binary>> = DocId) ->
     [Begin, End] = range(Shard),
     belongs(Begin, End, DocId);
-belongs(<<"shards/", _/binary>> = ShardName, DocId) when is_binary(DocId) ->
+belongs(<<"shards/", _/binary>> = ShardName, <<"_design/", _/binary>> = DocId) ->
     [Begin, End] = range(ShardName),
     belongs(Begin, End, DocId);
-belongs(DbName, DocId) when is_binary(DbName), is_binary(DocId) ->
+belongs(DbName, <<"_design/", _/binary>>) when is_binary(DbName) ->
     true.
 
 belongs(Begin, End, DocId) ->
@@ -331,6 +339,29 @@ engine(Opts) when is_list(Opts) ->
             []
     end.
 
+%% Report whether a database (by name), a shard list or one shard is
+%% partitioned. [] is false, since lists:all/2 is vacuously true there.
+is_partitioned(DbName0) when is_binary(DbName0) ->
+    DbName = dbname(DbName0),
+    ShardsDbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+    NodesDbName = ?l2b(config:get("mem3", "nodes_db", "_nodes")),
+    case DbName of
+        ShardsDbName ->
+            false; % the configured shards db is answered without a lookup
+        NodesDbName ->
+            false; % likewise for the configured nodes db
+        _ ->
+            is_partitioned(mem3:shards(DbName))
+    end;
+is_partitioned([]) -> false;
+is_partitioned(Shards) when is_list(Shards) ->
+    lists:all(fun is_partitioned/1, Shards);
+is_partitioned(#shard{opts=Opts}) ->
+    couch_util:get_value(partitioned, Opts) =:= true;
+is_partitioned(#ordered_shard{opts=Opts}) ->
+    couch_util:get_value(partitioned, Opts) =:= true.
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index da3b69a..c798512 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -66,8 +66,22 @@ for_db(DbName, Options) ->
 for_docid(DbName, DocId) ->
     for_docid(DbName, DocId, []).
 
+%% This function performs one or two lookups, as it is not known
+%% ahead of time whether the database is partitioned. We first ask
+%% for the shards as if the database is not partitioned and then test
+%% the returned shards for a counter-indication that it is. If so, we
+%% run the lookup again with the partitioned docid hash option enabled.
 for_docid(DbName, DocId, Options) ->
-    HashKey = mem3_util:hash(DocId),
+    Shards = for_docid(DbName, DocId, Options, []), % first pass: no hash options
+    case mem3:is_partitioned(Shards) of
+        true -> % shards report partitioned: redo the lookup with that option
+            for_docid(DbName, DocId, Options, [partitioned]);
+        false -> % keep the first-pass result
+            Shards
+    end.
+
+for_docid(DbName, DocId, Options, HashOptions) ->
+    HashKey = mem3_util:docid_hash(DocId, HashOptions),
     ShardHead = #shard{
         dbname = DbName,
         range = ['$1', '$2'],
@@ -397,7 +411,8 @@ load_shards_from_db(ShardDb, DbName) ->
 
 load_shards_from_disk(DbName, DocId)->
     Shards = load_shards_from_disk(DbName),
-    HashKey = mem3_util:hash(DocId),
+    Options = [partitioned || mem3:is_partitioned(Shards)], % [partitioned] or []
+    HashKey = mem3_util:docid_hash(DocId, Options), % docid_hash/2 recognises the bare atom
     [S || S <- Shards, in_range(S, HashKey)].
 
 in_range(Shard, HashKey) ->
@@ -521,7 +536,6 @@ filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
 filter_shards_by_name(Name, Matches, [_|Ss]) ->
     filter_shards_by_name(Name, Matches, Ss).
 
-
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index e08d375..cd9b76a 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -16,6 +16,7 @@
     n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
     shard_info/1, ensure_exists/1, open_db_doc/1]).
 -export([is_deleted/1, rotate_list/2]).
+-export([docid_hash/1, docid_hash/2]).
 
 %% do not use outside mem3.
 -export([build_ordered_shards/2, downcast/1]).
@@ -34,6 +35,24 @@ hash(Item) when is_binary(Item) ->
 hash(Item) ->
     erlang:crc32(term_to_binary(Item)).
 
+
+docid_hash(DocId) when is_binary(DocId) ->
+    docid_hash(DocId, []). % no options: hash the whole doc id
+%% Options may contain `partitioned` or `{partitioned, boolean()}`.
+docid_hash(<<"_design/", _/binary>> = DocId, _Options) ->
+    erlang:crc32(DocId); % design docs are never placed by partition
+%% When partitioned, only the text before the first ":" is hashed.
+docid_hash(DocId, Options) when is_binary(DocId), is_list(Options) ->
+    Data = case proplists:get_bool(partitioned, Options) of
+        true ->
+            % ids without a ":" have no partition and hash as a whole
+            hd(binary:split(DocId, <<":">>));
+        false ->
+            DocId
+    end,
+    erlang:crc32(Data).
+
+
 name_shard(Shard) ->
     name_shard(Shard, "").