Posted to commits@couchdb.apache.org by da...@apache.org on 2018/10/26 15:46:24 UTC

[couchdb] branch feature/user-partitioned-databases-davisp updated (90ba6a4 -> a79f938)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 90ba6a4  Optimize offset/limit for partition queries
 discard dc9358e  Optimize all_docs queries in a single partition
 discard 4d175ff  Implement partitioned views
 discard 9e46879  Implement `couch_db:get_partition_info/2`
 discard bba2491  Implement partitioned dbs
     new b48c84f  Implement partitioned dbs
     new a6dc8f0  Implement `couch_db:get_partition_info/2`
     new 3a7d756  Implement partitioned views
     new 6c30650  Optimize all_docs queries in a single partition
     new a79f938  Optimize offset/limit for partition queries

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (90ba6a4)
            \
             N -- N -- N   refs/heads/feature/user-partitioned-databases-davisp (a79f938)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/couch/src/couch_partition.erl | 2 +-
 src/mem3/src/mem3.erl             | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)


[couchdb] 02/05: Implement `couch_db:get_partition_info/2`

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a6dc8f0fee2fab8f9b59231200bc182f728bd448
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Oct 23 14:18:35 2018 -0500

    Implement `couch_db:get_partition_info/2`
    
    This feature allows us to fetch statistics for a given partition key,
    which will allow users to find bloated partitions and so forth.
    
    Co-authored-by: Garren Smith <ga...@gmail.com>
    Co-authored-by: Robert Newson <rn...@apache.org>
---
 src/couch/src/couch_bt_engine.erl           | 43 +++++++++++++
 src/couch/src/couch_db.erl                  |  9 +++
 src/couch/src/couch_db_engine.erl           | 24 +++++++
 src/fabric/src/fabric.erl                   | 15 ++++-
 src/fabric/src/fabric_db_partition_info.erl | 98 +++++++++++++++++++++++++++++
 src/fabric/src/fabric_rpc.erl               |  5 +-
 6 files changed, 192 insertions(+), 2 deletions(-)

diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl
index f52f447..824a796 100644
--- a/src/couch/src/couch_bt_engine.erl
+++ b/src/couch/src/couch_bt_engine.erl
@@ -42,6 +42,7 @@
     get_security/1,
     get_props/1,
     get_size_info/1,
+    get_partition_info/2,
     get_update_seq/1,
     get_uuid/1,
 
@@ -277,6 +278,48 @@ get_size_info(#st{} = St) ->
     ].
 
 
+partition_size_cb(traverse, Key, {DC, DDC, Sizes}, {Partition, DCAcc, DDCAcc, SizesAcc}) ->
+    case couch_partition:is_member(Key, Partition) of
+        true ->
+            {skip, {Partition, DC + DCAcc, DDC + DDCAcc, reduce_sizes(Sizes, SizesAcc)}};
+        false ->
+            {ok, {Partition, DCAcc, DDCAcc, SizesAcc}}
+    end;
+
+partition_size_cb(visit, FDI, _PrevReds, {Partition, DCAcc, DDCAcc, Acc}) ->
+    InPartition = couch_partition:is_member(FDI#full_doc_info.id, Partition),
+    Deleted = FDI#full_doc_info.deleted,
+    case {InPartition, Deleted} of
+        {true, true} ->
+            {ok, {Partition, DCAcc, DDCAcc + 1,
+                reduce_sizes(FDI#full_doc_info.sizes, Acc)}};
+        {true, false} ->
+            {ok, {Partition, DCAcc + 1, DDCAcc,
+                reduce_sizes(FDI#full_doc_info.sizes, Acc)}};
+        {false, _} ->
+            {ok, {Partition, DCAcc, DDCAcc, Acc}}
+    end.
+
+
+get_partition_info(#st{} = St, Partition) ->
+    StartKey = <<Partition/binary, ":">>,
+    EndKey = <<Partition/binary, ";">>,
+    Fun = fun partition_size_cb/4,
+    InitAcc = {Partition, 0, 0, #size_info{}},
+    Options = [{start_key, StartKey}, {end_key, EndKey}],
+    {ok, _, OutAcc} = couch_btree:fold(St#st.id_tree, Fun, InitAcc, Options),
+    {Partition, DocCount, DocDelCount, SizeInfo} = OutAcc,
+    [
+        {partition, Partition},
+        {doc_count, DocCount},
+        {doc_del_count, DocDelCount},
+        {sizes, [
+            {active, SizeInfo#size_info.active},
+            {external, SizeInfo#size_info.external}
+        ]}
+    ].
+
+
 get_security(#st{header = Header} = St) ->
     case couch_bt_engine_header:get(Header, security_ptr) of
         undefined ->
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index ff04dab..5eb118d 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -38,6 +38,7 @@
     get_compacted_seq/1,
     get_compactor_pid/1,
     get_db_info/1,
+    get_partition_info/2,
     get_del_doc_count/1,
     get_doc_count/1,
     get_epochs/1,
@@ -630,6 +631,14 @@ get_db_info(Db) ->
     ],
     {ok, InfoList}.
 
+
+get_partition_info(#db{} = Db, Partition) when is_binary(Partition) ->
+    Sizes = couch_db_engine:get_partition_info(Db, Partition),
+    {ok, Sizes};
+get_partition_info(_Db, _Partition) ->
+    throw({bad_request, <<"`partition` is not valid">>}).
+
+
 get_design_docs(#db{name = <<"shards/", _:18/binary, DbFullName/binary>>}) ->
     DbName = ?l2b(filename:rootname(filename:basename(?b2l(DbFullName)))),
     {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl
index eed46e8..d370a98 100644
--- a/src/couch/src/couch_db_engine.erl
+++ b/src/couch/src/couch_db_engine.erl
@@ -44,6 +44,12 @@
 -type purge_info() :: {purge_seq(), uuid(), docid(), revs()}.
 -type epochs() :: [{Node::atom(), UpdateSeq::non_neg_integer()}].
 -type size_info() :: [{Name::atom(), Size::non_neg_integer()}].
+-type partition_info() :: [
+    {partition, Partition::binary()} |
+    {doc_count, DocCount::non_neg_integer()} |
+    {doc_del_count, DocDelCount::non_neg_integer()} |
+    {sizes, size_info()}
+].
 
 -type write_stream_options() :: [
         {buffer_size, Size::pos_integer()} |
@@ -263,6 +269,18 @@
 -callback get_size_info(DbHandle::db_handle()) -> SizeInfo::size_info().
 
 
+% This returns the information for the given partition as a
+% partition_info() proplist. It includes the partition name, doc
+% count, deleted doc count and two sizes:
+%
+%   active   - Theoretical minimum number of bytes to store this partition on disk
+%
+%   external - Number of bytes that would be required to represent the
+%              contents of this partition outside of the database
+-callback get_partition_info(DbHandle::db_handle(), Partition::binary()) ->
+    partition_info().
+
+
 % The current update sequence of the database. The update
 % sequence should be incremented for every revision added to
 % the database.
@@ -685,6 +703,7 @@
     get_security/1,
     get_props/1,
     get_size_info/1,
+    get_partition_info/2,
     get_update_seq/1,
     get_uuid/1,
 
@@ -861,6 +880,11 @@ get_size_info(#db{} = Db) ->
     Engine:get_size_info(EngineState).
 
 
+get_partition_info(#db{} = Db, Partition) ->
+    #db{engine = {Engine, EngineState}} = Db,
+    Engine:get_partition_info(EngineState, Partition).
+
+
 get_update_seq(#db{} = Db) ->
     #db{engine = {Engine, EngineState}} = Db,
     Engine:get_update_seq(EngineState).
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index bba4a9f..e796c91 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -22,7 +22,7 @@
     set_security/2, set_security/3, get_revs_limit/1, get_security/1,
     get_security/2, get_all_security/1, get_all_security/2,
     get_purge_infos_limit/1, set_purge_infos_limit/3,
-    compact/1, compact/2]).
+    compact/1, compact/2, get_partition_info/2]).
 
 % Documents
 -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
@@ -85,6 +85,19 @@ all_dbs(Prefix) when is_list(Prefix) ->
 get_db_info(DbName) ->
     fabric_db_info:go(dbname(DbName)).
 
+%% @doc returns the size of a given partition
+-spec get_partition_info(dbname(), Partition::binary()) ->
+    {ok, [
+        {db_name, binary()} |
+        {partition, binary()} |
+        {doc_count, non_neg_integer()} |
+        {doc_del_count, non_neg_integer()} |
+        {sizes, json_obj()}
+    ]}.
+get_partition_info(DbName, Partition) ->
+    fabric_db_partition_info:go(dbname(DbName), Partition).
+
+
 %% @doc the number of docs in a database
 -spec get_doc_count(dbname()) ->
     {ok, non_neg_integer()} |
diff --git a/src/fabric/src/fabric_db_partition_info.erl b/src/fabric/src/fabric_db_partition_info.erl
new file mode 100644
index 0000000..9083618
--- /dev/null
+++ b/src/fabric/src/fabric_db_partition_info.erl
@@ -0,0 +1,98 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_db_partition_info).
+
+-export([go/2]).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+go(DbName, Partition) ->
+    Shards = mem3:shards(DbName, <<Partition/binary, ":foo">>),
+    Workers = fabric_util:submit_jobs(Shards, get_partition_info, [Partition]),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Fun = fun handle_message/3,
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try
+        case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
+            {ok, Acc} -> {ok, Acc};
+            {timeout, {WorkersDict, _}} ->
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    WorkersDict,
+                    nil
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "get_partition_info"
+                ),
+                {error, timeout};
+            {error, Error} -> throw(Error)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
+    case fabric_util:remove_down_workers(Counters, NodeRef) of
+    {ok, NewCounters} ->
+        {ok, {NewCounters, Acc}};
+    error ->
+        {error, {nodedown, <<"progress not possible">>}}
+    end;
+
+handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
+    NewCounters = fabric_dict:erase(Shard, Counters),
+    case fabric_view:is_progress_possible(NewCounters) of
+    true ->
+        {ok, {NewCounters, Acc}};
+    false ->
+        {error, Reason}
+    end;
+
+handle_message({ok, Sizes}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
+    Acc2 = [Sizes | Acc],
+    Counters1 = fabric_dict:erase(Shard, Counters),
+    case fabric_dict:size(Counters1) =:= 0 of
+        true ->
+            [FirstInfo | RestInfos] = Acc2,
+            PartitionInfo = get_max_partition_size(FirstInfo, RestInfos),
+            {stop, [{db_name, Name} | format_partition(PartitionInfo)]};
+        false ->
+            {ok, {Counters1, Acc2}}
+    end;
+
+handle_message(_, _, Acc) ->
+    {ok, Acc}.
+
+get_max_partition_size(Max, []) ->
+    Max;
+get_max_partition_size(MaxInfo, [NextInfo | Rest]) ->
+    {sizes, MaxSize} = lists:keyfind(sizes, 1, MaxInfo),
+    {sizes, NextSize} = lists:keyfind(sizes, 1, NextInfo),
+
+    {external, MaxExtSize} = lists:keyfind(external, 1, MaxSize),
+    {external, NextExtSize} = lists:keyfind(external, 1, NextSize),
+    case NextExtSize > MaxExtSize of 
+        true ->
+            get_max_partition_size(NextInfo, Rest);
+        false ->
+            get_max_partition_size(MaxInfo, Rest)
+    end.
+
+
+% for JS to work nicely we need to convert the size list
+% to a jiffy object
+format_partition(PartitionInfo) ->
+    {value, {sizes, Size}, PartitionInfo1} = lists:keytake(sizes, 1, PartitionInfo),
+    [{sizes, {Size}} | PartitionInfo1].
+
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 11e6754..873e0c5 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -18,7 +18,7 @@
 -export([all_docs/3, changes/3, map_view/4, reduce_view/4, group_info/2]).
 -export([create_db/1, create_db/2, delete_db/1, reset_validation_funs/1,
     set_security/3, set_revs_limit/3, create_shard_db_doc/2,
-    delete_shard_db_doc/2]).
+    delete_shard_db_doc/2, get_partition_info/2]).
 -export([get_all_security/2, open_shard/2]).
 -export([compact/1, compact/2]).
 -export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]).
@@ -174,6 +174,9 @@ get_db_info(DbName) ->
 get_db_info(DbName, DbOptions) ->
     with_db(DbName, DbOptions, {couch_db, get_db_info, []}).
 
+get_partition_info(DbName, Partition) ->
+    with_db(DbName, [], {couch_db, get_partition_info, [Partition]}).
+
 %% equiv get_doc_count(DbName, [])
 get_doc_count(DbName) ->
     get_doc_count(DbName, []).

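A note on the scan bounds in get_partition_info/2 above: the id btree fold
runs from <<Partition/binary, ":">> to <<Partition/binary, ";">> because ";"
(0x3B) is the byte immediately after ":" (0x3A), so the range covers exactly
the doc ids carrying the "Partition:" prefix. The same idea appears in
fabric_db_partition_info:go/2, where <<Partition/binary, ":foo">> is a
synthetic doc id used only so mem3 routes to the shards owning the
partition. A minimal caller-side sketch (not part of the patch; the db name
and partition key are invented):

    %% Fetch the stats for one partition through fabric.
    {ok, Info} = fabric:get_partition_info(<<"mydb">>, <<"sensor-260">>),
    %% Info is shaped like the -spec above, e.g.:
    %% [{db_name, <<"mydb">>},
    %%  {partition, <<"sensor-260">>},
    %%  {doc_count, 41},
    %%  {doc_del_count, 2},
    %%  {sizes, {[{active, 2048}, {external, 4096}]}}]
    {doc_count, DocCount} = lists:keyfind(doc_count, 1, Info).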

[couchdb] 05/05: Optimize offset/limit for partition queries

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a79f938a8db06624df27b6993d14216bc25b4336
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Oct 25 14:27:32 2018 -0500

    Optimize offset/limit for partition queries
    
    Now that a single shard handles the entire response, we can move work
    normally done in the coordinator into the RPC worker, which removes the
    need to send an extra `skip` rows to the coordinator.
    
    Co-authored-by: Robert Newson <rn...@apache.org>
---
 src/fabric/src/fabric_rpc.erl           | 12 +++---------
 src/fabric/src/fabric_view.erl          | 16 ++++++++++++++++
 src/fabric/src/fabric_view_all_docs.erl |  5 +++--
 src/fabric/src/fabric_view_map.erl      |  5 +++--
 src/fabric/src/fabric_view_reduce.erl   |  7 ++++---
 5 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 873e0c5..4ba13bb 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -97,9 +97,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
 
 all_docs(DbName, Options, Args0) ->
     case fabric_util:upgrade_mrargs(Args0) of
-        #mrargs{keys=undefined} = Args1 ->
+        #mrargs{keys=undefined} = Args ->
             set_io_priority(DbName, Options),
-            Args = fix_skip_and_limit(Args1),
             {ok, Db} = get_or_create_db(DbName, Options),
             CB = get_view_cb(Args),
             couch_mrview:query_all_docs(Db, Args, CB, Args)
@@ -123,7 +122,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     map_view(DbName, DDoc, ViewName, Args0, DbOptions);
 map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     CB = get_view_cb(Args),
     couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args).
@@ -137,16 +136,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
 reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
 
-fix_skip_and_limit(Args) ->
-    #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
-    % the coordinator needs to finalize each row, so make sure the shards don't
-    Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
-
 create_db(DbName) ->
     create_db(DbName, []).
 
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 81eb6f0..70d6c06 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -16,6 +16,7 @@
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
     check_down_shards/2, handle_worker_exit/3,
     get_shard_replacements/2, maybe_update_others/5]).
+-export([fix_skip_and_limit/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -372,6 +373,21 @@ get_shard_replacements(DbName, UsedShards0) ->
         end
     end, [], UsedShards).
 
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+fix_skip_and_limit(#mrargs{} = Args) ->
+    {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
+        undefined ->
+            #mrargs{skip=Skip, limit=Limit}=Args,
+            {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
+        _Partition ->
+            {Args#mrargs{skip=0}, Args}
+    end,
+    %% the coordinator needs to finalize each row, so make sure the shards don't
+    {CoordArgs, remove_finalizer(WorkerArgs)}.
+
+remove_finalizer(Args) ->
+    couch_mrview_util:set_extra(Args, finalizer, null).
+
 % unit test
 is_progress_possible_test() ->
     EndPoint = 2 bsl 31,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index f0dbb21..0fb3413 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -21,16 +21,17 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
     DbName = fabric:dbname(Db),
     Shards = shards(Db, QueryArgs),
     Workers0 = fabric_util:submit_jobs(
-            Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+            Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case fabric_util:stream_start(Workers0, #shard.ref) of
             {ok, Workers} ->
                 try
-                    go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+                    go(DbName, Options, Workers, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 1648623..bc6e15d 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -27,10 +27,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
 go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
     Shards = fabric_view:get_shards(Db, Args),
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DocIdAndRev, View, Args, Options],
+    RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,
@@ -42,7 +43,7 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index 7acc67c..712ed24 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -25,9 +25,10 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
-    DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
-    RPCArgs = [DocIdAndRev, VName, Args],
     Shards = fabric_view:get_shards(Db, Args),
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
+    DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
+    RPCArgs = [DocIdAndRev, VName, WorkerArgs],
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->
@@ -41,7 +42,7 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go2(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;

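To make the split concrete, here is a rough sketch of the two cases
fix_skip_and_limit/1 now distinguishes (the numbers are arbitrary and the
partition key is invented; in both cases the worker args also get the
finalizer nulled, so row finalization stays on the coordinator):

    Args = #mrargs{skip = 10, limit = 5},

    %% No partition: any shard may contribute rows, so each worker gets
    %% skip=0, limit=15 (Skip+Limit) and the coordinator keeps skip=10,
    %% limit=5 to drop and truncate after merging the sorted rows.
    {CoordArgs1, WorkerArgs1} = fabric_view:fix_skip_and_limit(Args),

    %% Partition set: a single shard range holds every row, so the worker
    %% keeps skip=10, limit=5 and the coordinator's skip is zeroed -- the
    %% skipped rows never cross the wire.
    PArgs = couch_mrview_util:set_extra(Args, partition, <<"sensor-260">>),
    {CoordArgs2, WorkerArgs2} = fabric_view:fix_skip_and_limit(PArgs).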

[couchdb] 04/05: Optimize all_docs queries in a single partition

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6c306502014ee08a6c817153d4d9b8c4e89fc8b5
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Oct 25 14:26:25 2018 -0500

    Optimize all_docs queries in a single partition
    
    If a user specifies document ids that scope the query to a single
    partition key, we can automatically determine that we only need to
    consult a single shard range.
    
    Co-authored-by: Robert Newson <rn...@apache.org>
---
 src/fabric/src/fabric_view_all_docs.erl | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 6acc792..f0dbb21 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -137,6 +137,32 @@ go(DbName, _Options, Workers, QueryArgs, Callback, Acc0) ->
         {ok, Resp}
     end.
 
+shards(Db, Args) ->
+    DbPartitioned = fabric_util:is_partitioned(Db),
+    Partition = couch_mrview_util:get_extra(Args, partition),
+    NewArgs = case {DbPartitioned, Partition} of
+        {true, undefined} ->
+            % If a user specifies the same partition on both
+            % the start and end keys we can optimize the
+            % query by limiting to the partition shard.
+            Start = couch_partition:extract(Args#mrargs.start_key),
+            End = couch_partition:extract(Args#mrargs.end_key),
+            case {Start, End} of
+                {{KeyPartition, SK}, {KeyPartition, EK}} ->
+                    A1 = Args#mrargs{
+                        start_key = SK,
+                        end_key = EK
+                    },
+                    couch_mrview_util:set_extra(A1, partition, KeyPartition);
+                _ ->
+                    Args
+            end;
+        _ ->
+            Args
+    end,
+    fabric_view:get_shards(Db, NewArgs).
+
+
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
     fabric_view:check_down_shards(State, NodeRef);
 

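The rewrite in shards/2 hinges on couch_partition:extract/1. Assuming, as
the pattern match above implies, that it returns {Partition, Rest} for an
id containing a colon and undefined otherwise, the optimization plays out
like this (invented keys):

    %% Both endpoints name the same partition...
    {<<"sensor-260">>, <<"a">>} = couch_partition:extract(<<"sensor-260:a">>),
    {<<"sensor-260">>, <<"z">>} = couch_partition:extract(<<"sensor-260:z">>),
    %% ...so the _all_docs args are rewritten to start_key = <<"a">>,
    %% end_key = <<"z">> with {partition, <<"sensor-260">>} stored in
    %% #mrargs.extra, and fabric_view:get_shards/2 then consults only the
    %% shard range owning that partition.
    undefined = couch_partition:extract(<<"not-partitioned">>).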

[couchdb] 03/05: Implement partitioned views

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3a7d756cb4a4b59c90f156e8a6702dfd648d05b5
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Oct 25 14:19:07 2018 -0500

    Implement partitioned views
    
    The benefit of using partitioned databases is that views can then be
    scoped to a single shard range. This allows views to scale nearly as
    linearly as document lookups.
    
    Co-authored-by: Garren Smith <ga...@gmail.com>
    Co-authored-by: Robert Newson <rn...@apache.org>
---
 src/chttpd/src/chttpd_db.erl                  |  41 ++++++++-
 src/chttpd/src/chttpd_httpd_handlers.erl      |   1 +
 src/chttpd/test/chttpd_db_bulk_get_test.erl   |  12 +--
 src/couch/src/couch_btree.erl                 |   6 +-
 src/couch/src/couch_ejson_compare.erl         |   4 +
 src/couch_mrview/include/couch_mrview.hrl     |   1 +
 src/couch_mrview/src/couch_mrview.erl         |  15 +++-
 src/couch_mrview/src/couch_mrview_http.erl    |   3 +
 src/couch_mrview/src/couch_mrview_index.erl   |  26 +++++-
 src/couch_mrview/src/couch_mrview_updater.erl |  13 ++-
 src/couch_mrview/src/couch_mrview_util.erl    | 124 +++++++++++++++++++++++++-
 src/fabric/src/fabric.erl                     |  15 ++--
 src/fabric/src/fabric_util.erl                |   1 -
 src/fabric/src/fabric_view.erl                |  32 +++++--
 src/fabric/src/fabric_view_all_docs.erl       |   5 +-
 src/fabric/src/fabric_view_map.erl            |   5 +-
 src/fabric/src/fabric_view_reduce.erl         |   5 +-
 17 files changed, 270 insertions(+), 39 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 3d6c79f..5e74f71 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -19,7 +19,8 @@
     db_req/2, couch_doc_open/4,handle_changes_req/2,
     update_doc_result_to_json/1, update_doc_result_to_json/2,
     handle_design_info_req/3, handle_view_cleanup_req/2,
-    update_doc/4, http_code_from_status/1]).
+    update_doc/4, http_code_from_status/1,
+    handle_partition_req/2]).
 
 -import(chttpd,
     [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
@@ -251,6 +252,40 @@ handle_view_cleanup_req(Req, Db) ->
     ok = fabric:cleanup_index_files_all_nodes(Db),
     send_json(Req, 202, {[{ok, true}]}).
 
+
+handle_partition_req(#httpd{method='GET', path_parts=[_,_,PartId]}=Req, Db) ->
+    case couch_db:is_partitioned(Db) of
+        true ->
+            {ok, PartitionInfo} = fabric:get_partition_info(Db, PartId),
+            send_json(Req, {PartitionInfo});
+        false ->
+            throw({bad_request, <<"database is not partitioned">>})
+    end;
+
+handle_partition_req(#httpd{path_parts = [_, _, _]}=Req, _Db) ->
+    send_method_not_allowed(Req, "GET");
+
+handle_partition_req(#httpd{path_parts=[DbName, _, PartId | Rest]}=Req, Db) ->
+    case couch_db:is_partitioned(Db) of
+        true ->
+            QS = chttpd:qs(Req),
+            NewQS = lists:ukeysort(1, [{<<"partition">>, PartId} | QS]),
+            NewReq = Req#httpd{
+                path_parts = [DbName | Rest],
+                qs = NewQS
+            },
+            case Rest of
+                [] ->
+                    do_db_req(NewReq, fun db_req/2);
+                [SecondPart|_] ->
+                    Handler = chttpd_handlers:db_handler(SecondPart, fun db_req/2),
+                    do_db_req(NewReq, Handler)
+            end;
+        false ->
+            throw({bad_request, <<"database is not partitioned">>})
+    end.
+
+
 handle_design_req(#httpd{
         path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest]
     }=Req, Db) ->
@@ -1659,8 +1694,8 @@ set_namespace(<<"_local_docs">>, Args) ->
     set_namespace(<<"_local">>, Args);
 set_namespace(<<"_design_docs">>, Args) ->
     set_namespace(<<"_design">>, Args);
-set_namespace(NS, #mrargs{extra = Extra} = Args) ->
-    Args#mrargs{extra = [{namespace, NS} | Extra]}.
+set_namespace(NS, #mrargs{} = Args) ->
+    couch_mrview_util:set_extra(Args, namespace, NS).
 
 
 %% /db/_bulk_get stuff
diff --git a/src/chttpd/src/chttpd_httpd_handlers.erl b/src/chttpd/src/chttpd_httpd_handlers.erl
index cb52e2c..000f29b 100644
--- a/src/chttpd/src/chttpd_httpd_handlers.erl
+++ b/src/chttpd/src/chttpd_httpd_handlers.erl
@@ -32,6 +32,7 @@ url_handler(_) -> no_match.
 db_handler(<<"_view_cleanup">>) -> fun chttpd_db:handle_view_cleanup_req/2;
 db_handler(<<"_compact">>)      -> fun chttpd_db:handle_compact_req/2;
 db_handler(<<"_design">>)       -> fun chttpd_db:handle_design_req/2;
+db_handler(<<"_partition">>)    -> fun chttpd_db:handle_partition_req/2;
 db_handler(<<"_temp_view">>)    -> fun chttpd_view:handle_temp_view_req/2;
 db_handler(<<"_changes">>)      -> fun chttpd_db:handle_changes_req/2;
 db_handler(_) -> no_match.
diff --git a/src/chttpd/test/chttpd_db_bulk_get_test.erl b/src/chttpd/test/chttpd_db_bulk_get_test.erl
index f892131..e46c9c3 100644
--- a/src/chttpd/test/chttpd_db_bulk_get_test.erl
+++ b/src/chttpd/test/chttpd_db_bulk_get_test.erl
@@ -95,7 +95,7 @@ should_get_doc_with_all_revs(Pid) ->
     DocRevB = #doc{id = DocId, body = {[{<<"_rev">>, <<"1-CDE">>}]}},
 
     mock_open_revs(all, {ok, [{ok, DocRevA}, {ok, DocRevB}]}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -115,7 +115,7 @@ should_validate_doc_with_bad_id(Pid) ->
     DocId = <<"_docudoc">>,
 
     Req = fake_request(DocId),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -138,7 +138,7 @@ should_validate_doc_with_bad_rev(Pid) ->
     Rev = <<"revorev">>,
 
     Req = fake_request(DocId, Rev),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -162,7 +162,7 @@ should_validate_missing_doc(Pid) ->
 
     Req = fake_request(DocId, Rev),
     mock_open_revs([{1,<<"revorev">>}], {ok, []}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -186,7 +186,7 @@ should_validate_bad_atts_since(Pid) ->
 
     Req = fake_request(DocId, Rev, <<"badattsince">>),
     mock_open_revs([{1,<<"revorev">>}], {ok, []}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     [{Result}] = get_results_from_response(Pid),
     ?assertEqual(DocId, couch_util:get_value(<<"id">>, Result)),
@@ -210,7 +210,7 @@ should_include_attachments_when_atts_since_specified(_) ->
 
     Req = fake_request(DocId, Rev, [<<"1-abc">>]),
     mock_open_revs([{1,<<"revorev">>}], {ok, []}),
-    chttpd_db:db_req(Req, nil),
+    chttpd_db:db_req(Req, test_util:fake_db([name, <<"foo">>])),
 
     ?_assert(meck:called(fabric, open_revs,
                          [nil, DocId, [{1, <<"revorev">>}],
diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl
index ea224b1..8acefb2 100644
--- a/src/couch/src/couch_btree.erl
+++ b/src/couch/src/couch_btree.erl
@@ -133,7 +133,9 @@ make_group_fun(Bt, exact) ->
     end;
 make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 ->
     fun
-        ({[_|_] = Key1, _}, {[_|_] = Key2, _}) ->
+        GF({{p, _Partition, Key1}, Val1}, {{p, _Partition, Key2}, Val2}) ->
+            GF({Key1, Val1}, {Key2, Val2});
+        GF({[_|_] = Key1, _}, {[_|_] = Key2, _}) ->
             SL1 = lists:sublist(Key1, GroupLevel),
             SL2 = lists:sublist(Key2, GroupLevel),
             case less(Bt, {SL1, nil}, {SL2, nil}) of
@@ -147,7 +149,7 @@ make_group_fun(Bt, GroupLevel) when is_integer(GroupLevel), GroupLevel > 0 ->
                 _ ->
                     false
             end;
-        ({Key1, _}, {Key2, _}) ->
+        GF({Key1, _}, {Key2, _}) ->
             case less(Bt, {Key1, nil}, {Key2, nil}) of
                 false ->
                     case less(Bt, {Key2, nil}, {Key1, nil}) of
diff --git a/src/couch/src/couch_ejson_compare.erl b/src/couch/src/couch_ejson_compare.erl
index 81adbb8..ca36c86 100644
--- a/src/couch/src/couch_ejson_compare.erl
+++ b/src/couch/src/couch_ejson_compare.erl
@@ -22,6 +22,10 @@ init() ->
     Dir = code:priv_dir(couch),
     ok = erlang:load_nif(filename:join(Dir, ?MODULE), NumScheds).
 
+% partitioned row comparison
+less({p, PA, A}, {p, PB, B}) ->
+    less([PA, A], [PB, B]);
+
 less(A, B) ->
     try
         less_nif(A, B)
diff --git a/src/couch_mrview/include/couch_mrview.hrl b/src/couch_mrview/include/couch_mrview.hrl
index a341e30..e17aaba 100644
--- a/src/couch_mrview/include/couch_mrview.hrl
+++ b/src/couch_mrview/include/couch_mrview.hrl
@@ -20,6 +20,7 @@
     design_opts=[],
     seq_indexed=false,
     keyseq_indexed=false,
+    partitioned=false,
     lib,
     views,
     id_btree=nil,
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 94c6ff0..d342082 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -59,6 +59,7 @@ validate_ddoc_fields(DDoc) ->
         [{<<"options">>, object}],
         [{<<"options">>, object}, {<<"include_design">>, boolean}],
         [{<<"options">>, object}, {<<"local_seq">>, boolean}],
+        [{<<"options">>, object}, {<<"partitioned">>, boolean}],
         [{<<"rewrites">>, [string, array]}],
         [{<<"shows">>, object}, {any, [object, string]}],
         [{<<"updates">>, object}, {any, [object, string]}],
@@ -200,9 +201,19 @@ validate(Db,  DDoc) ->
     end,
     {ok, #mrst{
         language = Lang,
-        views = Views
+        views = Views,
+        partitioned = Partitioned
     }} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
 
+    case {couch_db:is_partitioned(Db), Partitioned} of
+        {false, true} ->
+            throw({invalid_design_doc,
+                <<"partitioned option cannot be true in a "
+                  "non-partitioned database.">>});
+        {_, _} ->
+            ok
+    end,
+
     try Views =/= [] andalso couch_query_servers:get_os_process(Lang) of
         false ->
             ok;
@@ -616,6 +627,8 @@ red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) ->
     end, Acc, OptList),
     finish_fold(Acc2, []).
 
+red_fold({p, _Partition, Key}, Red, Acc) ->
+    red_fold(Key, Red, Acc);
 red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 ->
     {ok, Acc#mracc{skip=N-1, last_go=ok}};
 red_fold(Key, Red, #mracc{meta_sent=false}=Acc) ->
diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl
index 004caef..86df796 100644
--- a/src/couch_mrview/src/couch_mrview_http.erl
+++ b/src/couch_mrview/src/couch_mrview_http.erl
@@ -582,6 +582,9 @@ parse_param(Key, Val, Args, IsDecoded) ->
             Args#mrargs{callback=couch_util:to_binary(Val)};
         "sorted" ->
             Args#mrargs{sorted=parse_boolean(Val)};
+        "partition" ->
+            Partition = couch_util:to_binary(Val),
+            couch_mrview_util:set_extra(Args, partition, Partition);
         _ ->
             BKey = couch_util:to_binary(Key),
             BVal = couch_util:to_binary(Val),
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl
index 4718b56..f7c53ec 100644
--- a/src/couch_mrview/src/couch_mrview_index.erl
+++ b/src/couch_mrview/src/couch_mrview_index.erl
@@ -40,10 +40,12 @@ get(update_options, #mrst{design_opts = Opts}) ->
     LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false),
     SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false),
     KeySeqIndexed = couch_util:get_value(<<"keyseq_indexed">>, Opts, false),
+    Partitioned = couch_util:get_value(<<"partitioned">>, Opts, false),
     if IncDesign -> [include_design]; true -> [] end
         ++ if LocalSeq -> [local_seq]; true -> [] end
         ++ if KeySeqIndexed -> [keyseq_indexed]; true -> [] end
-        ++ if SeqIndexed -> [seq_indexed]; true -> [] end;
+        ++ if SeqIndexed -> [seq_indexed]; true -> [] end
+        ++ if Partitioned -> [partitioned]; true -> [] end;
 get(fd, #mrst{fd = Fd}) ->
     Fd;
 get(language, #mrst{language = Language}) ->
@@ -94,7 +96,27 @@ get(Other, _) ->
 
 
 init(Db, DDoc) ->
-    couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc).
+    {ok, State} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
+    #mrst{
+        design_opts = DesignOpts
+    } = State,
+    DbPartitioned = couch_db:is_partitioned(Db),
+    ViewPartitioned = proplists:get_value(<<"partitioned">>, DesignOpts),
+    IsPartitioned = case {DbPartitioned, ViewPartitioned} of
+        {true, undefined} ->
+            true;
+        {true, true} ->
+            true;
+        {true, false} ->
+            false;
+        {false, undefined} ->
+            false;
+        {false, false} ->
+            false;
+        _ ->
+            throw({bad_request, <<"invalid partition option">>})
+    end,
+    {ok, State#mrst{partitioned = IsPartitioned}}.
 
 
 open(Db, State) ->
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 3383b49..fdfac0e 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -315,7 +315,8 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
     #mrst{
         id_btree=IdBtree,
         log_btree=LogBtree,
-        first_build=FirstBuild
+        first_build=FirstBuild,
+        partitioned=Partitioned
     } = State,
 
     Revs = dict:from_list(dict:fetch_keys(Log0)),
@@ -332,8 +333,9 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
         _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
     end,
 
-    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
+    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) ->
         #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
+        KVs = if Partitioned -> inject_partition(KVs0); true -> KVs0 end,
         ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
         {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
         NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
@@ -382,6 +384,13 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
         log_btree=LogBtree2
     }.
 
+inject_partition(KVs) ->
+    [{{{p, partition(DocId), Key}, DocId}, Value} || {{Key, DocId}, Value} <- KVs].
+
+partition(DocId) ->
+    [Partition, _Rest] = binary:split(DocId, <<":">>),
+    Partition.
+
 update_id_btree(Btree, DocIdKeys, true) ->
     ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
     couch_btree:query_modify(Btree, [], ToAdd, []);
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index 4fd82e0..0c5b11f 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -26,12 +26,13 @@
 -export([temp_view_to_ddoc/1]).
 -export([calculate_external_size/1]).
 -export([calculate_active_size/1]).
--export([validate_args/1]).
+-export([validate_all_docs_args/2, validate_args/1]).
 -export([maybe_load_doc/3, maybe_load_doc/4]).
 -export([maybe_update_index_file/1]).
 -export([extract_view/4, extract_view_reduce/1]).
 -export([get_view_keys/1, get_view_queries/1]).
 -export([set_view_type/3]).
+-export([set_extra/3, get_extra/2, get_extra/3]).
 -export([changes_key_opts/2]).
 -export([fold_changes/4]).
 -export([to_key_seq/1]).
@@ -39,6 +40,12 @@
 -define(MOD, couch_mrview_index).
 -define(GET_VIEW_RETRY_COUNT, 1).
 -define(GET_VIEW_RETRY_DELAY, 50).
+-define(LOWEST_KEY, null).
+-define(HIGHEST_KEY, {<<255, 255, 255, 255>>}).
+-define(PARTITION_START(P), <<P/binary, $:>>).
+-define(PARTITION_END(P), <<P/binary, $;>>).
+-define(LOWEST(A, B), (if A < B -> A; true -> B end)).
+-define(HIGHEST(A, B), (if A > B -> A; true -> B end)).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -94,7 +101,7 @@ get_view(Db, DDoc, ViewName, Args0) ->
 get_view_index_pid(Db, DDoc, ViewName, Args0) ->
     ArgCheck = fun(InitState) ->
         Args1 = set_view_type(Args0, ViewName, InitState#mrst.views),
-        {ok, validate_args(Args1)}
+        {ok, validate_args(InitState, Args1)}
     end,
     couch_index_server:get_index(?MOD, Db, DDoc, ArgCheck).
 
@@ -169,6 +176,7 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
     {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}),
     SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false),
     KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false),
+    Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false),
 
     {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
     BySrc = lists:foldl(MakeDict, dict:new(), RawViews),
@@ -189,7 +197,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
         language=Language,
         design_opts=DesignOpts,
         seq_indexed=SeqIndexed,
-        keyseq_indexed=KeySeqIndexed
+        keyseq_indexed=KeySeqIndexed,
+        partitioned=Partitioned
     },
     SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
     {ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}.
@@ -213,6 +222,19 @@ set_view_type(Args, ViewName, [View | Rest]) ->
     end.
 
 
+set_extra(#mrargs{} = Args, Key, Value) ->
+    Extra0 = Args#mrargs.extra,
+    Extra1 = lists:ukeysort(1, [{Key, Value} | Extra0]),
+    Args#mrargs{extra = Extra1}.
+
+
+get_extra(#mrargs{} = Args, Key) ->
+    couch_util:get_value(Key, Args#mrargs.extra).
+
+get_extra(#mrargs{} = Args, Key, Default) ->
+    couch_util:get_value(Key, Args#mrargs.extra, Default).
+
+
 extract_view(_Lang, _Args, _ViewName, []) ->
     throw({not_found, missing_named_view});
 extract_view(Lang, #mrargs{view_type=map}=Args, Name, [View | Rest]) ->
@@ -476,6 +498,40 @@ fold_reduce({NthRed, Lang, View}, Fun,  Acc, Options) ->
     couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
 
 
+validate_args(#mrst{} = State, Args0) ->
+    Args = validate_args(Args0),
+
+    ViewPartitioned = State#mrst.partitioned,
+    Partition = get_extra(Args, partition),
+
+    case {ViewPartitioned, Partition} of
+        {true, undefined} ->
+            mrverror(<<"`partition` parameter is mandatory for this view.">>);
+        {true, _} ->
+            apply_partition(Args, Partition);
+        {false, undefined} ->
+            Args;
+        {false, Value} when is_binary(Value) ->
+            mrverror(<<"`partition` parameter is not supported on this view">>)
+    end.
+
+
+validate_all_docs_args(Db, Args0) ->
+    Args = validate_args(Args0),
+
+    DbPartitioned = couch_db:is_partitioned(Db),
+    Partition = get_extra(Args, partition),
+
+    case {DbPartitioned, Partition} of
+        {false, <<_/binary>>} ->
+            mrverror(<<"`partition` parameter is not supported on this db">>);
+        {_, <<_/binary>>} ->
+            apply_all_docs_partition(Args, Partition);
+        _ ->
+            Args
+    end.
+
+
 validate_args(Args) ->
     GroupLevel = determine_group_level(Args),
     Reduce = Args#mrargs.reduce,
@@ -598,6 +654,12 @@ validate_args(Args) ->
         _ -> mrverror(<<"Invalid value for `sorted`.">>)
     end,
 
+    case get_extra(Args, partition) of
+        undefined -> ok;
+        Partition when is_binary(Partition) -> ok;
+        _ -> mrverror(<<"Invalid value for `partition`.">>)
+    end,
+
     Args#mrargs{
         start_key_docid=SKDocId,
         end_key_docid=EKDocId,
@@ -616,6 +678,62 @@ determine_group_level(#mrargs{group=true, group_level=undefined}) ->
 determine_group_level(#mrargs{group_level=GroupLevel}) ->
     GroupLevel.
 
+apply_partition(#mrargs{keys=[{p, _, _} | _]} = Args, _Partition) ->
+    Args; % already applied
+
+apply_partition(#mrargs{keys=Keys} = Args, Partition) when Keys /= undefined ->
+    Args#mrargs{keys=[{p, Partition, K} || K <- Keys]};
+
+apply_partition(#mrargs{start_key={p, _, _}, end_key={p, _, _}} = Args, _Partition) ->
+    Args; % already applied.
+
+apply_partition(Args, Partition) ->
+    #mrargs{
+        direction = Dir,
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+
+    {DefSK, DefEK} = case Dir of
+        fwd -> {?LOWEST_KEY, ?HIGHEST_KEY};
+        rev -> {?HIGHEST_KEY, ?LOWEST_KEY}
+    end,
+
+    SK0 = if StartKey /= undefined -> StartKey; true -> DefSK end,
+    EK0 = if EndKey /= undefined -> EndKey; true -> DefEK end,
+
+    Args#mrargs{
+        start_key = {p, Partition, SK0},
+        end_key = {p, Partition, EK0}
+    }.
+
+%% all_docs is special as it's not really a view and is already
+%% effectively partitioned as the partition is a prefix of all keys.
+apply_all_docs_partition(#mrargs{} = Args, Partition) ->
+    #mrargs{
+        direction = Dir,
+        start_key = StartKey,
+        end_key = EndKey
+    } = Args,
+
+    {DefSK, DefEK} = case Dir of
+        fwd -> {?PARTITION_START(Partition), ?PARTITION_END(Partition)};
+        rev -> {?PARTITION_END(Partition), ?PARTITION_START(Partition)}
+    end,
+
+    SK0 = if StartKey == undefined -> DefSK; true -> StartKey end,
+    EK0 = if EndKey == undefined -> DefEK; true -> EndKey end,
+
+    {SK1, EK1} = case Dir of
+        fwd -> {?HIGHEST(DefSK, SK0), ?LOWEST(DefEK, EK0)};
+        rev -> {?LOWEST(DefSK, SK0), ?HIGHEST(DefEK, EK0)}
+    end,
+
+    Args#mrargs{
+        start_key = SK1,
+        end_key = EK1
+    }.
+
 
 check_range(#mrargs{start_key=undefined}, _Cmp) ->
     ok;
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index e796c91..49bb863 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -381,10 +381,11 @@ query_view(Db, Options, GroupId, ViewName, Callback, Acc0, QueryArgs)
         when is_binary(GroupId) ->
     DbName = dbname(Db),
     {ok, DDoc} = ddoc_cache:open(DbName, <<"_design/", GroupId/binary>>),
-    query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs);
-query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
-    Db = dbname(DbName), View = name(ViewName),
-    case fabric_util:is_users_db(Db) of
+    query_view(Db, Options, DDoc, ViewName, Callback, Acc0, QueryArgs);
+query_view(Db0, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
+    DbName = dbname(Db0),
+    View = name(ViewName),
+    case fabric_util:is_users_db(DbName) of
     true ->
         FakeDb = fabric_util:make_cluster_db(DbName, Options),
         couch_users_db:after_doc_read(DDoc, FakeDb);
@@ -392,14 +393,14 @@ query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
         ok
     end,
     {ok, #mrst{views=Views, language=Lang}} =
-        couch_mrview_util:ddoc_to_mrst(Db, DDoc),
+        couch_mrview_util:ddoc_to_mrst(DbName, DDoc),
     QueryArgs1 = couch_mrview_util:set_view_type(QueryArgs0, View, Views),
     QueryArgs2 = couch_mrview_util:validate_args(QueryArgs1),
     VInfo = couch_mrview_util:extract_view(Lang, QueryArgs2, View, Views),
     case is_reduce_view(QueryArgs2) of
         true ->
             fabric_view_reduce:go(
-                Db,
+                Db0,
                 DDoc,
                 View,
                 QueryArgs2,
@@ -409,7 +410,7 @@ query_view(DbName, Options, DDoc, ViewName, Callback, Acc0, QueryArgs0) ->
             );
         false ->
             fabric_view_map:go(
-                Db,
+                Db0,
                 Options,
                 DDoc,
                 View,
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 4740207..60f7273 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -65,7 +65,6 @@ stream_start(Workers0, Keypos, StartFun, Replacements) ->
     Timeout = request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
         {ok, #stream_acc{workers=Workers}} ->
-            true = fabric_view:is_progress_possible(Workers),
             AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
                 rexi:stream_start(From),
                 [Worker | WorkerAcc]
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 69f4290..81eb6f0 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -128,8 +128,11 @@ maybe_send_row(State) ->
         try get_next_row(State) of
         {_, NewState} when Skip > 0 ->
             maybe_send_row(NewState#collector{skip=Skip-1});
-        {Row, NewState} ->
-            case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of
+        {Row0, NewState} ->
+            Row1 = possibly_embed_doc(NewState, Row0),
+            Row2 = detach_partition(Row1),
+            Row3 = transform_row(Row2),
+            case Callback(Row3, AccIn) of
             {stop, Acc} ->
                 {stop, NewState#collector{user_acc=Acc, limit=Limit-1}};
             {ok, Acc} ->
@@ -194,6 +197,10 @@ possibly_embed_doc(#collector{db_name=DbName, query_args=Args},
         _ -> Row
     end.
 
+detach_partition(#view_row{key={p, _Partition, Key}} = Row) ->
+    Row#view_row{key = Key};
+detach_partition(#view_row{} = Row) ->
+    Row.
 
 keydict(undefined) ->
     undefined;
@@ -309,10 +316,23 @@ index_of(X, [X|_Rest], I) ->
 index_of(X, [_|Rest], I) ->
     index_of(X, Rest, I+1).
 
-get_shards(DbName, #mrargs{stable=true}) ->
-    mem3:ushards(DbName);
-get_shards(DbName, #mrargs{stable=false}) ->
-    mem3:shards(DbName).
+get_shards(Db, #mrargs{} = Args) ->
+    DbPartitioned = fabric_util:is_partitioned(Db),
+    Partition = couch_mrview_util:get_extra(Args, partition),
+    if DbPartitioned orelse Partition == undefined -> ok; true ->
+        throw({bad_request, <<"partition specified on non-partitioned db">>})
+    end,
+    DbName = fabric:dbname(Db),
+    case {Args#mrargs.stable, Partition} of
+        {true, undefined} ->
+            mem3:ushards(DbName);
+        {true, Partition} ->
+            mem3:ushards(DbName, <<Partition/binary, ":foo">>);
+        {false, undefined} ->
+            mem3:shards(DbName);
+        {false, Partition} ->
+            mem3:shards(DbName, <<Partition/binary, ":foo">>)
+    end.
 
 maybe_update_others(DbName, DDoc, ShardsInvolved, ViewName,
     #mrargs{update=lazy} = Args) ->
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index ac16dac..6acc792 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -20,8 +20,9 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
-    Shards = mem3:shards(DbName),
+go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    DbName = fabric:dbname(Db),
+    Shards = shards(Db, QueryArgs),
     Workers0 = fabric_util:submit_jobs(
             Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6a3d6f..1648623 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -24,8 +24,9 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
     go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo);
 
-go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
-    Shards = fabric_view:get_shards(DbName, Args),
+go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
+    DbName = fabric:dbname(Db),
+    Shards = fabric_view:get_shards(Db, Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index a74be10..7acc67c 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -23,10 +23,11 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
     go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
 
-go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
+go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
+    DbName = fabric:dbname(Db),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     RPCArgs = [DocIdAndRev, VName, Args],
-    Shards = fabric_view:get_shards(DbName, Args),
+    Shards = fabric_view:get_shards(Db, Args),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->

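Two details worth spelling out. First, partitioned view rows are keyed as
{p, Partition, Key} (see inject_partition/1 above), which keeps each
partition's rows contiguous in the view btree; couch_ejson_compare:less/2
and couch_btree's group fun both unwrap the triple, so collation and
group_level behave as if the partition were a key prefix. Second, the
<<Partition/binary, ":foo">> passed to mem3:shards/2 and mem3:ushards/2 in
get_shards/2 is a synthetic doc id inside the partition, used purely for
shard routing. A sketch of the key wrapping (values invented;
inject_partition/1 is module-internal and shown here only for
illustration):

    %% Emitted KV for doc "sensor-260:abc123" from a map function:
    KV0 = {{<<"temp">>, <<"sensor-260:abc123">>}, 21.5},
    %% After inject_partition/1 the stored KV becomes:
    {{{p, <<"sensor-260">>, <<"temp">>}, <<"sensor-260:abc123">>}, 21.5}.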

[couchdb] 01/05: Implement partitioned dbs

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit b48c84f0dbb237c78e1b225a0b799cec0d0e8a10
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Oct 25 17:03:00 2018 -0500

    Implement partitioned dbs
    
    This change introduces the ability for users to place a group of
    documents in a single shard range by specifying a "partition key" in the
    document id. A partition key is denoted by everything preceding a colon
    ':' in the document id.
    
    Every document id (except for design documents) in a partitioned
    database is required to have a partition key.
    
    Co-authored-by: Garren Smith <ga...@gmail.com>
    Co-authored-by: Robert Newson <rn...@apache.org>
---
 src/chttpd/src/chttpd_db.erl        | 29 +++++++++++++++++--
 src/couch/src/couch_db.erl          | 10 +++++++
 src/couch/src/couch_doc.erl         |  6 +++-
 src/couch/src/couch_partition.erl   | 57 +++++++++++++++++++++++++++++++++++++
 src/couch/src/couch_server.erl      |  3 ++
 src/fabric/src/fabric_db_create.erl |  6 +++-
 src/fabric/src/fabric_util.erl      | 10 ++++++-
 src/mem3/src/mem3.erl               | 12 ++++++--
 8 files changed, 126 insertions(+), 7 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 49d7b58..3d6c79f 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -13,6 +13,7 @@
 -module(chttpd_db).
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("mem3/include/mem3.hrl").
 
 -export([handle_request/1, handle_compact_req/2, handle_design_req/2,
     db_req/2, couch_doc_open/4,handle_changes_req/2,
@@ -285,10 +286,12 @@ create_db_req(#httpd{}=Req, DbName) ->
     Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")),
     P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")),
     EngineOpt = parse_engine_opt(Req),
+    DbProps = parse_partitioned_opt(Req),
     Options = [
         {n, N},
         {q, Q},
-        {placement, P}
+        {placement, P},
+        {props, DbProps}
     ] ++ EngineOpt,
     DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
     case fabric:create_db(DbName, Options) of
@@ -314,7 +317,12 @@ delete_db_req(#httpd{}=Req, DbName) ->
     end.
 
 do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
-    {ok, Db} = couch_db:clustered_db(DbName, Ctx),
+    Shard = hd(mem3:shards(DbName)),
+    Props = couch_util:get_value(props, Shard#shard.opts, []),
+    {ok, Db} = couch_db:clustered_db(DbName, [
+            {user_ctx, Ctx},
+            {props, Props}
+        ]),
     Fun(Req, Db).
 
 db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
@@ -1453,6 +1461,23 @@ parse_engine_opt(Req) ->
             end
     end.
 
+
+parse_partitioned_opt(Req) ->
+    case chttpd:qs_value(Req, "partitioned") of
+        undefined ->
+            [];
+        "false" ->
+            [];
+        "true" ->
+            [
+                {partitioned, true},
+                {hash, [couch_partition, hash, []]}
+            ];
+        _ ->
+            throw({bad_request, <<"invalid `partitioned` parameter">>})
+    end.
+
+
 parse_doc_query({Key, Value}, Args) ->
     case {Key, Value} of
         {"attachments", "true"} ->
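
With the handler changes above, `PUT /{db}?partitioned=true` folds the
partition properties into the options handed to fabric:create_db/2. A
sketch of the resulting call (the n/q/placement values are just the
configured defaults and are illustrative):

    Options = [
        {n, "3"},
        {q, "8"},
        {placement, undefined},
        {props, [
            {partitioned, true},
            {hash, [couch_partition, hash, []]}
        ]}
    ],
    fabric:create_db(<<"mydb">>, Options).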
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 1293f0c..ff04dab 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -56,6 +56,7 @@
     is_db/1,
     is_system_db/1,
     is_clustered/1,
+    is_partitioned/1,
 
     set_revs_limit/2,
     set_purge_infos_limit/2,
@@ -214,6 +215,15 @@ is_clustered(#db{}) ->
 is_clustered(?OLD_DB_REC = Db) ->
     ?OLD_DB_MAIN_PID(Db) == undefined.
 
+is_partitioned(#db{options = Options}) ->
+    Props = couch_util:get_value(props, Options, []),
+    case couch_util:get_value(partitioned, Props) of
+        true ->
+            true;
+        _ ->
+            false
+    end.
+
 ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
     ok = gen_server:call(Pid, full_commit, infinity),
     {ok, StartTime}.
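
Given the do_db_req/2 change in chttpd_db above, a clustered #db{}
opened with `{props, [{partitioned, true} | _]}` in its options now
reports as partitioned. A minimal sketch (assuming clustered_db/2
carries the proplist into #db.options, as the caller above implies;
Ctx is a #user_ctx{} record):

    {ok, Db} = couch_db:clustered_db(<<"mydb">>, [
        {user_ctx, Ctx},
        {props, [{partitioned, true}]}
    ]),
    true = couch_db:is_partitioned(Db).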
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index f960ec5..22f899f 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -16,7 +16,7 @@
 -export([from_json_obj/1, from_json_obj_validate/1]).
 -export([from_json_obj/2, from_json_obj_validate/2]).
 -export([to_json_obj/2, has_stubs/1, merge_stubs/2]).
--export([validate_docid/1, validate_docid/2, get_validate_doc_fun/1]).
+-export([validate_docid/1, validate_docid/2, validate_docid/3, get_validate_doc_fun/1]).
 -export([doc_from_multi_part_stream/2, doc_from_multi_part_stream/3]).
 -export([doc_from_multi_part_stream/4]).
 -export([doc_to_multi_part_stream/5, len_doc_to_multi_part_stream/4]).
@@ -199,11 +199,15 @@ parse_revs(_) ->
 
 
 validate_docid(DocId, DbName) ->
+    validate_docid(DocId, DbName, fun(_) -> ok end).
+
+validate_docid(DocId, DbName, Extra) ->
     case DbName =:= ?l2b(config:get("mem3", "shards_db", "_dbs")) andalso
         lists:member(DocId, ?SYSTEM_DATABASES) of
         true ->
             ok;
         false ->
+            Extra(DocId),
             validate_docid(DocId)
     end.
 
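
The new validate_docid/3 runs a caller-supplied check before the
standard docid validation, which is the hook partitioned databases can
use to reject ids lacking a partition key. An illustrative use (this
Extra fun is hypothetical, not part of the commit):

    Extra = fun(DocId) ->
        %% throws illegal_docid when there is no "partition:" prefix
        _ = couch_partition:from_docid(DocId),
        ok
    end,
    ok = couch_doc:validate_docid(<<"sensor-1:reading-42">>, DbName, Extra).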
diff --git a/src/couch/src/couch_partition.erl b/src/couch/src/couch_partition.erl
new file mode 100644
index 0000000..c2d9375
--- /dev/null
+++ b/src/couch/src/couch_partition.erl
@@ -0,0 +1,57 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_partition).
+
+
+-export([
+    extract/1,
+    from_docid/1,
+    is_member/2,
+
+    hash/1
+]).
+
+
+extract(Value) when is_binary(Value) ->
+    case binary:split(Value, <<":">>) of
+        [Partition, Rest] ->
+            {Partition, Rest};
+        _ ->
+            undefined
+    end.
+
+
+from_docid(DocId) ->
+    case extract(DocId) of
+        undefined ->
+            throw({illegal_docid, <<"doc id must be of form partition:id">>});
+        {Partition, _} ->
+            Partition
+    end.
+
+
+hash(<<"_design/", _/binary>> = DocId) ->
+    erlang:crc32(DocId);
+
+hash(DocId) when is_binary(DocId) ->
+    erlang:crc32(from_docid(DocId)).
+
+
+is_member(DocId, Partition) ->
+    case extract(DocId) of
+        {Partition, _} ->
+            true;
+        _ ->
+            false
+    end.
+
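
The module is small enough to exercise from a shell; a quick sketch of
the id helpers and of the hash used for shard placement (ids are made
up):

    1> couch_partition:extract(<<"sensor-1:reading-42">>).
    {<<"sensor-1">>,<<"reading-42">>}
    2> couch_partition:extract(<<"no-colon">>).
    undefined
    3> couch_partition:hash(<<"sensor-1:a">>) =:= couch_partition:hash(<<"sensor-1:b">>).
    true
    4> couch_partition:hash(<<"_design/foo">>) =:= erlang:crc32(<<"_design/foo">>).
    true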
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index c4b7bf1..95892fc 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -221,6 +221,9 @@ init([]) ->
     % Mark pluggable storage engines as a supported feature
     config:enable_feature('pluggable-storage-engines'),
 
+    % Mark partitioned databases as a supported feature
+    config:enable_feature(partitions),
+
     % read config and register for configuration changes
 
     % just stop if one of the config settings change. couch_server_sup
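
For reference, feature flags enabled this way are advertised through
the config application; a minimal check (assuming the standard
config:features/0 accessor):

    1> lists:member(partitions, config:features()).
    true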
diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl
index 94ffd56..2ea3d7b 100644
--- a/src/fabric/src/fabric_db_create.erl
+++ b/src/fabric/src/fabric_db_create.erl
@@ -168,6 +168,10 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
         E when is_binary(E) -> [{<<"engine">>, E}];
         _ -> []
     end,
+    DbProps = case couch_util:get_value(props, Options) of
+        Props when is_list(Props) -> [{<<"props">>, {Props}}];
+        _ -> []
+    end,
     #doc{
         id = DbName,
         body = {[
@@ -175,7 +179,7 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
             {<<"changelog">>, lists:sort(RawOut)},
             {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
             {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
-        ] ++ EngineProp}
+        ] ++ EngineProp ++ DbProps}
     }.
 
 db_exists(DbName) -> is_list(catch mem3:shards(DbName)).
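
With the change above, the shard map document for a partitioned
database carries the creation props verbatim. A sketch of the
resulting body (Suffix, Changelog, ByNode and ByRange stand for the
values computed earlier in make_document/3):

    #doc{
        id = <<"mydb">>,
        body = {[
            {<<"shard_suffix">>, Suffix},
            {<<"changelog">>, Changelog},
            {<<"by_node">>, ByNode},
            {<<"by_range">>, ByRange},
            {<<"props">>, {[
                {partitioned, true},
                {hash, [couch_partition, hash, []]}
            ]}}
        ]}
    }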
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 44446b8..4740207 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -19,7 +19,8 @@
 -export([stream_start/2, stream_start/4]).
 -export([log_timeout/2, remove_done_workers/2]).
 -export([is_users_db/1, is_replicator_db/1]).
 -export([make_cluster_db/1, make_cluster_db/2]).
+-export([is_partitioned/1]).
 -export([upgrade_mrargs/1]).
 
 -compile({inline, [{doc_id_and_rev,1}]}).
@@ -327,6 +327,14 @@ doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) ->
     {DocId, {RevNum, RevHash}}.
 
 
+is_partitioned(DbName0) when is_binary(DbName0) ->
+    Shards = mem3:shards(fabric:dbname(DbName0)),
+    is_partitioned(make_cluster_db(hd(Shards)));
+
+is_partitioned(Db) ->
+    couch_db:is_partitioned(Db).
+
+
 upgrade_mrargs(#mrargs{} = Args) ->
     Args;
 
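
Both clauses funnel into couch_db:is_partitioned/1; a binary name is
first resolved to a clustered db through the shard cache. An
illustrative call (assuming <<"mydb">> was created with
partitioned=true):

    true = fabric_util:is_partitioned(<<"mydb">>),
    Db = fabric_util:make_cluster_db(hd(mem3:shards(<<"mydb">>))),
    true = fabric_util:is_partitioned(Db).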
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index ae52104..263b532 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -13,7 +13,7 @@
 -module(mem3).
 
 -export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
-    choose_shards/2, n/1, n/2, dbname/1, ushards/1]).
+    choose_shards/2, n/1, n/2, dbname/1, ushards/1, ushards/2]).
 -export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
 -export([sync_security/0, sync_security/1]).
 -export([compare_nodelists/0, compare_shards/1]).
@@ -71,7 +71,9 @@ compare_shards(DbName) ->
 
 -spec n(DbName::iodata()) -> integer().
 n(DbName) ->
-    n(DbName, <<"foo">>).
+    % Use _design to avoid issues with
+    % partition validation
+    n(DbName, <<"_design/foo">>).
 
 n(DbName, DocId) ->
     length(mem3:shards(DbName, DocId)).
@@ -136,6 +138,12 @@ ushards(DbName) ->
     Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
     mem3_util:downcast(Shards).
 
+-spec ushards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+ushards(DbName, DocId) ->
+    Shards = shards_int(DbName, DocId, [ordered]),
+    Shard = hd(Shards),
+    mem3_util:downcast([Shard]).
+
 ushards(DbName, Shards0, ZoneMap) ->
     {L,S,D} = group_by_proximity(Shards0, ZoneMap),
     % Prefer shards in the local zone over shards in a different zone,