You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ja...@apache.org on 2020/07/26 18:10:12 UTC

[couchdb] 10/17: feat(access): main db update and validation logic

This is an automated email from the ASF dual-hosted git repository.

jan pushed a commit to branch feat/access-master-clean
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 672a790139febef2c0924cd1875d167b7ae95031
Author: Jan Lehnardt <ja...@apache.org>
AuthorDate: Sun Jul 26 20:02:35 2020 +0200

    feat(access): main db update and validation logic
---
 src/couch/src/couch_db.erl         | 230 +++++++++++++++++++++++++++++++------
 src/couch/src/couch_db_updater.erl | 147 ++++++++++++++++++++----
 2 files changed, 319 insertions(+), 58 deletions(-)

diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index e1d726d..cac768d 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -30,6 +30,9 @@
     is_admin/1,
     check_is_admin/1,
     check_is_member/1,
+    validate_access/2,
+    check_access/2,
+    has_access_enabled/1,
 
     name/1,
     get_after_doc_read_fun/1,
@@ -135,6 +138,8 @@
 
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
 -include("couch_db_int.hrl").
 
 -define(DBNAME_REGEX,
@@ -276,6 +281,12 @@ wait_for_compaction(#db{main_pid=Pid}=Db, Timeout) ->
             ok
     end.
 
+has_access_enabled(#db{access=true}) -> true;
+has_access_enabled(_) -> false.
+
+is_read_from_ddoc_cache(Options) ->
+    lists:member(ddoc_cache, Options).
+
 delete_doc(Db, Id, Revisions) ->
     DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
     {ok, [Result]} = update_docs(Db, DeletedDocs, []),
@@ -284,26 +295,36 @@ delete_doc(Db, Id, Revisions) ->
 open_doc(Db, IdOrDocInfo) ->
     open_doc(Db, IdOrDocInfo, []).
 
-open_doc(Db, Id, Options) ->
+open_doc(Db, Id, Options0) ->
     increment_stat(Db, [couchdb, database_reads]),
+    Options = case has_access_enabled(Db) of
+        true -> Options0 ++ [conflicts];
+        _Else -> Options0
+    end,
     case open_doc_int(Db, Id, Options) of
     {ok, #doc{deleted=true}=Doc} ->
         case lists:member(deleted, Options) of
         true ->
-            apply_open_options({ok, Doc},Options);
+            {ok, Doc};
         false ->
             {not_found, deleted}
         end;
     Else ->
-        apply_open_options(Else,Options)
+        Else
     end.
 
-apply_open_options({ok, Doc},Options) ->
-    apply_open_options2(Doc,Options);
-apply_open_options(Else,_Options) ->
+apply_open_options(Db, {ok, Doc}, Options) ->
+    ok = validate_access(Db, Doc, Options),
+    apply_open_options1({ok, Doc}, Options);
+apply_open_options(_Db, Else, _Options) ->
+    Else.
+
+apply_open_options1({ok, Doc}, Options) ->
+    apply_open_options2(Doc, Options);
+apply_open_options1(Else, _Options) ->
     Else.
 
-apply_open_options2(Doc,[]) ->
+apply_open_options2(Doc, []) ->
     {ok, Doc};
 apply_open_options2(#doc{atts=Atts0,revs=Revs}=Doc,
         [{atts_since, PossibleAncestors}|Rest]) ->
@@ -317,8 +338,8 @@ apply_open_options2(#doc{atts=Atts0,revs=Revs}=Doc,
     apply_open_options2(Doc#doc{atts=Atts}, Rest);
 apply_open_options2(Doc, [ejson_body | Rest]) ->
     apply_open_options2(couch_doc:with_ejson_body(Doc), Rest);
-apply_open_options2(Doc,[_|Rest]) ->
-    apply_open_options2(Doc,Rest).
+apply_open_options2(Doc, [_|Rest]) ->
+    apply_open_options2(Doc, Rest).
 
 
 find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
@@ -336,7 +357,7 @@ find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
 open_doc_revs(Db, Id, Revs, Options) ->
     increment_stat(Db, [couchdb, database_reads]),
     [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
-    {ok, [apply_open_options(Result, Options) || Result <- Results]}.
+    {ok, [apply_open_options(Db, Result, Options) || Result <- Results]}.
 
 % Each returned result is a list of tuples:
 % {Id, MissingRevs, PossibleAncestors}
@@ -577,7 +598,8 @@ get_db_info(Db) ->
         name = Name,
         compactor_pid = Compactor,
         instance_start_time = StartTime,
-        committed_update_seq = CommittedUpdateSeq
+        committed_update_seq = CommittedUpdateSeq,
+        access = Access
     } = Db,
     {ok, DocCount} = get_doc_count(Db),
     {ok, DelDocCount} = get_del_doc_count(Db),
@@ -609,7 +631,8 @@ get_db_info(Db) ->
         {committed_update_seq, CommittedUpdateSeq},
         {compacted_seq, CompactedSeq},
         {props, Props},
-        {uuid, Uuid}
+        {uuid, Uuid},
+        {access, Access}
     ],
     {ok, InfoList}.
 
@@ -724,6 +747,73 @@ security_error_type(#user_ctx{name=null}) ->
 security_error_type(#user_ctx{name=_}) ->
     forbidden.
 
+is_per_user_ddoc(#doc{access=[]}) -> false;
+is_per_user_ddoc(#doc{access=[<<"_users">>]}) -> false;
+is_per_user_ddoc(_) -> true.
+    
+
+validate_access(Db, Doc) ->
+    validate_access(Db, Doc, []).
+
+validate_access(Db, Doc, Options) ->
+    validate_access1(has_access_enabled(Db), Db, Doc, Options).
+
+validate_access1(false, _Db, _Doc, _Options) -> ok;
+validate_access1(true, Db, #doc{meta=Meta}=Doc, Options) ->
+    case proplists:get_value(conflicts, Meta) of
+        undefined -> % no conflicts
+            case is_read_from_ddoc_cache(Options) andalso is_per_user_ddoc(Doc) of
+                true -> throw({not_found, missing});
+                _False -> validate_access2(Db, Doc)
+            end;
+        _Else -> % only admins can read conflicted docs in _access dbs
+            case is_admin(Db) of
+                true -> ok;
+                _Else2 -> throw({forbidden, <<"document is in conflict">>})
+            end
+    end.
+validate_access2(Db, Doc) ->
+    validate_access3(check_access(Db, Doc)).
+
+validate_access3(true) -> ok;
+validate_access3(_) -> throw({forbidden, <<"can't touch this">>}).
+
+% check_access(Db, DocOrAccess) -> boolean()
+% Decides whether the user in Db's user_ctx may access a document, given
+% either the #doc{} itself or its _access list:
+%   - admins always pass;
+%   - a doc without _access (empty list) is admin-only;
+%   - otherwise the user name or one of the user's roles must match an
+%     entry in the list, e.g. _access = ["a", "b"].
+check_access(Db, #doc{access=Access}) ->
+    check_access(Db, Access);
+check_access(Db, Access) ->
+    #user_ctx{
+        name=UserName,
+        roles=UserRoles
+    } = Db#db.user_ctx,
+    case is_admin(Db) of
+        true ->
+            true;
+        _ when Access =:= [] ->
+            % not an admin and the doc carries no _access entries
+            false;
+        _ ->
+            % short-circuits: roles are only consulted when the name
+            % does not match
+            check_name(UserName, Access) orelse check_roles(UserRoles, Access)
+    end.
+
+% NOTE(review): a null user name (presumably anonymous) passes for any Access — confirm intended.
+check_name(null, _Access) -> true;
+check_name(UserName, Access) -> lists:member(UserName, Access).
+% Role check, nicked from couch_db:check_security: true when Roles shares an entry with Access.
+% NOTE(review): "_users" is a string here but a binary in is_per_user_ddoc/1 — confirm.
+check_roles(Roles, Access) ->
+    Permitted = ordsets:from_list(Access ++ ["_users"]),
+    Held = ordsets:from_list(Roles),
+    not ordsets:is_disjoint(Held, Permitted).
+
 
 get_admins(#db{security=SecProps}) ->
     couch_util:get_value(<<"admins">>, SecProps, {[]}).
@@ -863,9 +953,14 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
     end.
 
 validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
-    case catch check_is_admin(Db) of
-        ok -> validate_ddoc(Db, Doc);
-        Error -> Error
+    case couch_doc:has_access(Doc) of
+        true ->
+            validate_ddoc(Db, Doc);
+        _Else ->
+            case catch check_is_admin(Db) of
+                ok -> validate_ddoc(Db, Doc);
+                Error -> Error
+            end
     end;
 validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
     ValidationFuns = load_validation_funs(Db),
@@ -1172,6 +1267,32 @@ doc_tag(#doc{meta=Meta}) ->
         Else -> throw({invalid_doc_tag, Else})
     end.
 
+validate_update(Db, Doc) ->
+    case catch validate_access(Db, Doc) of
+        ok -> Doc;
+        Error -> Error
+    end.
+
+
+validate_docs_access(Db, DocBuckets, DocErrors) ->
+            validate_docs_access1(Db, DocBuckets, {[], DocErrors}).
+
+% Keeps the docs that pass validate_access/2; failures become {DocTag, Error} entries.
+validate_docs_access1(_Db, [], {BucketsAcc, ErrorsAcc}) ->
+    Buckets = case lists:reverse([lists:reverse(B) || B <- BucketsAcc]) of
+        [[]] -> [];
+        Other -> Other
+    end,
+    {ok, Buckets, lists:reverse(ErrorsAcc)};
+validate_docs_access1(Db, [Bucket|Rest], {BucketsAcc, ErrorsAcc}) ->
+    {Kept, Errors} = lists:foldl(fun(Doc, {KeptAcc, ErrAcc}) ->
+        case catch validate_access(Db, Doc) of
+            ok -> {[Doc|KeptAcc], ErrAcc};
+            Error -> {KeptAcc, [{doc_tag(Doc), Error}|ErrAcc]}
+        end
+    end, {[], ErrorsAcc}, Bucket),
+    validate_docs_access1(Db, Rest, {[Kept | BucketsAcc], Errors}).
+
 update_docs(Db, Docs0, Options, replicated_changes) ->
     Docs = tag_docs(Docs0),
 
@@ -1185,9 +1306,31 @@ update_docs(Db, Docs0, Options, replicated_changes) ->
 
     DocBuckets2 = [[doc_flush_atts(Db, check_dup_atts(Doc))
             || Doc <- Bucket] || Bucket <- DocBuckets],
-    {ok, _} = write_and_commit(Db, DocBuckets2,
+    {ok, Results} = write_and_commit(Db, DocBuckets2,
         NonRepDocs, [merge_conflicts | Options]),
-    {ok, DocErrors};
+
+    case couch_db:has_access_enabled(Db) of
+        false ->
+            % we’re done here
+            {ok, DocErrors};
+        _ ->
+            AccessViolations = lists:filter(fun({_Ref, Tag}) -> Tag =:= access end, Results),
+            case length(AccessViolations) of
+                0 ->
+                    % we’re done here
+                    {ok, DocErrors};
+                _ ->
+                    % dig out FDIs from Docs matching our tags/refs
+                    DocsDict = lists:foldl(fun(Doc, Dict) ->
+                        Tag = doc_tag(Doc),
+                        dict:store(Tag, Doc, Dict)
+                    end, dict:new(), Docs),
+                    AccessResults = lists:map(fun({Ref, Access}) ->
+                        { dict:fetch(Ref, DocsDict), Access }
+                    end, AccessViolations),
+                    {ok, AccessResults}
+            end
+    end;
 
 update_docs(Db, Docs0, Options, interactive_edit) ->
     Docs = tag_docs(Docs0),
@@ -1272,7 +1415,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
     MergeConflicts = lists:member(merge_conflicts, Options),
     MRef = erlang:monitor(process, Pid),
     try
-        Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts},
+        Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, Ctx},
         case collect_results_with_metrics(Pid, MRef, []) of
         {ok, Results} -> {ok, Results};
         retry ->
@@ -1286,7 +1429,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
             % We only retry once
             DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
             close(Db2),
-            Pid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts},
+            Pid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, Ctx},
             case collect_results_with_metrics(Pid, MRef, []) of
             {ok, Results} -> {ok, Results};
             retry -> throw({update_error, compaction_retry})
@@ -1475,6 +1618,11 @@ open_read_stream(Db, AttState) ->
 is_active_stream(Db, StreamEngine) ->
     couch_db_engine:is_active_stream(Db, StreamEngine).
 
+changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
+    case couch_db:has_access_enabled(Db) andalso not couch_db:is_admin(Db) of % access db, non-admin user
+        true -> couch_mrview:query_changes_access(Db, StartSeq, Fun, Options, Acc);
+        false -> couch_db_engine:fold_changes(Db, StartSeq, Fun, Acc, Options) % engine order: Acc, then Opts
+    end.
 
 calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
     Seq;
@@ -1594,8 +1742,10 @@ fold_changes(Db, StartSeq, UserFun, UserAcc) ->
 
 
 fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
-    couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
-
+    case couch_db:has_access_enabled(Db) and not couch_db:is_admin(Db) of
+        true -> couch_mrview:query_changes_access(Db, StartSeq, UserFun, Opts, UserAcc);
+        false -> couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts)
+    end.
 
 fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
     fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).
@@ -1616,7 +1766,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
     lists:zipwith(
         fun({Id, Revs}, Lookup) ->
             case Lookup of
-            #full_doc_info{rev_tree=RevTree} ->
+            #full_doc_info{rev_tree=RevTree, access=Access} ->
                 {FoundRevs, MissingRevs} =
                 case Revs of
                 all ->
@@ -1636,7 +1786,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
                         % we have the rev in our list but know nothing about it
                         {{not_found, missing}, {Pos, Rev}};
                     #leaf{deleted=IsDeleted, ptr=SummaryPtr} ->
-                        {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+                        {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath, Access)}
                     end
                 end, FoundRevs),
                 Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
@@ -1652,21 +1802,27 @@ open_doc_revs_int(Db, IdRevs, Options) ->
 open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
     case couch_db_engine:open_local_docs(Db, [Id]) of
     [#doc{} = Doc] ->
-        apply_open_options({ok, Doc}, Options);
+        case Doc#doc.body of
+            { Body } ->
+                Access = couch_util:get_value(<<"_access">>, Body),
+                apply_open_options(Db, {ok, Doc#doc{access = Access}}, Options);
+            _Else ->
+                apply_open_options(Db, {ok, Doc}, Options)
+        end;
     [not_found] ->
         {not_found, missing}
     end;
-open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
+open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_],access=Access}=DocInfo, Options) ->
     #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
-    Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
-    apply_open_options(
+    Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}, Access),
+    apply_open_options(Db,
        {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}, Options);
-open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
+open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree,access=Access}=FullDocInfo, Options) ->
     #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
         DocInfo = couch_doc:to_doc_info(FullDocInfo),
     {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
-    Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
-    apply_open_options(
+    Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath, Access),
+    apply_open_options(Db,
         {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
 open_doc_int(Db, Id, Options) ->
     case get_full_doc_info(Db, Id) of
@@ -1716,22 +1872,28 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
     true -> [{local_seq, Seq}]
     end.
 
+% make_doc/5: build a #doc{} with an empty access list by delegating to make_doc/6.
+% A single clause suffices: the former {Pos, Revs} clause could never be reached.
+make_doc(Db, Id, Deleted, Bp, RevisionPath) ->
+    make_doc(Db, Id, Deleted, Bp, RevisionPath, []).
 
-make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
+make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath, Access) ->
     #doc{
         id = Id,
         revs = RevisionPath,
         body = [],
         atts = [],
-        deleted = Deleted
+        deleted = Deleted,
+        access = Access
     };
-make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
+make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}, Access) ->
     RevsLimit = get_revs_limit(Db),
     Doc0 = couch_db_engine:read_doc_body(Db, #doc{
         id = Id,
         revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
         body = Bp,
-        deleted = Deleted
+        deleted = Deleted,
+        access = Access
     }),
     Doc1 = case Doc0#doc.atts of
         BinAtts when is_binary(BinAtts) ->
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 1ca804c..41d3661 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -22,7 +22,10 @@
 
 -define(IDLE_LIMIT_DEFAULT, 61000).
 -define(DEFAULT_MAX_PARTITION_SIZE, 16#280000000). % 10 GiB
-
+-define(DEFAULT_SECURITY_OBJECT, [
+ {<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
+ {<<"admins">>, {[{<<"roles">>,[<<"_admin">>]}]}}
+]).
 
 -record(merge_acc, {
     revs_limit,
@@ -37,7 +40,7 @@
 init({Engine, DbName, FilePath, Options0}) ->
     erlang:put(io_priority, {db_update, DbName}),
     update_idle_limit_from_config(),
-    DefaultSecObj = default_security_object(DbName),
+    DefaultSecObj = default_security_object(DbName, Options0),
     Options = [{default_security_object, DefaultSecObj} | Options0],
     try
         {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
@@ -162,7 +165,7 @@ handle_cast(Msg, #db{name = Name} = Db) ->
     {stop, Msg, Db}.
 
 
-handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts},
+handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, UserCtx},
         Db) ->
     GroupedDocs2 = sort_and_tag_grouped_docs(Client, GroupedDocs),
     if NonRepDocs == [] ->
@@ -173,7 +176,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts},
         Clients = [Client]
     end,
     NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
-    try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts) of
+    try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts, UserCtx) of
     {ok, Db2, UpdatedDDocIds} ->
         ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
         case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of
@@ -248,7 +251,11 @@ sort_and_tag_grouped_docs(Client, GroupedDocs) ->
     % The merge_updates function will fail and the database can end up with
     % duplicate documents if the incoming groups are not sorted, so as a sanity
     % check we sort them again here. See COUCHDB-2735.
-    Cmp = fun([#doc{id=A}|_], [#doc{id=B}|_]) -> A < B end,
+    Cmp = fun
+        ([], []) -> false; % TODO: re-evaluate this addition, it might be
+                           %       superfluous now
+        ([#doc{id=A}|_], [#doc{id=B}|_]) -> A < B
+     end,
     lists:map(fun(DocGroup) ->
         [{Client, maybe_tag_doc(D)} || D <- DocGroup]
     end, lists:sort(Cmp, GroupedDocs)).
@@ -298,7 +305,7 @@ init_db(DbName, FilePath, EngineState, Options) ->
 
     BDU = couch_util:get_value(before_doc_update, Options, nil),
     ADR = couch_util:get_value(after_doc_read, Options, nil),
-
+    Access = couch_util:get_value(access, Options, false),
     NonCreateOpts = [Opt || Opt <- Options, Opt /= create],
 
     InitDb = #db{
@@ -308,7 +315,8 @@ init_db(DbName, FilePath, EngineState, Options) ->
         instance_start_time = StartTime,
         options = NonCreateOpts,
         before_doc_update = BDU,
-        after_doc_read = ADR
+        after_doc_read = ADR,
+        access = Access
     },
 
     DbProps = couch_db_engine:get_props(InitDb),
@@ -364,7 +372,8 @@ flush_trees(#db{} = Db,
                             active = WrittenSize,
                             external = ExternalSize
                         },
-                        atts = AttSizeInfo
+                        atts = AttSizeInfo,
+                        access = NewDoc#doc.access
                     },
                     {Leaf, add_sizes(Type, Leaf, SizesAcc)};
                 #leaf{} ->
@@ -439,6 +448,9 @@ doc_tag(#doc{meta=Meta}) ->
         Else -> throw({invalid_doc_tag, Else})
     end.
 
+merge_rev_trees([[]], [], Acc) ->
+    % validate_docs_access left us with no docs to merge
+    {ok, Acc};
 merge_rev_trees([], [], Acc) ->
     {ok, Acc#merge_acc{
         add_infos = lists:reverse(Acc#merge_acc.add_infos)
@@ -611,19 +623,26 @@ maybe_stem_full_doc_info(#full_doc_info{rev_tree = Tree} = Info, Limit) ->
             Info
     end.
 
-update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
+update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, UserCtx) ->
     UpdateSeq = couch_db_engine:get_update_seq(Db),
     RevsLimit = couch_db_engine:get_revs_limit(Db),
 
     Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
+    % TODO: maybe a perf hit; instead of zip3-ing existing Accesses into
+    %       our doc lists, maybe find 404 docs differently down in
+    %       validate_docs_access (revs is [], which we can then use
+    %       to skip validation as we know it is the first doc rev)
+    Accesses = [Access || [{_Client, #doc{access=Access}}|_] <- DocsList],
+
     % lookup up the old documents, if they exist.
     OldDocLookups = couch_db_engine:open_docs(Db, Ids),
-    OldDocInfos = lists:zipwith(fun
-        (_Id, #full_doc_info{} = FDI) ->
+    
+    OldDocInfos = lists:zipwith3(fun
+        (_Id, #full_doc_info{} = FDI, _Access) ->
             FDI;
-        (Id, not_found) ->
-            #full_doc_info{id=Id}
-    end, Ids, OldDocLookups),
+        (Id, not_found, Access) ->
+            #full_doc_info{id=Id,access=Access}
+    end, Ids, OldDocLookups, Accesses),
 
     %% Get the list of full partitions
     FullPartitions = case couch_db:is_partitioned(Db) of
@@ -653,7 +672,14 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
         cur_seq = UpdateSeq,
         full_partitions = FullPartitions
     },
-    {ok, AccOut} = merge_rev_trees(DocsList, OldDocInfos, AccIn),
+    % Loop over DocsList, validate_access for each OldDocInfo on Db:
+    %   if no OldDocInfo, then send to DocsListValidated, keep OldDocInfo
+    %   if valid, then send to DocsListValidated and OldDocInfosValidated
+    %   if invalid, then send_result tagged `access` (c.f. `conflict`)
+    %     and don't add to DocsListValidated (DLV) nor OldDocInfosValidated (ODI)
+
+    { DocsListValidated, OldDocInfosValidated } = validate_docs_access(Db, UserCtx, DocsList, OldDocInfos),
+    {ok, AccOut} = merge_rev_trees(DocsListValidated, OldDocInfosValidated, AccIn),
     #merge_acc{
         add_infos = NewFullDocInfos,
         rem_seqs = RemSeqs
@@ -663,7 +689,8 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
     % the trees, the attachments are already written to disk)
     {ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []),
     Pairs = pair_write_info(OldDocLookups, IndexFDIs),
-    LocalDocs2 = update_local_doc_revs(LocalDocs),
+    LocalDocs1 = apply_local_docs_access(Db, LocalDocs),
+    LocalDocs2 = update_local_doc_revs(LocalDocs1),
 
     {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2),
 
@@ -676,15 +703,83 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts) ->
         length(LocalDocs2)
     ),
 
-    % Check if we just updated any design documents, and update the validation
-    % funs if we did.
+    % Check if we just updated any non-access design documents,
+    % and update the validation funs if we did.
+    NonAccessIds = [Id || [{_Client, #doc{id=Id,access=[]}}|_] <- DocsList],
     UpdatedDDocIds = lists:flatmap(fun
         (<<"_design/", _/binary>> = Id) -> [Id];
         (_) -> []
-    end, Ids),
+    end, NonAccessIds),
 
     {ok, commit_data(Db1), UpdatedDDocIds}.
 
+% check_access(Db, UserCtx, Access) ->
+%     check_access(Db, UserCtx, couch_db:has_access_enabled(Db), Access).
+%
+% check_access(_Db, UserCtx, false, _Access) ->
+%     true;
+
+% at this point, we already validated this Db is access enabled, so do the checks right away.
+check_access(Db, UserCtx, Access) -> couch_db:check_access(Db#db{user_ctx=UserCtx}, Access).
+
+% TODO: looks like we go into validation here unconditionally and only check in
+%       check_access() whether the Db has_access_enabled(), we should do this
+%       here on the outside. Might be our perf issue.
+%       However, if it is, that means we have to speed this up as it would still
+%       be too slow for when access is enabled.
+validate_docs_access(Db, UserCtx, DocsList, OldDocInfos) ->
+    case couch_db:has_access_enabled(Db) of
+        true -> validate_docs_access_int(Db, UserCtx, DocsList, OldDocInfos);
+        _Else -> { DocsList, OldDocInfos }
+    end.
+
+validate_docs_access_int(Db, UserCtx, DocsList, OldDocInfos) ->
+    validate_docs_access(Db, UserCtx, DocsList, OldDocInfos, [], []).
+
+% validate_docs_access/6: walks DocsList and OldDocInfos in lockstep and keeps
+% only the updates UserCtx may make; every rejected doc gets an `access` result
+% sent to its client. A group whose docs were all rejected is dropped together
+% with its old doc info, so the two returned lists always stay aligned.
+validate_docs_access(_Db, _UserCtx, [], [], DocsListValidated, OldDocInfosValidated) ->
+    { lists:reverse(DocsListValidated), lists:reverse(OldDocInfosValidated) };
+validate_docs_access(Db, UserCtx, [Docs | DocRest], [OldInfo | OldInfoRest], DocsListValidated, OldDocInfosValidated) ->
+    % the old doc is the same for the whole group, so check it only once;
+    % a doc without a rev tree is new and has nothing to check
+    OldDocMatchesAccess = case OldInfo#full_doc_info.rev_tree of
+        [] -> true;
+        _ -> check_access(Db, UserCtx, OldInfo#full_doc_info.access)
+    end,
+    % NOTE: foldl prepends, so the kept docs end up in reverse order of Docs
+    NewDocs = lists:foldl(fun({ Client, Doc }, Acc) ->
+        case OldDocMatchesAccess andalso check_access(Db, UserCtx, Doc#doc.access) of
+            true ->
+                [{Client, Doc} | Acc];
+            _Else ->
+                % invalid: report `access` (c.f. `conflict`) and skip the doc
+                send_result(Client, Doc, access),
+                Acc
+        end
+    end, [], Docs),
+
+    { NewDocsListValidated, NewOldDocInfosValidated } = case NewDocs of
+        [] ->
+            % every doc was rejected: drop the group and its old doc info
+            { DocsListValidated, OldDocInfosValidated };
+        _ ->
+            { [NewDocs | DocsListValidated], [OldInfo | OldDocInfosValidated] }
+    end,
+    validate_docs_access(Db, UserCtx, DocRest, OldInfoRest, NewDocsListValidated, NewOldDocInfosValidated).
+
+apply_local_docs_access(Db, Docs) ->
+    apply_local_docs_access1(couch_db:has_access_enabled(Db), Docs).
+
+% When access is enabled, copy each local doc's access list into its body
+% under <<"_access">>; otherwise hand the docs back untouched.
+apply_local_docs_access1(false, Docs) -> Docs;
+apply_local_docs_access1(true, Docs) ->
+    lists:map(fun({Client, #doc{access = Access, body = {Props}} = Doc}) ->
+        {Client, Doc#doc{body = {[{<<"_access">>, Access} | Props]}}}
+    end, Docs).
 
 update_local_doc_revs(Docs) ->
     lists:foldl(fun({Client, Doc}, Acc) ->
@@ -850,19 +945,23 @@ get_meta_body_size(Meta) ->
     ExternalSize.
 
 
+% Databases created with {access, true} in Options always get ?DEFAULT_SECURITY_OBJECT;
+% every other database falls back to default_security_object/1.
+default_security_object(DbName, Options) ->
+    case lists:member({access, true}, Options) of
+        true -> ?DEFAULT_SECURITY_OBJECT;
+        false -> default_security_object(DbName)
+    end.
 default_security_object(<<"shards/", _/binary>>) ->
     case config:get("couchdb", "default_security", "admin_only") of
-        "admin_only" ->
-            [{<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
-             {<<"admins">>,{[{<<"roles">>,[<<"_admin">>]}]}}];
+        "admin_only" -> ?DEFAULT_SECURITY_OBJECT;
         Everyone when Everyone == "everyone"; Everyone == "admin_local" ->
             []
     end;
 default_security_object(_DbName) ->
     case config:get("couchdb", "default_security", "admin_only") of
         Admin when Admin == "admin_only"; Admin == "admin_local" ->
-            [{<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
-             {<<"admins">>,{[{<<"roles">>,[<<"_admin">>]}]}}];
+            ?DEFAULT_SECURITY_OBJECT;
         "everyone" ->
             []
     end.