You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2018/07/09 21:40:55 UTC

[couchdb] 01/02: WIP support user-partitioned views

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch user-partitioned-dbs-wip
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 785213db2772e24485dc1a78d2ee17822a540124
Author: Robert Newson <rn...@apache.org>
AuthorDate: Mon Jul 9 18:08:57 2018 +0100

    WIP support user-partitioned views
---
 src/couch_mrview/include/couch_mrview.hrl     |  4 +-
 src/couch_mrview/src/couch_mrview_http.erl    |  2 +
 src/couch_mrview/src/couch_mrview_updater.erl | 15 +++++-
 src/couch_mrview/src/couch_mrview_util.erl    | 66 ++++++++++++++++++++++++---
 4 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/src/couch_mrview/include/couch_mrview.hrl b/src/couch_mrview/include/couch_mrview.hrl
index a341e30..67b3cd9 100644
--- a/src/couch_mrview/include/couch_mrview.hrl
+++ b/src/couch_mrview/include/couch_mrview.hrl
@@ -31,7 +31,8 @@
     doc_acc,
     doc_queue,
     write_queue,
-    qserver=nil
+    qserver=nil,
+    partitioned=false
 }).
 
 
@@ -87,6 +88,7 @@
     conflicts,
     callback,
     sorted = true,
+    partition_key,
     extra = []
 }).
 
diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl
index 9dae1d8..5ff7285 100644
--- a/src/couch_mrview/src/couch_mrview_http.erl
+++ b/src/couch_mrview/src/couch_mrview_http.erl
@@ -582,6 +582,8 @@ parse_param(Key, Val, Args, IsDecoded) ->
             Args#mrargs{callback=couch_util:to_binary(Val)};
         "sorted" ->
             Args#mrargs{sorted=parse_boolean(Val)};
+        "partition_key" ->
+            Args#mrargs{partition_key=couch_util:to_binary(Val)};
         _ ->
             BKey = couch_util:to_binary(Key),
             BVal = couch_util:to_binary(Val),
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 214f487..e1cc280 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -311,7 +311,8 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
     #mrst{
         id_btree=IdBtree,
         log_btree=LogBtree,
-        first_build=FirstBuild
+        first_build=FirstBuild,
+        partitioned=Partitioned
     } = State,
 
     Revs = dict:from_list(dict:fetch_keys(Log0)),
@@ -328,8 +329,15 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
         _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
     end,
 
-    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
+    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) ->
         #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
+        KVs = case Partitioned of
+            true ->
+                [{{[partition(D), K], D}, V} || {{K, D}, V} <- KVs0];
+            false ->
+                KVs0
+        end,
+
         ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
         {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
         NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
@@ -484,3 +492,6 @@ maybe_notify(State, View, KVs, ToRem) ->
         [Key || {Key, _DocId} <- ToRem]
     end,
     couch_index_plugin:index_update(State, View, Updated, Removed).
+
+partition(DocId) -> % partition prefix for view keys when the db is partitioned (same rule as couch_mrview_util:partition_key/2)
+    hd(binary:split(DocId, <<":">>)). % text before the first ":"; the whole DocId if it contains no ":"
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 086bf9b..35554c4 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -24,7 +24,7 @@
 -export([temp_view_to_ddoc/1]).
 -export([calculate_external_size/1]).
 -export([calculate_active_size/1]).
--export([validate_and_update_args/1]).
+-export([validate_and_update_args/1, validate_and_update_args/2]).
 -export([maybe_load_doc/3, maybe_load_doc/4]).
 -export([maybe_update_index_file/1]).
 -export([extract_view/4, extract_view_reduce/1]).
@@ -33,6 +33,7 @@
 -export([changes_key_opts/2]).
 -export([fold_changes/4]).
 -export([to_key_seq/1]).
+-export([partition_key/2, unpartition_key/1]).
 
 -define(MOD, couch_mrview_index).
 -define(GET_VIEW_RETRY_COUNT, 1).
@@ -281,6 +282,12 @@ init_state(Db, Fd, State, Header) ->
     OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end,
     Views2 = lists:zipwith(OpenViewFun, ViewStates, Views),
 
+    Partitioned = case couch_db_engine:get_prop(Db, partitioned) of
+        {ok, true} -> true;
+        {ok, false} -> false;
+        {error, no_value} -> false
+    end,
+
     State#mrst{
         fd=Fd,
         fd_monitor=erlang:monitor(process, Fd),
@@ -288,7 +295,8 @@ init_state(Db, Fd, State, Header) ->
         purge_seq=PurgeSeq,
         id_btree=IdBtree,
         log_btree=LogBtree,
-        views=Views2
+        views=Views2,
+        partitioned=Partitioned
     }.
 
 open_view(_Db, Fd, Lang, ViewState, View) ->
@@ -441,7 +449,7 @@ fold_reduce({NthRed, Lang, View}, Fun,  Acc, Options) ->
     couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
 
 
-validate_args(Args) ->
+validate_args(Args, Options) ->
     GroupLevel = determine_group_level(Args),
     Reduce = Args#mrargs.reduce,
     case Reduce == undefined orelse is_boolean(Reduce) of
@@ -551,10 +559,21 @@ validate_args(Args) ->
         _ -> mrverror(<<"Invalid value for `sorted`.">>)
     end,
 
+    case {lists:member(partitioned, Options), Args#mrargs.partition_key} of
+        {true, undefined} ->
+            mrverror(<<"`partition_key` parameter is mandatory for queries to this database.">>);
+        {true, _PartitionKey} ->
+            ok;
+        {false, undefined} ->
+            ok;
+        {false, _PartitionKey} ->
+            mrverror(<<"`partition_key` parameter is not supported in this database.">>)
+    end,
+
     true.
 
 
-update_args(#mrargs{} = Args) ->
+update_args(#mrargs{} = Args, _Options) ->
     GroupLevel = determine_group_level(Args),
 
     SKDocId = case {Args#mrargs.direction, Args#mrargs.start_key_docid} of
@@ -569,7 +588,32 @@ update_args(#mrargs{} = Args) ->
         {_, EKDocId1} -> EKDocId1
     end,
 
+    LowestKey = null,
+    HighestKey = {[{<<239, 191, 176>>, null}]}, % \ufff0
+
+    {StartKey, EndKey} = case Args of
+        #mrargs{partition_key=undefined} ->
+            {Args#mrargs.start_key, Args#mrargs.end_key};
+
+        #mrargs{partition_key=PKey0} when not is_binary(PKey0) ->
+            mrverror(<<"`partition_key` must be a string.">>);
+
+        #mrargs{partition_key=PKey0, start_key=undefined, end_key=undefined} ->
+            {[PKey0, LowestKey], [PKey0, HighestKey]};
+
+        #mrargs{partition_key=PKey0, start_key=SK0, end_key=undefined} ->
+            {[PKey0, SK0], [PKey0, HighestKey]};
+
+        #mrargs{partition_key=PKey0, start_key=undefined, end_key=EK0} ->
+            {[PKey0, LowestKey], [PKey0, EK0]};
+
+        #mrargs{partition_key=PKey0, start_key=SK0, end_key=EK0} ->
+            {[PKey0, SK0], [PKey0, EK0]}
+    end,
+
     Args#mrargs{
+        start_key=StartKey,
+        end_key=EndKey,
         start_key_docid=SKDocId,
         end_key_docid=EKDocId,
         group_level=GroupLevel
@@ -577,8 +621,11 @@ update_args(#mrargs{} = Args) ->
 
 
 validate_and_update_args(#mrargs{} = Args) ->
-    true = validate_args(Args),
-    update_args(Args).
+    validate_and_update_args(Args, []). % empty Options: `partitioned` absent, so a set partition_key is rejected
+%% Options: proplist checked by validate_args/2 for the atom `partitioned` (update_args/2 ignores it).
+validate_and_update_args(#mrargs{} = Args, Options) ->
+    true = validate_args(Args, Options),
+    update_args(Args, Options).
 
 
 determine_group_level(#mrargs{group=undefined, group_level=undefined}) ->
@@ -1211,3 +1258,10 @@ kv_external_size(KVList, Reduction) ->
     lists:foldl(fun([[Key, _], Value], Acc) ->
         ?term_size(Key) + ?term_size(Value) + Acc
     end, ?term_size(Reduction), KVList).
+
+%% Prefix a view Key with DocId's partition: the text before the first ":" (whole DocId if none).
+partition_key(Key, DocId) ->
+    [hd(binary:split(DocId, <<":">>)), Key].
+%% Inverse of partition_key/2: drop the partition prefix and return the original view key.
+unpartition_key([_Partition, Key]) ->
+    Key.