You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2019/07/31 09:09:54 UTC

[couchdb] 04/04: CouchDB map indexes on FDB

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 264ddaeba8b0616c952dd42c8e1a1846580e38ea
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Mon Jun 17 15:45:10 2019 +0200

    CouchDB map indexes on FDB
    
    This adds couch_views which builds map indexes and stores them in FDB.
    
    Co-authored-by: Paul J. Davis <pa...@gmail.com>
---
 rebar.config.script                                |   1 +
 rel/overlay/etc/default.ini                        |   4 +
 rel/reltool.config                                 |   2 +
 src/chttpd/src/chttpd_db.erl                       |   3 +-
 src/chttpd/src/chttpd_view.erl                     |   5 +-
 src/couch_mrview/src/couch_mrview_util.erl         |   2 +-
 src/couch_views/.gitignore                         |  19 +
 src/couch_views/README.md                          |  15 +
 src/couch_views/include/couch_views.hrl            |  26 ++
 src/couch_views/rebar.config                       |  14 +
 src/couch_views/src/couch_views.app.src            |  31 ++
 src/couch_views/src/couch_views.erl                | 140 ++++++
 src/couch_views/src/couch_views_app.erl            |  31 ++
 src/couch_views/src/couch_views_encoding.erl       | 105 +++++
 src/couch_views/src/couch_views_fdb.erl            | 438 +++++++++++++++++
 src/couch_views/src/couch_views_indexer.erl        | 261 +++++++++++
 src/couch_views/src/couch_views_jobs.erl           | 109 +++++
 src/couch_views/src/couch_views_reader.erl         | 208 +++++++++
 src/couch_views/src/couch_views_server.erl         | 103 ++++
 src/couch_views/src/couch_views_sup.erl            |  46 ++
 src/couch_views/src/couch_views_util.erl           |  84 ++++
 src/couch_views/test/couch_views_encoding_test.erl |  94 ++++
 src/couch_views/test/couch_views_indexer_test.erl  | 456 ++++++++++++++++++
 src/couch_views/test/couch_views_map_test.erl      | 517 +++++++++++++++++++++
 src/fabric/include/fabric2.hrl                     |   1 +
 test/elixir/test/basics_test.exs                   |  24 +-
 test/elixir/test/map_test.exs                      | 450 ++++++++++++++++++
 test/elixir/test/view_collation_test.exs           |  28 +-
 28 files changed, 3189 insertions(+), 28 deletions(-)

diff --git a/rebar.config.script b/rebar.config.script
index 116c040..ce79728 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -82,6 +82,7 @@ SubDirs = [
     "src/couch_stats",
     "src/couch_peruser",
     "src/couch_tests",
+    "src/couch_views",
     "src/ddoc_cache",
     "src/fabric",
     "src/couch_jobs",
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 8fd2261..11bd611 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -223,6 +223,10 @@ iterations = 10 ; iterations for password hashing
 ; users_db_public = false
 ; cookie_domain = example.com
 
+; Settings for view indexing
+[couch_views]
+; max_workers = 100
+
 ; CSP (Content Security Policy) Support for _utils
 [csp]
 enable = true
diff --git a/rel/reltool.config b/rel/reltool.config
index 7b2159d..2b088be 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -42,6 +42,7 @@
         couch_stats,
         couch_event,
         couch_peruser,
+        couch_views,
         ddoc_cache,
         ets_lru,
         fabric,
@@ -100,6 +101,7 @@
     {app, couch_stats, [{incl_cond, include}]},
     {app, couch_event, [{incl_cond, include}]},
     {app, couch_peruser, [{incl_cond, include}]},
+    {app, couch_views, [{incl_cond, include}]},
     {app, ddoc_cache, [{incl_cond, include}]},
     {app, ets_lru, [{incl_cond, include}]},
     {app, fabric, [{incl_cond, include}]},
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 0c7e4d5..785ca3f 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -334,7 +334,8 @@ handle_design_req(#httpd{
         path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest]
     }=Req, Db) ->
     DbName = fabric2_db:name(Db),
-    case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of
+%%    case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of
+    case fabric2_db:open_doc(Db, <<"_design/", Name/binary>>) of
     {ok, DDoc} ->
         Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3),
         Handler(Req, Db, DDoc);
diff --git a/src/chttpd/src/chttpd_view.erl b/src/chttpd/src/chttpd_view.erl
index 26107d7..6765cca 100644
--- a/src/chttpd/src/chttpd_view.erl
+++ b/src/chttpd/src/chttpd_view.erl
@@ -43,10 +43,9 @@ multi_query_view(Req, Db, DDoc, ViewName, Queries) ->
 design_doc_view(Req, Db, DDoc, ViewName, Keys) ->
     Args = couch_mrview_http:parse_params(Req, Keys),
     Max = chttpd:chunked_response_buffer_size(),
+    Fun = fun view_cb/2,
     VAcc = #vacc{db=Db, req=Req, threshold=Max},
-    Options = [{user_ctx, Req#httpd.user_ctx}],
-    {ok, Resp} = fabric:query_view(Db, Options, DDoc, ViewName,
-            fun view_cb/2, VAcc, Args),
+    {ok, Resp} = couch_views:query(Db, DDoc, ViewName, Fun, VAcc, Args),
     {ok, Resp#vacc.resp}.
 
 
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index eb68124..18a4be1 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -497,7 +497,7 @@ fold_reduce({NthRed, Lang, View}, Fun,  Acc, Options) ->
 
 
 validate_args(Db, DDoc, Args0) ->
-    {ok, State} = couch_mrview_index:init(Db, DDoc),
+    {ok, State} = couch_mrview_util:ddoc_to_mrst(fabric2_db:name(Db), DDoc),
     Args1 = apply_limit(State#mrst.partitioned, Args0),
     validate_args(State, Args1).
 
diff --git a/src/couch_views/.gitignore b/src/couch_views/.gitignore
new file mode 100644
index 0000000..f1c4554
--- /dev/null
+++ b/src/couch_views/.gitignore
@@ -0,0 +1,19 @@
+.rebar3
+_*
+.eunit
+*.o
+*.beam
+*.plt
+*.swp
+*.swo
+.erlang.cookie
+ebin
+log
+erl_crash.dump
+.rebar
+logs
+_build
+.idea
+*.iml
+rebar3.crashdump
+*~
diff --git a/src/couch_views/README.md b/src/couch_views/README.md
new file mode 100644
index 0000000..49cd82b
--- /dev/null
+++ b/src/couch_views/README.md
@@ -0,0 +1,15 @@
+CouchDB Views
+=====
+
+This is the new application that builds and runs Map/reduce views against FoundationDB.
+Currently only map indexes are supported and it will always return the full index.
+
+Code layout:
+
+* `couch_views` - Main entry point to query a view
+* `couch_views_reader` - Reads from the index for queries
+* `couch_views_indexer` - `couch_jobs` worker that builds an index from the changes feed.
+* `couch_vews_jobs` - `couch_views` interactions with `couch_jobs`. It handles adding index jobs and subscribes to jobs.
+* `couch_views_fdb` - Maps view operations to FoundationDB logic.
+* `couch_views_encoding` - Encodes view keys that are byte comparable following CouchDB view sort order.
+* `couch_views_server` - Spawns `couch_views_indexer` workers to handle index update jobs.
diff --git a/src/couch_views/include/couch_views.hrl b/src/couch_views/include/couch_views.hrl
new file mode 100644
index 0000000..2e443eb
--- /dev/null
+++ b/src/couch_views/include/couch_views.hrl
@@ -0,0 +1,26 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% indexing
+-define(VIEW_UPDATE_SEQ, 0).
+-define(VIEW_ID_INFO, 1).
+-define(VIEW_ID_RANGE, 2).
+-define(VIEW_MAP_RANGE, 3).
+
+-define(VIEW_ROW_COUNT, 0).
+-define(VIEW_KV_SIZE, 1).
+
+-define(VIEW_ROW_KEY, 0).
+-define(VIEW_ROW_VALUE, 1).
+
+% jobs api
+-define(INDEX_JOB_TYPE, <<"views">>).
diff --git a/src/couch_views/rebar.config b/src/couch_views/rebar.config
new file mode 100644
index 0000000..362c878
--- /dev/null
+++ b/src/couch_views/rebar.config
@@ -0,0 +1,14 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{cover_enabled, true}.
+{cover_print_enabled, true}.
diff --git a/src/couch_views/src/couch_views.app.src b/src/couch_views/src/couch_views.app.src
new file mode 100644
index 0000000..c80c30b
--- /dev/null
+++ b/src/couch_views/src/couch_views.app.src
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_views, [
+    {description, "CouchDB Views on FDB"},
+    {vsn, git},
+    {mod, {couch_views_app, []}},
+    {registered, [
+        couch_views_sup,
+        couch_views_server
+    ]},
+    {applications, [
+        kernel,
+        stdlib,
+        erlfdb,
+        couch_log,
+        config,
+        couch_stats,
+        fabric,
+        couch_jobs
+    ]}
+]}.
diff --git a/src/couch_views/src/couch_views.erl b/src/couch_views/src/couch_views.erl
new file mode 100644
index 0000000..7c7588c
--- /dev/null
+++ b/src/couch_views/src/couch_views.erl
@@ -0,0 +1,140 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views).
+
+-export([
+    query/6
+]).
+
+
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+query(Db, DDoc, ViewName, Callback, Acc0, Args0) ->
+    case fabric2_db:is_users_db(Db) of
+        true ->
+            fabric2_users_db:after_doc_read(DDoc, Db);
+        false ->
+            ok
+    end,
+
+    DbName = fabric2_db:name(Db),
+    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+
+    #mrst{
+        views = Views
+    } = Mrst,
+
+    Args1 = to_mrargs(Args0),
+    Args2 = couch_mrview_util:set_view_type(Args1, ViewName, Views),
+    Args3 = couch_mrview_util:validate_args(Args2),
+    ok = check_range(Args3),
+    case is_reduce_view(Args3) of
+        true -> throw({not_implemented});
+        false -> ok
+    end,
+
+    ok = maybe_update_view(Db, Mrst, Args3),
+
+    try
+        couch_views_reader:read(Db, Mrst, ViewName, Callback, Acc0, Args3)
+    after
+        UpdateAfter = Args3#mrargs.update == lazy,
+        if UpdateAfter == false -> ok; true ->
+            couch_views_jobs:build_view_async(Db, Mrst)
+        end
+    end.
+
+
+maybe_update_view(_Db, _Mrst, #mrargs{update = false}) ->
+    ok;
+
+maybe_update_view(_Db, _Mrst, #mrargs{update = lazy}) ->
+    ok;
+
+maybe_update_view(Db, Mrst, _Args) ->
+    WaitSeq = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        DbSeq = fabric2_db:get_update_seq(TxDb),
+        ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst),
+        case DbSeq == ViewSeq of
+            true -> ready;
+            false -> DbSeq
+        end
+    end),
+
+    if WaitSeq == ready -> ok; true ->
+        couch_views_jobs:build_view(Db, Mrst, WaitSeq)
+    end.
+
+
+is_reduce_view(#mrargs{view_type = ViewType}) ->
+    ViewType =:= red;
+is_reduce_view({Reduce, _, _}) ->
+    Reduce =:= red.
+
+
+to_mrargs(#mrargs{} = Args) ->
+    Args;
+
+to_mrargs(#{} = Args) ->
+    Fields = record_info(fields, mrargs),
+    Indexes = lists:seq(2, record_info(size, mrargs)),
+    LU = lists:zip(Fields, Indexes),
+
+    maps:fold(fun(Key, Value, Acc) ->
+        Index = fabric2_util:get_value(couch_util:to_existing_atom(Key), LU),
+        setelement(Index, Acc, Value)
+    end, #mrargs{}, Args).
+
+
+check_range(#mrargs{start_key = undefined}) ->
+    ok;
+
+check_range(#mrargs{end_key = undefined}) ->
+    ok;
+
+check_range(#mrargs{start_key = K, end_key = K}) ->
+    ok;
+
+check_range(Args) ->
+    #mrargs{
+        direction = Dir,
+        start_key = SK,
+        start_key_docid = SKD,
+        end_key = EK,
+        end_key_docid = EKD
+    } = Args,
+
+    case {Dir, view_cmp(SK, SKD, EK, EKD)} of
+        {fwd, false} ->
+            throw(check_range_error(<<"true">>));
+        {rev, true} ->
+            throw(check_range_error(<<"false">>));
+        _ ->
+            ok
+    end.
+
+
+check_range_error(Descending) ->
+    {query_parse_error,
+        <<"No rows can match your key range, reverse your ",
+            "start_key and end_key or set descending=",
+            Descending/binary>>}.
+
+
+view_cmp(SK, SKD, EK, EKD) ->
+    BinSK = couch_views_encoding:encode(SK, key),
+    BinEK = couch_views_encoding:encode(EK, key),
+    PackedSK = erlfdb_tuple:pack({BinSK, SKD}),
+    PackedEK = erlfdb_tuple:pack({BinEK, EKD}),
+    PackedSK =< PackedEK.
diff --git a/src/couch_views/src/couch_views_app.erl b/src/couch_views/src/couch_views_app.erl
new file mode 100644
index 0000000..5ede5ef
--- /dev/null
+++ b/src/couch_views/src/couch_views_app.erl
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_views_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+start(_StartType, StartArgs) ->
+    couch_views_sup:start_link(StartArgs).
+
+
+stop(_State) ->
+    ok.
diff --git a/src/couch_views/src/couch_views_encoding.erl b/src/couch_views/src/couch_views_encoding.erl
new file mode 100644
index 0000000..ef5fed9
--- /dev/null
+++ b/src/couch_views/src/couch_views_encoding.erl
@@ -0,0 +1,105 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_encoding).
+
+
+-export([
+    encode/1,
+    encode/2,
+    decode/1
+]).
+
+
+-define(NULL, 0).
+-define(FALSE, 1).
+-define(TRUE, 2).
+-define(NUMBER, 3).
+-define(STRING, 4).
+-define(LIST, 5).
+-define(OBJECT, 6).
+
+
+encode(X) ->
+    encode(X, value).
+
+
+encode(X, Type) when Type == key; Type == value ->
+    erlfdb_tuple:pack(encode_int(X, Type)).
+
+
+decode(Encoded) ->
+    Val = erlfdb_tuple:unpack(Encoded),
+    decode_int(Val).
+
+
+encode_int(null, _Type) ->
+    {?NULL};
+
+encode_int(false, _Type) ->
+    {?FALSE};
+
+encode_int(true, _Type) ->
+    {?TRUE};
+
+encode_int(Num, key) when is_number(Num) ->
+    {?NUMBER, float(Num)};
+
+encode_int(Num, value) when is_number(Num) ->
+    {?NUMBER, Num};
+
+encode_int(Bin, key) when is_binary(Bin) ->
+    {?STRING, couch_util:get_sort_key(Bin)};
+
+encode_int(Bin, value) when is_binary(Bin) ->
+    {?STRING, Bin};
+
+encode_int(List, Type) when is_list(List) ->
+    Encoded = lists:map(fun(Item) ->
+        encode_int(Item, Type)
+    end, List),
+    {?LIST, list_to_tuple(Encoded)};
+
+encode_int({Props}, Type) when is_list(Props) ->
+    Encoded = lists:map(fun({K, V}) ->
+        EK = encode_int(K, Type),
+        EV = encode_int(V, Type),
+        {EK, EV}
+    end, Props),
+    {?OBJECT, list_to_tuple(Encoded)}.
+
+
+decode_int({?NULL}) ->
+    null;
+
+decode_int({?FALSE}) ->
+    false;
+
+decode_int({?TRUE}) ->
+    true;
+
+decode_int({?STRING, Bin}) ->
+    Bin;
+
+decode_int({?NUMBER, Num}) ->
+    Num;
+
+decode_int({?LIST, List}) ->
+    lists:map(fun decode_int/1, tuple_to_list(List));
+
+decode_int({?OBJECT, Object}) ->
+    Props = lists:map(fun({EK, EV}) ->
+        K = decode_int(EK),
+        V = decode_int(EV),
+        {K, V}
+    end, tuple_to_list(Object)),
+    {Props}.
diff --git a/src/couch_views/src/couch_views_fdb.erl b/src/couch_views/src/couch_views_fdb.erl
new file mode 100644
index 0000000..60ce300
--- /dev/null
+++ b/src/couch_views/src/couch_views_fdb.erl
@@ -0,0 +1,438 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_fdb).
+
+-export([
+    get_update_seq/2,
+    set_update_seq/3,
+
+    get_row_count/3,
+    get_kv_size/3,
+
+    fold_map_idx/6,
+
+    write_doc/4
+]).
+
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
+-define(LIST_VALUE, 0).
+-define(JSON_VALUE, 1).
+-define(VALUE, 2).
+
+
+-include("couch_views.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+% View Build Sequence Access
+% (<db>, ?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ) = Sequence
+
+
+get_update_seq(TxDb, #mrst{sig = Sig}) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    case erlfdb:wait(erlfdb:get(Tx, seq_key(DbPrefix, Sig))) of
+        not_found -> <<>>;
+        UpdateSeq -> UpdateSeq
+    end.
+
+
+set_update_seq(TxDb, Sig, Seq) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+    ok = erlfdb:set(Tx, seq_key(DbPrefix, Sig), Seq).
+
+
+get_row_count(TxDb, #mrst{sig = Sig}, ViewId) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    case erlfdb:wait(erlfdb:get(Tx, row_count_key(DbPrefix, Sig, ViewId))) of
+        not_found -> 0; % Can this happen?
+        CountBin -> ?bin2uint(CountBin)
+    end.
+
+
+get_kv_size(TxDb, #mrst{sig = Sig}, ViewId) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    case erlfdb:wait(erlfdb:get(Tx, kv_size_key(DbPrefix, Sig, ViewId))) of
+        not_found -> 0; % Can this happen?
+        SizeBin -> ?bin2uint(SizeBin)
+    end.
+
+
+fold_map_idx(TxDb, Sig, ViewId, Options, Callback, Acc0) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
+    FoldAcc = #{
+        prefix => MapIdxPrefix,
+        sort_key => undefined,
+        docid => undefined,
+        dupe_id => undefined,
+        callback => Callback,
+        acc => Acc0
+    },
+
+    {Fun, Acc} = case fabric2_util:get_value(dir, Options, fwd) of
+        fwd ->
+            FwdAcc = FoldAcc#{
+                next => key,
+                key => undefined
+            },
+            {fun fold_fwd/2, FwdAcc};
+        rev ->
+            RevAcc = FoldAcc#{
+                next => value,
+                value => undefined
+            },
+            {fun fold_rev/2, RevAcc}
+    end,
+
+    #{
+        acc := Acc1
+    } = fabric2_fdb:fold_range(TxDb, MapIdxPrefix, Fun, Acc, Options),
+
+    Acc1.
+
+
+write_doc(TxDb, Sig, _ViewIds, #{deleted := true} = Doc) ->
+    #{
+        id := DocId
+    } = Doc,
+
+    ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
+
+    clear_id_idx(TxDb, Sig, DocId),
+    lists:foreach(fun({ViewId, TotalKeys, TotalSize, UniqueKeys}) ->
+        clear_map_idx(TxDb, Sig, ViewId, DocId, UniqueKeys),
+        update_row_count(TxDb, Sig, ViewId, -TotalKeys),
+        update_kv_size(TxDb, Sig, ViewId, -TotalSize)
+    end, ExistingViewKeys);
+
+write_doc(TxDb, Sig, ViewIds, Doc) ->
+    #{
+        id := DocId,
+        results := Results
+    } = Doc,
+
+    ExistingViewKeys = get_view_keys(TxDb, Sig, DocId),
+
+    clear_id_idx(TxDb, Sig, DocId),
+
+    lists:foreach(fun({ViewId, NewRows}) ->
+        update_id_idx(TxDb, Sig, ViewId, DocId, NewRows),
+
+        ExistingKeys = case lists:keyfind(ViewId, 1, ExistingViewKeys) of
+            {ViewId, TotalRows, TotalSize, EKeys} ->
+                RowChange = length(NewRows) - TotalRows,
+                SizeChange = calculate_row_size(NewRows) - TotalSize,
+                update_row_count(TxDb, Sig, ViewId, RowChange),
+                update_kv_size(TxDb, Sig, ViewId, SizeChange),
+                EKeys;
+            false ->
+                RowChange = length(NewRows),
+                SizeChange = calculate_row_size(NewRows),
+                update_row_count(TxDb, Sig, ViewId, RowChange),
+                update_kv_size(TxDb, Sig, ViewId, SizeChange),
+                []
+        end,
+        update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows)
+    end, lists:zip(ViewIds, Results)).
+
+
+% For each row in a map view there are two rows stored in
+% FoundationDB:
+%
+%   `(EncodedSortKey, EncodedKey)`
+%   `(EncodedSortKey, EncodedValue)`
+%
+% The difference between `EncodedSortKey` and `EndcodedKey` is
+% the use of `couch_util:get_sort_key/1` which turns UTF-8
+% strings into binaries that are byte comparable. Given a sort
+% key binary we cannot recover the input so to return unmodified
+% user data we are forced to store the original.
+%
+% These two fold functions exist so that we can be fairly
+% forceful on our assertions about which rows to see. Since
+% when we're folding forward we'll see the key first. When
+% `descending=true` and we're folding in reverse we'll see
+% the value first.
+
+fold_fwd({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+    #{
+        prefix := Prefix
+    } = Acc,
+
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+    Acc#{
+        next := value,
+        key := couch_views_encoding:decode(EncodedOriginalKey),
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId
+    };
+
+fold_fwd({RowKey, EncodedValue}, #{next := value} = Acc) ->
+    #{
+        prefix := Prefix,
+        key := Key,
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    % We're asserting there that this row is paired
+    % correctly with the previous row by relying on
+    % a badmatch if any of these values don't match.
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_VALUE} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+
+    Value = couch_views_encoding:decode(EncodedValue),
+    UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
+
+    Acc#{
+        next := key,
+        key := undefined,
+        sort_key := undefined,
+        docid := undefined,
+        dupe_id := undefined,
+        acc := UserAcc1
+    }.
+
+
+fold_rev({RowKey, EncodedValue}, #{next := value} = Acc) ->
+    #{
+        prefix := Prefix
+    } = Acc,
+
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_VALUE} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+    Acc#{
+        next := key,
+        value := couch_views_encoding:decode(EncodedValue),
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId
+    };
+
+fold_rev({RowKey, EncodedOriginalKey}, #{next := key} = Acc) ->
+    #{
+        prefix := Prefix,
+        value := Value,
+        sort_key := SortKey,
+        docid := DocId,
+        dupe_id := DupeId,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    % We're asserting there that this row is paired
+    % correctly with the previous row by relying on
+    % a badmatch if any of these values don't match.
+    {{SortKey, DocId}, DupeId, ?VIEW_ROW_KEY} =
+            erlfdb_tuple:unpack(RowKey, Prefix),
+
+    Key = couch_views_encoding:decode(EncodedOriginalKey),
+    UserAcc1 = UserCallback(DocId, Key, Value, UserAcc0),
+
+    Acc#{
+        next := value,
+        value := undefined,
+        sort_key := undefined,
+        docid := undefined,
+        dupe_id := undefined,
+        acc := UserAcc1
+    }.
+
+
+clear_id_idx(TxDb, Sig, DocId) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    {Start, End} = id_idx_range(DbPrefix, Sig, DocId),
+    ok = erlfdb:clear_range(Tx, Start, End).
+
+
+clear_map_idx(TxDb, Sig, ViewId, DocId, ViewKeys) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    lists:foreach(fun(ViewKey) ->
+        {Start, End} = map_idx_range(DbPrefix, Sig, ViewId, ViewKey, DocId),
+        ok = erlfdb:clear_range(Tx, Start, End)
+    end, ViewKeys).
+
+
+update_id_idx(TxDb, Sig, ViewId, DocId, NewRows) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+    Key = id_idx_key(DbPrefix, Sig, DocId, ViewId),
+    RowSize = calculate_row_size(NewRows),
+    Val = couch_views_encoding:encode([length(NewRows), RowSize, Unique]),
+    ok = erlfdb:set(Tx, Key, Val).
+
+
+update_map_idx(TxDb, Sig, ViewId, DocId, ExistingKeys, NewRows) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    Unique = lists:usort([K || {K, _V} <- NewRows]),
+
+    KeysToRem = ExistingKeys -- Unique,
+    lists:foreach(fun(RemKey) ->
+        {Start, End} = map_idx_range(DbPrefix, Sig, ViewId, RemKey, DocId),
+        ok = erlfdb:clear_range(Tx, Start, End)
+    end, KeysToRem),
+
+    KVsToAdd = process_rows(NewRows),
+    MapIdxPrefix = map_idx_prefix(DbPrefix, Sig, ViewId),
+
+    lists:foreach(fun({DupeId, Key1, Key2, Val}) ->
+        KK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_KEY),
+        VK = map_idx_key(MapIdxPrefix, {Key1, DocId}, DupeId, ?VIEW_ROW_VALUE),
+        ok = erlfdb:set(Tx, KK, Key2),
+        ok = erlfdb:set(Tx, VK, Val)
+    end, KVsToAdd).
+
+
+get_view_keys(TxDb, Sig, DocId) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+    {Start, End} = id_idx_range(DbPrefix, Sig, DocId),
+    lists:map(fun({K, V}) ->
+        {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId} =
+                erlfdb_tuple:unpack(K, DbPrefix),
+        [TotalKeys, TotalSize, UniqueKeys] = couch_views_encoding:decode(V),
+        {ViewId, TotalKeys, TotalSize, UniqueKeys}
+    end, erlfdb:get_range(Tx, Start, End, [])).
+
+
+update_row_count(TxDb, Sig, ViewId, Increment) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+    Key = row_count_key(DbPrefix, Sig, ViewId),
+    erlfdb:add(Tx, Key, Increment).
+
+
+update_kv_size(TxDb, Sig, ViewId, Increment) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+    Key = kv_size_key(DbPrefix, Sig, ViewId),
+    erlfdb:add(Tx, Key, Increment).
+
+
+seq_key(DbPrefix, Sig) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_UPDATE_SEQ},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+row_count_key(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_ID_INFO, ViewId, ?VIEW_ROW_COUNT},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+kv_size_key(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_ID_INFO, ViewId, ?VIEW_KV_SIZE},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+id_idx_key(DbPrefix, Sig, DocId, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+id_idx_range(DbPrefix, Sig, DocId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_ID_RANGE, DocId},
+    erlfdb_tuple:range(Key, DbPrefix).
+
+
+map_idx_prefix(DbPrefix, Sig, ViewId) ->
+    Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId},
+    erlfdb_tuple:pack(Key, DbPrefix).
+
+
+map_idx_key(MapIdxPrefix, MapKey, DupeId, Type) ->
+    Key = {MapKey, DupeId, Type},
+    erlfdb_tuple:pack(Key, MapIdxPrefix).
+
+
+map_idx_range(DbPrefix, Sig, ViewId, MapKey, DocId) ->
+    Encoded = couch_views_encoding:encode(MapKey, key),
+    Key = {?DB_VIEWS, Sig, ?VIEW_MAP_RANGE, ViewId, {Encoded, DocId}},
+    erlfdb_tuple:range(Key, DbPrefix).
+
+
+process_rows(Rows) ->
+    Encoded = lists:map(fun({K, V}) ->
+        EK1 = couch_views_encoding:encode(K, key),
+        EK2 = couch_views_encoding:encode(K, value),
+        EV = couch_views_encoding:encode(V, value),
+        {EK1, EK2, EV}
+    end, Rows),
+
+    Grouped = lists:foldl(fun({K1, K2, V}, Acc) ->
+        dict:append(K1, {K2, V}, Acc)
+    end, dict:new(), Encoded),
+
+    dict:fold(fun(K1, Vals, DAcc) ->
+        Vals1 = lists:keysort(2, Vals),
+        {_, Labeled} = lists:foldl(fun({K2, V}, {Count, Acc}) ->
+            {Count + 1, [{Count, K1, K2, V} | Acc]}
+        end, {0, []}, Vals1),
+        Labeled ++ DAcc
+    end, [], Grouped).
+
+
+calculate_row_size(Rows) ->
+    lists:foldl(fun({K, V}, Acc) ->
+        Acc + erlang:external_size(K) + erlang:external_size(V)
+    end, 0, Rows).
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
new file mode 100644
index 0000000..a317936
--- /dev/null
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -0,0 +1,261 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_indexer).
+
+-export([
+    spawn_link/0
+]).
+
+
+-export([
+    init/0
+]).
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+% TODO:
+%  * Handle timeouts of transaction and other errors
+
+
+spawn_link() ->
+    proc_lib:spawn_link(?MODULE, init, []).
+
+
+init() ->
+    {ok, Job, Data} = couch_jobs:accept(?INDEX_JOB_TYPE, #{}),
+    #{
+        <<"db_name">> := DbName,
+        <<"ddoc_id">> := DDocId,
+        <<"sig">> := JobSig
+    } = Data,
+
+    {ok, Db} = fabric2_db:open(DbName, []),
+    {ok, DDoc} = fabric2_db:open_doc(Db, DDocId),
+    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+    HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
+
+    if  HexSig == JobSig -> ok; true ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => sig_changed,
+            reason => <<"Design document was modified">>
+        }),
+        exit(normal)
+    end,
+
+    State = #{
+        tx_db => undefined,
+        db_seq => undefined,
+        view_seq => undefined,
+        last_seq => undefined,
+        job => Job,
+        job_data => Data,
+        count => 0,
+        limit => num_changes(),
+        doc_acc => [],
+        design_opts => Mrst#mrst.design_opts
+    },
+
+    update(Db, Mrst, State).
+
+
+update(#{} = Db, Mrst0, State0) ->
+    {Mrst2, State3} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        % In the first iteration of update we need
+        % to populate our db and view sequences
+        State1 = case State0 of
+            #{db_seq := undefined} ->
+                ViewSeq = couch_views_fdb:get_update_seq(TxDb, Mrst0),
+                State0#{
+                    tx_db := TxDb,
+                    db_seq := fabric2_db:get_update_seq(TxDb),
+                    view_seq := ViewSeq,
+                    last_seq := ViewSeq
+                };
+            _ ->
+                State0#{
+                    tx_db := TxDb
+                }
+        end,
+
+        {ok, State2} = fold_changes(State1),
+
+        #{
+            count := Count,
+            limit := Limit,
+            doc_acc := DocAcc,
+            last_seq := LastSeq
+        } = State2,
+
+        {Mrst1, MappedDocs} = map_docs(Mrst0, DocAcc),
+        write_docs(TxDb, Mrst1, MappedDocs, State2),
+
+        case Count < Limit of
+            true ->
+                report_progress(State2, finished),
+                {Mrst1, finished};
+            false ->
+                report_progress(State2, update),
+                {Mrst1, State2#{
+                    tx_db := undefined,
+                    count := 0,
+                    doc_acc := [],
+                    view_seq := LastSeq
+                }}
+        end
+    end),
+
+    case State3 of
+        finished ->
+            couch_query_servers:stop_doc_map(Mrst2#mrst.qserver);
+        _ ->
+            update(Db, Mrst2, State3)
+    end.
+
+
+fold_changes(State) ->
+    #{
+        view_seq := SinceSeq,
+        limit := Limit,
+        tx_db := TxDb
+    } = State,
+
+    Fun = fun process_changes/2,
+    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+
+
+process_changes(Change, Acc) ->
+    #{
+        doc_acc := DocAcc,
+        count := Count,
+        tx_db := TxDb,
+        design_opts := DesignOpts
+    } = Acc,
+
+    #{
+        id := Id,
+        sequence := LastSeq,
+        deleted := Deleted
+    } = Change,
+
+    IncludeDesign = lists:keymember(<<"include_design">>, 1, DesignOpts),
+
+    Acc1 = case {Id, IncludeDesign} of
+        {<<?DESIGN_DOC_PREFIX, _/binary>>, false} ->
+            maps:merge(Acc, #{
+                count => Count + 1,
+                last_seq => LastSeq
+            });
+        _ ->
+            % Making a note here that we should make fetching all the docs
+            % a parallel fdb operation
+            {ok, Doc} = case Deleted of
+                true -> {ok, []};
+                false -> fabric2_db:open_doc(TxDb, Id)
+            end,
+
+            Change1 = maps:put(doc, Doc, Change),
+            Acc#{
+                doc_acc := DocAcc ++ [Change1],
+                count := Count + 1,
+                last_seq := LastSeq
+            }
+    end,
+    {ok, Acc1}.
+
+
+map_docs(Mrst, Docs) ->
+    % Run all the non deleted docs through the view engine and
+    Mrst1 = start_query_server(Mrst),
+    QServer = Mrst1#mrst.qserver,
+    MapFun = fun
+        (#{deleted := true} = Change) ->
+            Change#{results => []};
+        (#{deleted := false} = Change) ->
+            #{doc := Doc} = Change,
+            couch_stats:increment_counter([couchdb, mrview, map_doc]),
+            {ok, RawResults} = couch_query_servers:map_doc_raw(QServer, Doc),
+            JsonResults = couch_query_servers:raw_to_ejson(RawResults),
+            ListResults = lists:map(fun(ViewResults) ->
+                [list_to_tuple(Res) || Res <- ViewResults]
+            end, JsonResults),
+            Change#{results => ListResults}
+    end,
+    {Mrst1, lists:map(MapFun, Docs)}.
+
+
+write_docs(TxDb, Mrst, Docs, State) ->
+    #mrst{
+        views = Views,
+        sig = Sig
+    } = Mrst,
+
+    #{
+        last_seq := LastSeq
+    } = State,
+
+    ViewIds = [View#mrview.id_num || View <- Views],
+
+    lists:foreach(fun(Doc) ->
+        couch_views_fdb:write_doc(TxDb, Sig, ViewIds, Doc)
+    end, Docs),
+
+    couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq).
+
+
+start_query_server(#mrst{} = Mrst) ->
+    #mrst{
+        language = Language,
+        lib = Lib,
+        views = Views
+    } = Mrst,
+    Defs = [View#mrview.def || View <- Views],
+    {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
+    Mrst#mrst{qserver = QServer}.
+
+
+report_progress(State, UpdateType) ->
+    #{
+        tx_db := TxDb,
+        job := Job,
+        job_data := JobData,
+        last_seq := LastSeq
+    } = State,
+
+    #{
+        <<"db_name">> := DbName,
+        <<"ddoc_id">> := DDocId,
+        <<"sig">> := Sig
+    } = JobData,
+
+    % Reconstruct from scratch to remove any
+    % possible existing error state.
+    NewData = #{
+        <<"db_name">> => DbName,
+        <<"ddoc_id">> => DDocId,
+        <<"sig">> => Sig,
+        <<"view_seq">> => LastSeq
+    },
+
+    case UpdateType of
+        update ->
+            couch_jobs:update(TxDb, Job, NewData);
+        finished ->
+            couch_jobs:finish(TxDb, Job, NewData)
+    end.
+
+
+num_changes() ->
+    config:get_integer("couch_views", "change_limit", 100).
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
new file mode 100644
index 0000000..16fc410
--- /dev/null
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -0,0 +1,109 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_jobs).
+
+-export([
+    set_timeout/0,
+    build_view/3,
+    build_view_async/2
+]).
+
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
+
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include("couch_views.hrl").
+
+
+set_timeout() ->
+    couch_jobs:set_type_timeout(?INDEX_JOB_TYPE, 6 * 1000).
+
+
+build_view(TxDb, Mrst, UpdateSeq) ->
+    {ok, JobId} = build_view_async(TxDb, Mrst),
+    case wait_for_job(JobId, UpdateSeq) of
+        ok -> ok;
+        retry -> build_view(TxDb, Mrst, UpdateSeq)
+    end.
+
+
+build_view_async(TxDb, Mrst) ->
+    JobId = job_id(TxDb, Mrst),
+    JobData = job_data(TxDb, Mrst),
+    ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData),
+    {ok, JobId}.
+
+
+wait_for_job(JobId, UpdateSeq) ->
+    case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
+        {ok, Subscription, _State, _Data} ->
+            wait_for_job(JobId, Subscription, UpdateSeq);
+        {ok, finished, Data} ->
+            case Data of
+                #{<<"view_sig">> := ViewSeq} when ViewSeq >= UpdateSeq ->
+                    ok;
+                _ ->
+                    retry
+            end
+    end.
+
+
+wait_for_job(JobId, Subscription, UpdateSeq) ->
+    case wait(Subscription) of
+        {error, Error} ->
+            erlang:error(Error);
+        {finished, #{<<"error">> := Error, <<"reason">> := Reason}} ->
+            erlang:error({binary_to_existing_atom(Error, latin1), Reason});
+        {finished, #{<<"view_seq">> := ViewSeq}} when ViewSeq >= UpdateSeq ->
+            ok;
+        {finished, _} ->
+            wait_for_job(JobId, UpdateSeq);
+        {_State, #{<<"view_seq">> := ViewSeq}} when ViewSeq >= UpdateSeq ->
+            couch_jobs:unsubscribe(Subscription),
+            ok;
+        {_, _} ->
+            wait_for_job(JobId, Subscription, UpdateSeq)
+    end.
+
+
+job_id(#{name := DbName}, #mrst{sig = Sig}) ->
+    job_id(DbName, Sig);
+
+job_id(DbName, Sig) ->
+    HexSig = fabric2_util:to_hex(Sig),
+    <<DbName/binary, "-", HexSig/binary>>.
+
+
+job_data(Db, Mrst) ->
+    #mrst{
+        idx_name = DDocId,
+        sig = Sig
+    } = Mrst,
+
+    #{
+        db_name => fabric2_db:name(Db),
+        ddoc_id => DDocId,
+        sig => fabric2_util:to_hex(Sig)
+    }.
+
+
+wait(Subscription) ->
+    case couch_jobs:wait(Subscription, infinity) of
+        {?INDEX_JOB_TYPE, _JobId, JobState, JobData} ->
+            {JobState, JobData};
+        timeout ->
+            {error, timeout}
+    end.
diff --git a/src/couch_views/src/couch_views_reader.erl b/src/couch_views/src/couch_views_reader.erl
new file mode 100644
index 0000000..c7989d8
--- /dev/null
+++ b/src/couch_views/src/couch_views_reader.erl
@@ -0,0 +1,208 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_reader).
+
+-export([
+    read/6
+]).
+
+
+-include("couch_views.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+read(Db, Mrst, ViewName, UserCallback, UserAcc0, Args) ->
+    #mrst{
+        language = Lang,
+        sig = Sig,
+        views = Views
+    } = Mrst,
+
+    ViewId = get_view_id(Lang, Args, ViewName, Views),
+    Fun = fun handle_row/4,
+
+    try
+        fabric2_fdb:transactional(Db, fun(TxDb) ->
+            TotalRows = couch_views_fdb:get_row_count(TxDb, Mrst, ViewId),
+
+            Meta = {meta, [{total, TotalRows}, {offset, null}]},
+            UserAcc1 = maybe_stop(UserCallback(Meta, UserAcc0)),
+
+            Acc0 = #{
+                db => TxDb,
+                skip => Args#mrargs.skip,
+                mrargs => undefined,
+                callback => UserCallback,
+                acc => UserAcc1
+            },
+
+            Acc1 = lists:foldl(fun(KeyArgs, KeyAcc0) ->
+                Opts = mrargs_to_fdb_options(KeyArgs),
+                KeyAcc1 = KeyAcc0#{
+                    mrargs := KeyArgs
+                },
+                couch_views_fdb:fold_map_idx(
+                        TxDb,
+                        Sig,
+                        ViewId,
+                        Opts,
+                        Fun,
+                        KeyAcc1
+                    )
+            end, Acc0, expand_keys_args(Args)),
+
+            #{
+                acc := UserAcc2
+            } = Acc1,
+            {ok, maybe_stop(UserCallback(complete, UserAcc2))}
+        end)
+    catch throw:{done, Out} ->
+        {ok, Out}
+    end.
+
+
+handle_row(_DocId, _Key, _Value, #{skip := Skip} = Acc) when Skip > 0 ->
+    Acc#{skip := Skip - 1};
+
+handle_row(DocId, Key, Value, Acc) ->
+    #{
+        db := TxDb,
+        mrargs := Args,
+        callback := UserCallback,
+        acc := UserAcc0
+    } = Acc,
+
+    BaseRow = [
+        {id, DocId},
+        {key, Key},
+        {value, Value}
+    ],
+
+    Row = BaseRow ++ if not Args#mrargs.include_docs -> []; true ->
+        DocOpts0 = Args#mrargs.doc_options,
+        DocOpts1 = DocOpts0 ++ case Args#mrargs.conflicts of
+            true -> [conflicts];
+            _ -> []
+        end,
+
+        {TargetDocId, Rev} = get_doc_id(DocId, Value),
+        DocObj = load_doc(TxDb, TargetDocId, Rev, DocOpts1),
+        [{doc, DocObj}]
+    end,
+
+    UserAcc1 = maybe_stop(UserCallback({row, Row}, UserAcc0)),
+    Acc#{acc := UserAcc1}.
+
+
+get_view_id(Lang, Args, ViewName, Views) ->
+    case couch_mrview_util:extract_view(Lang, Args, ViewName, Views) of
+        {map, View, _Args} -> View#mrview.id_num;
+        {red, {_Idx, _Lang, View}} -> View#mrview.id_num
+    end.
+
+
+expand_keys_args(#mrargs{keys = undefined} = Args) ->
+    [Args];
+
+expand_keys_args(#mrargs{keys = Keys} = Args) ->
+    lists:map(fun(Key) ->
+        Args#mrargs{
+            start_key = Key,
+            end_key = Key
+        }
+    end, Keys).
+
+
+mrargs_to_fdb_options(Args) ->
+    #mrargs{
+        start_key = StartKey0,
+        start_key_docid = StartKeyDocId,
+        end_key = EndKey0,
+        end_key_docid = EndKeyDocId,
+        direction = Direction,
+        limit = Limit,
+        skip = Skip,
+        inclusive_end = InclusiveEnd
+    } = Args,
+
+    StartKey1 = if StartKey0 == undefined -> undefined; true ->
+        couch_views_encoding:encode(StartKey0, key)
+    end,
+
+    StartKeyOpts = case {StartKey1, StartKeyDocId} of
+        {undefined, _} ->
+            [];
+        {StartKey1, StartKeyDocId} ->
+            [{start_key, {StartKey1, StartKeyDocId}}]
+    end,
+
+    EndKey1 = if EndKey0 == undefined -> undefined; true ->
+        couch_views_encoding:encode(EndKey0, key)
+    end,
+
+    EndKeyOpts = case {EndKey1, EndKeyDocId, Direction} of
+        {undefined, _, _} ->
+            [];
+        {EndKey1, <<>>, rev} when not InclusiveEnd ->
+            % When we iterate in reverse with
+            % inclusive_end=false we have to set the
+            % EndKeyDocId to <<255>> so that we don't
+            % include matching rows.
+            [{end_key_gt, {EndKey1, <<255>>}}];
+        {EndKey1, <<255>>, _} when not InclusiveEnd ->
+            % When inclusive_end=false we need to
+            % elide the default end_key_docid so as
+            % to not sort past the docids with the
+            % given end key.
+            [{end_key_gt, {EndKey1}}];
+        {EndKey1, EndKeyDocId, _} when not InclusiveEnd ->
+            [{end_key_gt, {EndKey1, EndKeyDocId}}];
+        {EndKey1, EndKeyDocId, _} when InclusiveEnd ->
+            [{end_key, {EndKey1, EndKeyDocId}}]
+    end,
+
+    [
+        {dir, Direction},
+        {limit, Limit * 2 + Skip * 2},
+        {streaming_mode, want_all}
+    ] ++ StartKeyOpts ++ EndKeyOpts.
+
+
+maybe_stop({ok, Acc}) -> Acc;
+maybe_stop({stop, Acc}) -> throw({done, Acc}).
+
+
+get_doc_id(Id, {Props}) ->
+    DocId = couch_util:get_value(<<"_id">>, Props, Id),
+    Rev = couch_util:get_value(<<"_rev">>, Props, null),
+    {DocId, Rev};
+
+get_doc_id(Id, _Value) ->
+    {Id, null}.
+
+
+load_doc(TxDb, Id, null, DocOpts) ->
+    case fabric2_db:open_doc(TxDb, Id, DocOpts) of
+        {ok, Doc} -> couch_doc:to_json_obj(Doc, DocOpts);
+        {not_found, _} -> null
+    end;
+
+load_doc(TxDb, Id, Rev, DocOpts) ->
+    Rev1 = couch_doc:parse_rev(Rev),
+    case (catch fabric2_db:open_doc_revs(TxDb, Id, [Rev1], DocOpts)) of
+        {ok, [{ok, Doc}]} -> couch_doc:to_json_obj(Doc, DocOpts);
+        {ok, [{{not_found, missing}, Rev}]} -> null;
+        {ok, [_Else]} -> null
+    end.
diff --git a/src/couch_views/src/couch_views_server.erl b/src/couch_views/src/couch_views_server.erl
new file mode 100644
index 0000000..d14216e
--- /dev/null
+++ b/src/couch_views/src/couch_views_server.erl
@@ -0,0 +1,103 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_server).
+
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(MAX_WORKERS, 100).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+    process_flag(trap_exit, true),
+    couch_views_jobs:set_timeout(),
+    St = #{
+        workers => #{},
+        max_workers => max_workers()
+    },
+    {ok, spawn_workers(St)}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'EXIT', Pid, Reason}, St) ->
+    #{workers := Workers} = St,
+    case maps:is_key(Pid, Workers) of
+        true ->
+            if Reason == normal -> ok; true ->
+                LogMsg = "~p : indexer process ~p exited with ~p",
+                couch_log:error(LogMsg, [?MODULE, Pid, Reason])
+            end,
+            NewWorkers = maps:remove(Pid, Workers),
+            {noreply, spawn_workers(St#{workers := NewWorkers})};
+        false ->
+            LogMsg = "~p : unknown process ~p exited with ~p",
+            couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+            {stop, {unknown_pid_exit, Pid}, St}
+    end;
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+spawn_workers(St) ->
+    #{
+        workers := Workers,
+        max_workers := MaxWorkers
+    } = St,
+    case maps:size(Workers) < MaxWorkers of
+        true ->
+            Pid = couch_views_indexer:spawn_link(),
+            NewSt = St#{workers := Workers#{Pid => true}},
+            spawn_workers(NewSt);
+        false ->
+            St
+    end.
+
+
+max_workers() ->
+    config:get_integer("couch_views", "max_workers", ?MAX_WORKERS).
diff --git a/src/couch_views/src/couch_views_sup.erl b/src/couch_views/src/couch_views_sup.erl
new file mode 100644
index 0000000..7650fdf
--- /dev/null
+++ b/src/couch_views/src/couch_views_sup.erl
@@ -0,0 +1,46 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_views_sup).
+
+
+-behaviour(supervisor).
+
+
+-export([
+    start_link/1
+]).
+
+
+-export([
+    init/1
+]).
+
+
+start_link(Args) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
+
+
+init([]) ->
+    Flags = #{
+        strategy => one_for_one,
+        intensity => 1,
+        period => 5
+    },
+    Children = [
+        #{
+            id => couch_views_server,
+            start => {couch_views_server, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
diff --git a/src/couch_views/src/couch_views_util.erl b/src/couch_views/src/couch_views_util.erl
new file mode 100644
index 0000000..b88cfcd
--- /dev/null
+++ b/src/couch_views/src/couch_views_util.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_util).
+
+
+-export([
+    ddoc_to_mrst/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include("couch_views.hrl").
+
+
+ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
+    MakeDict = fun({Name, {MRFuns}}, DictBySrcAcc) ->
+        case couch_util:get_value(<<"map">>, MRFuns) of
+            MapSrc when MapSrc /= undefined ->
+                RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null),
+                {ViewOpts} = couch_util:get_value(<<"options">>, MRFuns, {[]}),
+                View = case dict:find({MapSrc, ViewOpts}, DictBySrcAcc) of
+                    {ok, View0} -> View0;
+                    error -> #mrview{def=MapSrc, options=ViewOpts}
+                end,
+                {MapNames, RedSrcs} = case RedSrc of
+                    null ->
+                        MNames = [Name | View#mrview.map_names],
+                        {MNames, View#mrview.reduce_funs};
+                    _ ->
+                        RedFuns = [{Name, RedSrc} | View#mrview.reduce_funs],
+                        {View#mrview.map_names, RedFuns}
+                end,
+                View2 = View#mrview{map_names=MapNames, reduce_funs=RedSrcs},
+                dict:store({MapSrc, ViewOpts}, View2, DictBySrcAcc);
+            undefined ->
+                DictBySrcAcc
+        end;
+        ({Name, Else}, DictBySrcAcc) ->
+            couch_log:error("design_doc_to_view_group ~s views ~p",
+                [Name, Else]),
+            DictBySrcAcc
+    end,
+    {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}),
+    SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false),
+    KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>,
+        DesignOpts, false),
+    Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false),
+
+    {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
+    BySrc = lists:foldl(MakeDict, dict:new(), RawViews),
+
+    NumViews = fun({_, View}, N) ->
+            {View#mrview{id_num=N, seq_indexed=SeqIndexed,
+                keyseq_indexed=KeySeqIndexed}, N+1}
+    end,
+    {Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))),
+
+    Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
+    Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}),
+
+    IdxState = #mrst{
+        db_name=DbName,
+        idx_name=Id,
+        lib=Lib,
+        views=Views,
+        language=Language,
+        design_opts=DesignOpts,
+        seq_indexed=SeqIndexed,
+        keyseq_indexed=KeySeqIndexed,
+        partitioned=Partitioned
+    },
+    SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
+    {ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}.
diff --git a/src/couch_views/test/couch_views_encoding_test.erl b/src/couch_views/test/couch_views_encoding_test.erl
new file mode 100644
index 0000000..7c26583
--- /dev/null
+++ b/src/couch_views/test/couch_views_encoding_test.erl
@@ -0,0 +1,94 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_encoding_test).
+
+-include_lib("eunit/include/eunit.hrl").
+
+val_encoding_test() ->
+    Values = [
+        null,
+        true,
+        1.0,
+        <<"a">>,
+        {[{<<"a">>, 1.0}, {<<"b">>, <<"hello">>}]}
+    ],
+    lists:foreach(fun (Val) ->
+        EncVal = couch_views_encoding:encode(Val),
+        ?assertEqual(Val, couch_views_encoding:decode(EncVal))
+    end, Values).
+
+
+correct_ordering_test() ->
+    % Load the ICU driver for couch_util:get_sort_key/1
+    {ok, CfgPid} = gen_server:start_link(config, [], []),
+    {ok, DrvPid} = gen_server:start_link(couch_drv, [], []),
+
+    Ordered = [
+        %  Special values sort before all other types
+        null,
+        false,
+        true,
+
+        % Then numbers
+        1,
+        2,
+        3.0,
+        4,
+
+        % Then text, case sensitive
+        <<"a">>,
+        <<"A">>,
+        <<"aa">>,
+        <<"b">>,
+        <<"B">>,
+        <<"ba">>,
+        <<"bb">>,
+
+        % Then arrays, compared element by element until different.
+        % Longer arrays sort after their prefixes
+        [<<"a">>],
+        [<<"b">>],
+        [<<"b">>, <<"c">>],
+        [<<"b">>, <<"c">>, <<"a">>],
+        [<<"b">>, <<"d">>],
+        [<<"b">>, <<"d">>, <<"e">>],
+
+        % Then objects, compared each key value in the list until different.
+        % Larger objects sort after their subset objects
+        {[{<<"a">>, 1}]},
+        {[{<<"a">>, 2}]},
+        {[{<<"b">>, 1}]},
+        {[{<<"b">>, 2}]},
+
+        % Member order does matter for collation
+        {[{<<"b">>, 2}, {<<"a">>, 1}]},
+        {[{<<"b">>, 2}, {<<"c">>, 2}]}
+    ],
+
+    Encoded = lists:map(fun(Elem) ->
+        K = couch_views_encoding:encode(Elem, key),
+        V = couch_views_encoding:encode(Elem, value),
+        {K, V}
+    end, Ordered),
+    Shuffled = shuffle(Encoded),
+    Reordered = lists:sort(Shuffled),
+
+    lists:foreach(fun({Original, {_K, ViewEncoded}}) ->
+        ?assertEqual(Original, couch_views_encoding:decode(ViewEncoded))
+    end, lists:zip(Ordered, Reordered)).
+
+
+shuffle(List) when is_list(List) ->
+    Tagged = [{rand:uniform(), Item} || Item <- List],
+    {_, Randomized} = lists:unzip(lists:sort(Tagged)),
+    Randomized.
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
new file mode 100644
index 0000000..02c8cee
--- /dev/null
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -0,0 +1,456 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_indexer_test).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+-define(I_HEART_EUNIT(Tests), [{with, [T]} || T <- Tests]).
+
+
+indexer_test_() ->
+    {
+        "Test view indexing",
+        {
+            setup,
+            fun setup/0,
+            fun cleanup/1,
+            {
+                foreach,
+                fun foreach_setup/0,
+                fun foreach_teardown/1,
+                ?I_HEART_EUNIT([
+                    fun indexed_empty_db/1,
+                    fun indexed_single_doc/1,
+                    fun updated_docs_are_reindexed/1,
+                    fun updated_docs_without_changes_are_reindexed/1,
+                    fun deleted_docs_not_indexed/1,
+                    fun deleted_docs_are_unindexed/1,
+                    fun multipe_docs_with_same_key/1,
+                    fun multipe_keys_from_same_doc/1,
+                    fun multipe_identical_keys_from_same_doc/1
+                ])
+            }
+        }
+    }.
+
+
+setup() ->
+    Ctx = test_util:start_couch([
+            fabric,
+            couch_jobs,
+            couch_views
+        ]),
+    Ctx.
+
+
+cleanup(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+foreach_setup() ->
+    {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
+    Db.
+
+
+foreach_teardown(Db) ->
+    ok = fabric2_db:delete(fabric2_db:name(Db), []).
+
+
+indexed_empty_db(Db) ->
+    DDoc = create_ddoc(),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, Out} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([], Out).
+
+
+indexed_single_doc(Db) ->
+    DDoc = create_ddoc(),
+    Doc1 = doc(0),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db, Doc1, []),
+
+    {ok, Out} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([{row, [
+            {id, <<"0">>},
+            {key, 0},
+            {value, 0}
+        ]}], Out).
+
+
+updated_docs_are_reindexed(Db) ->
+    DDoc = create_ddoc(),
+    Doc1 = doc(0),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+
+    {ok, Out1} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([{row, [
+            {id, <<"0">>},
+            {key, 0},
+            {value, 0}
+        ]}], Out1),
+
+    Doc2 = Doc1#doc{
+        revs = {Pos, [Rev]},
+        body = {[{<<"val">>, 1}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+    {ok, Out2} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([{row, [
+            {id, <<"0">>},
+            {key, 1},
+            {value, 1}
+        ]}], Out2),
+
+    % Check that our id index is updated properly
+    % as well.
+    DbName = fabric2_db:name(Db),
+    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+    Sig = Mrst#mrst.sig,
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        ?assertMatch(
+                [{0, 1, _, [1]}, {1, 0, 0, []}],
+                couch_views_fdb:get_view_keys(TxDb, Sig, <<"0">>)
+            )
+    end).
+
+
+updated_docs_without_changes_are_reindexed(Db) ->
+    DDoc = create_ddoc(),
+    Doc1 = doc(0),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+
+    {ok, Out1} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([{row, [
+            {id, <<"0">>},
+            {key, 0},
+            {value, 0}
+        ]}], Out1),
+
+    Doc2 = Doc1#doc{
+        revs = {Pos, [Rev]},
+        body = {[{<<"val">>, 0}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+    {ok, Out2} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([{row, [
+            {id, <<"0">>},
+            {key, 0},
+            {value, 0}
+        ]}], Out2),
+
+    % Check fdb directly to make sure we've also
+    % removed the id idx keys properly.
+    DbName = fabric2_db:name(Db),
+    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+    Sig = Mrst#mrst.sig,
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        ?assertMatch(
+                [{0, 1, _, [0]}, {1, 0, 0, []}],
+                couch_views_fdb:get_view_keys(TxDb, Sig, <<"0">>)
+            )
+    end).
+
+
+deleted_docs_not_indexed(Db) ->
+    DDoc = create_ddoc(),
+    Doc1 = doc(0),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+    Doc2 = Doc1#doc{
+        revs = {Pos, [Rev]},
+        deleted = true,
+        body = {[{<<"val">>, 1}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+    {ok, Out} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([], Out).
+
+
+deleted_docs_are_unindexed(Db) ->
+    DDoc = create_ddoc(),
+    Doc1 = doc(0),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+
+    {ok, Out1} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([{row, [
+            {id, <<"0">>},
+            {key, 0},
+            {value, 0}
+        ]}], Out1),
+
+    Doc2 = Doc1#doc{
+        revs = {Pos, [Rev]},
+        deleted = true,
+        body = {[{<<"val">>, 1}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+    {ok, Out2} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([], Out2),
+
+    % Check fdb directly to make sure we've also
+    % removed the id idx keys properly.
+    DbName = fabric2_db:name(Db),
+    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+    Sig = Mrst#mrst.sig,
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        ?assertEqual([], couch_views_fdb:get_view_keys(TxDb, Sig, <<"0">>))
+    end).
+
+
+multipe_docs_with_same_key(Db) ->
+    DDoc = create_ddoc(),
+    Doc1 = doc(0, 1),
+    Doc2 = doc(1, 1),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_docs(Db, [Doc1, Doc2], []),
+
+    {ok, Out} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([
+            {row, [
+                {id, <<"0">>},
+                {key, 1},
+                {value, 1}
+            ]},
+            {row, [
+                {id, <<"1">>},
+                {key, 1},
+                {value, 1}
+            ]}
+        ], Out).
+
+
+multipe_keys_from_same_doc(Db) ->
+    DDoc = create_ddoc(multi_emit_different),
+    Doc = doc(0, 1),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db, Doc, []),
+
+    {ok, Out} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([
+            {row, [
+                {id, <<"0">>},
+                {key, 1},
+                {value, 1}
+            ]},
+            {row, [
+                {id, <<"0">>},
+                {key, <<"0">>},
+                {value, <<"0">>}
+            ]}
+        ], Out).
+
+
+multipe_identical_keys_from_same_doc(Db) ->
+    DDoc = create_ddoc(multi_emit_same),
+    Doc = doc(0, 1),
+
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db, Doc, []),
+
+    {ok, Out} = couch_views:query(
+            Db,
+            DDoc,
+            <<"map_fun1">>,
+            fun fold_fun/2,
+            [],
+            #mrargs{}
+        ),
+
+    ?assertEqual([
+            {row, [
+                {id, <<"0">>},
+                {key, 1},
+                {value, 1}
+            ]},
+            {row, [
+                {id, <<"0">>},
+                {key, 1},
+                {value, 2}
+            ]}
+        ], Out).
+
+
+fold_fun({meta, _Meta}, Acc) ->
+    {ok, Acc};
+fold_fun({row, _} = Row, Acc) ->
+    {ok, [Row | Acc]};
+fold_fun(complete, Acc) ->
+    {ok, lists:reverse(Acc)}.
+
+
+create_ddoc() ->
+    create_ddoc(simple).
+
+
+create_ddoc(simple) ->
+    couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {[
+            {<<"map_fun1">>, {[
+                {<<"map">>, <<"function(doc) {emit(doc.val, doc.val);}">>}
+            ]}},
+            {<<"map_fun2">>, {[
+                {<<"map">>, <<"function(doc) {}">>}
+            ]}}
+        ]}}
+    ]});
+
+create_ddoc(multi_emit_different) ->
+    couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {[
+            {<<"map_fun1">>, {[
+                {<<"map">>, <<"function(doc) { "
+                    "emit(doc._id, doc._id); "
+                    "emit(doc.val, doc.val); "
+                "}">>}
+            ]}},
+            {<<"map_fun2">>, {[
+                {<<"map">>, <<"function(doc) {}">>}
+            ]}}
+        ]}}
+    ]});
+
+create_ddoc(multi_emit_same) ->
+    couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {[
+            {<<"map_fun1">>, {[
+                {<<"map">>, <<"function(doc) { "
+                    "emit(doc.val, doc.val * 2); "
+                    "emit(doc.val, doc.val); "
+                "}">>}
+            ]}},
+            {<<"map_fun2">>, {[
+                {<<"map">>, <<"function(doc) {}">>}
+            ]}}
+        ]}}
+    ]}).
+
+
+doc(Id) ->
+    doc(Id, Id).
+
+
+doc(Id, Val) ->
+    couch_doc:from_json_obj({[
+        {<<"_id">>, list_to_binary(integer_to_list(Id))},
+        {<<"val">>, Val}
+    ]}).
diff --git a/src/couch_views/test/couch_views_map_test.erl b/src/couch_views/test/couch_views_map_test.erl
new file mode 100644
index 0000000..0b0ab68
--- /dev/null
+++ b/src/couch_views/test/couch_views_map_test.erl
@@ -0,0 +1,517 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_map_test).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {atom_to_list(A), fun A/0}).
+
+
+setup() ->
+    test_util:start_couch([fabric, couch_jobs, couch_views]).
+
+
+teardown(State) ->
+    test_util:stop_couch(State).
+
+
+map_views_test_() ->
+    {
+        "Map views",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                ?TDEF(should_map),
+                ?TDEF(should_map_with_startkey),
+                ?TDEF(should_map_with_endkey),
+                ?TDEF(should_map_with_endkey_not_inclusive),
+                ?TDEF(should_map_reverse_and_limit),
+                ?TDEF(should_map_with_range_reverse),
+                ?TDEF(should_map_with_limit_and_skip),
+                ?TDEF(should_map_with_limit_and_skip_reverse),
+                ?TDEF(should_map_with_include_docs),
+                ?TDEF(should_map_with_include_docs_reverse),
+                ?TDEF(should_map_with_startkey_with_key_array),
+                ?TDEF(should_map_with_startkey_and_endkey_with_key_array),
+                ?TDEF(should_map_empty_views),
+                ?TDEF(should_map_duplicate_keys),
+                ?TDEF(should_map_with_doc_emit),
+                ?TDEF(should_map_update_is_false),
+                ?TDEF(should_map_update_is_lazy)
+                % fun should_give_ext_size_seq_indexed_test/1
+            ]
+        }
+    }.
+
+
+should_map() ->
+    Result = run_query(<<"baz">>, #{}),
+    Expect = {ok, [
+        {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+        {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+        {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+        {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+        {row, [{id, <<"5">>}, {key, 5}, {value, 5}]},
+        {row, [{id, <<"6">>}, {key, 6}, {value, 6}]},
+        {row, [{id, <<"7">>}, {key, 7}, {value, 7}]},
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}]},
+        {row, [{id, <<"9">>}, {key, 9}, {value, 9}]},
+        {row, [{id, <<"10">>}, {key, 10}, {value, 10}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_startkey() ->
+    Result = run_query(<<"baz">>, #{start_key => 4}),
+    Expect = {ok, [
+        {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+        {row, [{id, <<"5">>}, {key, 5}, {value, 5}]},
+        {row, [{id, <<"6">>}, {key, 6}, {value, 6}]},
+        {row, [{id, <<"7">>}, {key, 7}, {value, 7}]},
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}]},
+        {row, [{id, <<"9">>}, {key, 9}, {value, 9}]},
+        {row, [{id, <<"10">>}, {key, 10}, {value, 10}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_endkey() ->
+    Result = run_query(<<"baz">>, #{end_key => 5}),
+    Expect = {ok, [
+        {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+        {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+        {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+        {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+        {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_endkey_not_inclusive() ->
+    Result = run_query(<<"baz">>, #{
+        end_key => 5,
+        inclusive_end => false
+    }),
+    Expect = {ok, [
+        {row, [{id, <<"1">>}, {key, 1}, {value, 1}]},
+        {row, [{id, <<"2">>}, {key, 2}, {value, 2}]},
+        {row, [{id, <<"3">>}, {key, 3}, {value, 3}]},
+        {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_reverse_and_limit() ->
+    Result = run_query(<<"baz">>, #{
+        direction => rev,
+        limit => 3
+    }),
+    Expect = {ok, [
+        {row, [{id, <<"10">>}, {key, 10}, {value, 10}]},
+        {row, [{id, <<"9">>}, {key, 9}, {value, 9}]},
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_range_reverse() ->
+    Result = run_query(<<"baz">>, #{
+        direction => rev,
+        start_key => 5,
+        end_key => 3,
+        inclusive_end => true
+    }),
+    Expect = {ok, [
+        {row, [{id, <<"5">>}, {key, 5}, {value, 5}]},
+        {row, [{id, <<"4">>}, {key, 4}, {value, 4}]},
+        {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_limit_and_skip() ->
+    Result = run_query(<<"baz">>, #{
+        start_key => 2,
+        limit => 3,
+        skip => 3
+    }),
+    Expect = {ok, [
+        {row, [{id, <<"5">>}, {key, 5}, {value, 5}]},
+        {row, [{id, <<"6">>}, {key, 6}, {value, 6}]},
+        {row, [{id, <<"7">>}, {key, 7}, {value, 7}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_limit_and_skip_reverse() ->
+    Result = run_query(<<"baz">>, #{
+        start_key => 10,
+        limit => 3,
+        skip => 3,
+        direction => rev
+    }),
+    Expect = {ok, [
+        {row, [{id, <<"7">>}, {key, 7}, {value, 7}]},
+        {row, [{id, <<"6">>}, {key, 6}, {value, 6}]},
+        {row, [{id, <<"5">>}, {key, 5}, {value, 5}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_include_docs() ->
+    Result = run_query(<<"baz">>, #{
+        start_key => 8,
+        end_key => 8,
+        include_docs => true
+    }),
+    Doc = {[
+        {<<"_id">>, <<"8">>},
+        {<<"_rev">>, <<"1-55b9a29311341e07ec0a7ca13bc1b59f">>},
+        {<<"val">>, 8}
+    ]},
+    Expect = {ok, [
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}, {doc, Doc}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_include_docs_reverse() ->
+    Result = run_query(<<"baz">>, #{
+        start_key => 8,
+        end_key => 8,
+        include_docs => true,
+        direction => rev
+    }),
+    Doc = {[
+        {<<"_id">>, <<"8">>},
+        {<<"_rev">>, <<"1-55b9a29311341e07ec0a7ca13bc1b59f">>},
+        {<<"val">>, 8}
+    ]},
+    Expect = {ok, [
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}, {doc, Doc}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_startkey_with_key_array() ->
+    Rows = [
+        {row, [{id, <<"4">>}, {key, [<<"4">>, 4]}, {value, 4}]},
+        {row, [{id, <<"5">>}, {key, [<<"5">>, 5]}, {value, 5}]},
+        {row, [{id, <<"6">>}, {key, [<<"6">>, 6]}, {value, 6}]},
+        {row, [{id, <<"7">>}, {key, [<<"7">>, 7]}, {value, 7}]},
+        {row, [{id, <<"8">>}, {key, [<<"8">>, 8]}, {value, 8}]},
+        {row, [{id, <<"9">>}, {key, [<<"9">>, 9]}, {value, 9}]}
+    ],
+
+    Result = run_query(<<"boom">>, #{
+        start_key => [<<"4">>]
+    }),
+
+    ?assertEqual({ok, Rows}, Result),
+
+    ResultRev = run_query(<<"boom">>, #{
+        start_key => [<<"9">>, 9],
+        direction => rev,
+        limit => 6
+    }),
+
+    ?assertEqual({ok, lists:reverse(Rows)}, ResultRev).
+
+
+should_map_with_startkey_and_endkey_with_key_array() ->
+    Rows1 = [
+        {row, [{id, <<"4">>}, {key, [<<"4">>, 4]}, {value, 4}]},
+        {row, [{id, <<"5">>}, {key, [<<"5">>, 5]}, {value, 5}]},
+        {row, [{id, <<"6">>}, {key, [<<"6">>, 6]}, {value, 6}]},
+        {row, [{id, <<"7">>}, {key, [<<"7">>, 7]}, {value, 7}]},
+        {row, [{id, <<"8">>}, {key, [<<"8">>, 8]}, {value, 8}]}
+    ],
+
+    Rows2 = [
+        {row, [{id, <<"4">>}, {key, [<<"4">>, 4]}, {value, 4}]},
+        {row, [{id, <<"5">>}, {key, [<<"5">>, 5]}, {value, 5}]},
+        {row, [{id, <<"6">>}, {key, [<<"6">>, 6]}, {value, 6}]},
+        {row, [{id, <<"7">>}, {key, [<<"7">>, 7]}, {value, 7}]},
+        {row, [{id, <<"8">>}, {key, [<<"8">>, 8]}, {value, 8}]},
+        {row, [{id, <<"9">>}, {key, [<<"9">>, 9]}, {value, 9}]}
+    ],
+
+    Result = run_query(<<"boom">>, #{
+        start_key => [<<"4">>],
+        end_key => [<<"8">>, []]
+    }),
+
+    ?assertEqual({ok, Rows1}, Result),
+
+    ResultRev = run_query(<<"boom">>, #{
+        start_key => [<<"8">>, []],
+        end_key => [<<"4">>],
+        direction => rev
+    }),
+
+    ?assertEqual({ok, lists:reverse(Rows1)}, ResultRev),
+
+    ResultRev2 = run_query(<<"boom">>, #{
+        start_key => [<<"9">>, 9],
+        end_key => [<<"4">>],
+        direction => rev,
+        inclusive_end => false
+    }),
+
+    % Here, [<<"4">>] is less than [<<"4">>, 4] so we
+    % expect rows 9-4
+    ?assertEqual({ok, lists:reverse(Rows2)}, ResultRev2),
+
+    ResultRev3 = run_query(<<"boom">>, #{
+        start_key => [<<"9">>, 9],
+        end_key => [<<"4">>, 4],
+        direction => rev,
+        inclusive_end => false
+    }),
+
+    % Here, specifying [<<"4">>, 4] as the key will prevent
+    % us from including that row which leaves rows 9-5
+    ?assertEqual({ok, lists:reverse(lists:nthtail(1, Rows2))}, ResultRev3).
+
+
+should_map_empty_views() ->
+    Result = run_query(<<"bing">>, #{}),
+    Expect = {ok, []},
+    ?assertEqual(Expect, Result).
+
+
+should_map_with_doc_emit() ->
+    Result = run_query(<<"doc_emit">>, #{
+        start_key => 8,
+        limit => 1
+    }),
+    Doc = {[
+        {<<"_id">>, <<"8">>},
+        {<<"_rev">>, <<"1-55b9a29311341e07ec0a7ca13bc1b59f">>},
+        {<<"val">>, 8}
+    ]},
+    Expect = {ok, [
+        {row, [{id, <<"8">>}, {key, 8}, {value, Doc}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_duplicate_keys() ->
+    Result = run_query(<<"duplicate_keys">>, #{
+        limit => 6
+    }),
+    Expect = {ok, [
+        {row, [{id, <<"1">>}, {key, <<"1">>}, {value, 1}]},
+        {row, [{id, <<"1">>}, {key, <<"1">>}, {value, 2}]},
+        {row, [{id, <<"10">>}, {key, <<"10">>}, {value, 10}]},
+        {row, [{id, <<"10">>}, {key, <<"10">>}, {value, 11}]},
+        {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 2}]},
+        {row, [{id, <<"2">>}, {key, <<"2">>}, {value, 3}]}
+    ]},
+    ?assertEqual(Expect, Result).
+
+
+should_map_update_is_false() ->
+    Expect = {ok, [
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}]},
+        {row, [{id, <<"9">>}, {key, 9}, {value, 9}]},
+        {row, [{id, <<"10">>}, {key, 10}, {value, 10}]}
+    ]},
+
+    Expect1 = {ok, [
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}]},
+        {row, [{id, <<"9">>}, {key, 9}, {value, 9}]},
+        {row, [{id, <<"10">>}, {key, 10}, {value, 10}]},
+        {row, [{id, <<"11">>}, {key, 11}, {value, 11}]}
+    ]},
+
+    Idx = <<"baz">>,
+    DbName = ?tempdb(),
+
+    {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
+
+    DDoc = create_ddoc(),
+    Docs = make_docs(10),
+    fabric2_db:update_docs(Db, [DDoc | Docs]),
+
+    Args1 = #{
+        start_key => 8
+    },
+
+    Result1 = couch_views:query(Db, DDoc, Idx, fun default_cb/2,
+        [], Args1),
+    ?assertEqual(Expect, Result1),
+
+    Doc = doc(11),
+    fabric2_db:update_doc(Db, Doc),
+
+    Args2 = #{
+        start_key => 8,
+        update => false
+    },
+
+    Result2 = couch_views:query(Db, DDoc, Idx, fun default_cb/2,
+        [], Args2),
+    ?assertEqual(Expect, Result2),
+
+    Result3 = couch_views:query(Db, DDoc, Idx, fun default_cb/2,
+        [], Args1),
+    ?assertEqual(Expect1, Result3).
+
+
+should_map_update_is_lazy() ->
+    Expect = {ok, [
+        {row, [{id, <<"8">>}, {key, 8}, {value, 8}]},
+        {row, [{id, <<"9">>}, {key, 9}, {value, 9}]},
+        {row, [{id, <<"10">>}, {key, 10}, {value, 10}]}
+    ]},
+
+    Idx = <<"baz">>,
+    DbName = ?tempdb(),
+
+    {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
+
+    DDoc = create_ddoc(),
+    Docs = make_docs(10),
+
+    fabric2_db:update_docs(Db, [DDoc | Docs]),
+
+    Args1 = #{
+        start_key => 8,
+        update => lazy
+    },
+
+    Result1 = couch_views:query(Db, DDoc, Idx, fun default_cb/2,
+        [], Args1),
+    ?assertEqual({ok, []}, Result1),
+
+    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+    JobId = couch_views_jobs:job_id(Db, Mrst),
+    UpdateSeq = fabric2_db:get_update_seq(Db),
+    ok = couch_views_jobs:wait_for_job(JobId, UpdateSeq),
+
+    Args2 = #{
+        start_key => 8,
+        update => false
+    },
+
+    Result2 = couch_views:query(Db, DDoc, Idx, fun default_cb/2,
+        [], Args2),
+    ?assertEqual(Expect, Result2).
+
+
+% should_give_ext_size_seq_indexed_test(Db) ->
+%     DDoc = couch_doc:from_json_obj({[
+%         {<<"_id">>, <<"_design/seqdoc">>},
+%         {<<"options">>, {[{<<"seq_indexed">>, true}]}},
+%         {<<"views">>, {[
+%                 {<<"view1">>, {[
+%                     {<<"map">>, <<"function(doc){emit(doc._id, doc._id);}">>}
+%                 ]}}
+%             ]}
+%         }
+%     ]}),
+%     {ok, _} = couch_db:update_doc(Db, DDoc, []),
+%     {ok, Db1} = couch_db:open_int(couch_db:name(Db), []),
+%     {ok, DDoc1} = couch_db:open_doc(Db1, <<"_design/seqdoc">>, [ejson_body]),
+%     couch_mrview:query_view(Db1, DDoc1, <<"view1">>, [{update, true}]),
+%     {ok, Info} = couch_mrview:get_info(Db1, DDoc),
+%     Size = couch_util:get_nested_json_value({Info}, [sizes, external]),
+%     ok = couch_db:close(Db1),
+%     ?assert(is_number(Size)).
+
+
+run_query(Idx, Args) ->
+    run_query(Idx, Args, false).
+
+
+run_query(Idx, Args, DebugCluster) ->
+    DbName = ?tempdb(),
+    {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
+    DDoc = create_ddoc(),
+    Docs = make_docs(10),
+    fabric2_db:update_docs(Db, [DDoc | Docs]),
+    if not DebugCluster -> ok; true ->
+        couch_views:query(Db, DDoc, Idx, fun default_cb/2, [], #{}),
+        fabric2_fdb:debug_cluster(),
+        ok
+    end,
+    couch_views:query(Db, DDoc, Idx, fun default_cb/2, [], Args).
+
+
+default_cb(complete, Acc) ->
+    {ok, lists:reverse(Acc)};
+default_cb({final, Info}, []) ->
+    {ok, [Info]};
+default_cb({final, _}, Acc) ->
+    {ok, Acc};
+default_cb({meta, _}, Acc) ->
+    {ok, Acc};
+default_cb(ok, ddoc_updated) ->
+    {ok, ddoc_updated};
+default_cb(Row, Acc) ->
+    {ok, [Row | Acc]}.
+
+
+create_ddoc() ->
+    couch_doc:from_json_obj({[
+        {<<"_id">>, <<"_design/bar">>},
+        {<<"views">>, {[
+            {<<"baz">>, {[
+                {<<"map">>, <<"function(doc) {emit(doc.val, doc.val);}">>}
+            ]}},
+            {<<"boom">>, {[
+                {<<"map">>, <<
+                    "function(doc) {\n"
+                    "   emit([doc.val.toString(), doc.val], doc.val);\n"
+                    "}"
+                >>}
+            ]}},
+            {<<"bing">>, {[
+                {<<"map">>, <<"function(doc) {}">>}
+            ]}},
+            {<<"doc_emit">>, {[
+                {<<"map">>, <<"function(doc) {emit(doc.val, doc)}">>}
+            ]}},
+            {<<"duplicate_keys">>, {[
+                {<<"map">>, <<
+                    "function(doc) {\n"
+                    "   emit(doc._id, doc.val);\n"
+                    "   emit(doc._id, doc.val + 1);\n"
+                    "}">>}
+            ]}},
+            {<<"zing">>, {[
+                {<<"map">>, <<
+                    "function(doc) {\n"
+                    "  if(doc.foo !== undefined)\n"
+                    "    emit(doc.foo, 0);\n"
+                    "}"
+                >>}
+            ]}}
+        ]}}
+    ]}).
+
+
+make_docs(Count) ->
+    [doc(I) || I <- lists:seq(1, Count)].
+
+
+doc(Id) ->
+    couch_doc:from_json_obj({[
+        {<<"_id">>, list_to_binary(integer_to_list(Id))},
+        {<<"val">>, Id}
+    ]}).
diff --git a/src/fabric/include/fabric2.hrl b/src/fabric/include/fabric2.hrl
index de1d3d1..6392d12 100644
--- a/src/fabric/include/fabric2.hrl
+++ b/src/fabric/include/fabric2.hrl
@@ -46,6 +46,7 @@
 -define(DB_DOCS, 21).
 -define(DB_LOCAL_DOCS, 22).
 -define(DB_ATTS, 23).
+-define(DB_VIEWS, 24).
 
 
 % Versions
diff --git a/test/elixir/test/basics_test.exs b/test/elixir/test/basics_test.exs
index c28c78c..363972b 100644
--- a/test/elixir/test/basics_test.exs
+++ b/test/elixir/test/basics_test.exs
@@ -178,21 +178,33 @@ defmodule BasicsTest do
 
     assert Couch.get("/#{db_name}").body["doc_count"] == 8
 
+    # Disabling until we figure out reduce functions
+    # # Test reduce function
+    # resp = Couch.get("/#{db_name}/_design/bar/_view/baz")
+    # assert hd(resp.body["rows"])["value"] == 33
+
     # Test reduce function
-    resp = Couch.get("/#{db_name}/_design/bar/_view/baz")
-    assert hd(resp.body["rows"])["value"] == 33
+    resp = Couch.get("/#{db_name}/_design/bar/_view/baz", query: %{:reduce => false})
+    assert resp.body["total_rows"] == 3
 
     # Delete doc and test for updated view results
     doc0 = Couch.get("/#{db_name}/0").body
     assert Couch.delete("/#{db_name}/0?rev=#{doc0["_rev"]}").body["ok"]
 
-    retry_until(fn ->
-      Couch.get("/#{db_name}/_design/foo/_view/baz").body["total_rows"] == 2
-    end)
+    # Disabling until we figure out reduce functions
+    # retry_until(fn ->
+    #  Couch.get("/#{db_name}/_design/foo/_view/baz").body["total_rows"] == 2
+    # end)
+
+    resp = Couch.get("/#{db_name}/_design/bar/_view/baz", query: %{:reduce => false})
+    assert resp.body["total_rows"] == 2
 
     assert Couch.get("/#{db_name}").body["doc_count"] == 7
     assert Couch.get("/#{db_name}/0").status_code == 404
-    refute Couch.get("/#{db_name}/0?rev=#{doc0["_rev"]}").status_code == 404
+
+    # No longer true. Old revisions are not stored after
+    # an update.
+    # refute Couch.get("/#{db_name}/0?rev=#{doc0["_rev"]}").status_code == 404
   end
 
   @tag :with_db
diff --git a/test/elixir/test/map_test.exs b/test/elixir/test/map_test.exs
new file mode 100644
index 0000000..04361ba
--- /dev/null
+++ b/test/elixir/test/map_test.exs
@@ -0,0 +1,450 @@
+defmodule ViewMapTest do
+  use CouchTestCase
+
+  @moduledoc """
+  Test Map functionality for views
+  """
+  def get_ids(resp) do
+    %{:body => %{"rows" => rows}} = resp
+    Enum.map(rows, fn row -> row["id"] end)
+  end
+
+  def get_keys(resp) do
+    %{:body => %{"rows" => rows}} = resp
+    Enum.map(rows, fn row -> row["key"] end)
+  end
+
+  defp create_map_docs(db_name) do
+    docs =
+      for i <- 1..10 do
+        group =
+          if rem(i, 3) == 0 do
+            "one"
+          else
+            "two"
+          end
+
+        %{
+          :_id => "doc-id-#{i}",
+          :value => i,
+          :some => "field",
+          :group => group
+        }
+      end
+
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: %{:docs => docs, :w => 3})
+    assert resp.status_code == 201
+  end
+
+  setup do
+    db_name = random_db_name()
+    {:ok, _} = create_db(db_name)
+    on_exit(fn -> delete_db(db_name) end)
+
+    create_map_docs(db_name)
+
+    map_fun1 = """
+      function(doc) {
+        if (doc.some) {
+            emit(doc.value , doc.value);
+        }
+
+        if (doc._id.indexOf("_design") > -1) {
+            emit(0, "ddoc")
+        }
+      }
+    """
+
+    map_fun2 = """
+      function(doc) {
+        if (doc.group) {
+          emit([doc.some, doc.group], 1);
+        }
+      }
+    """
+
+    map_fun3 = """
+      function(doc) {
+        if (doc.group) {
+            emit(doc.group, 1);
+        }
+      }
+    """
+
+    body = %{
+      :w => 3,
+      :docs => [
+        %{
+          _id: "_design/map",
+          views: %{
+            some: %{map: map_fun1},
+            map_some: %{map: map_fun2},
+            map_group: %{map: map_fun3}
+          }
+        },
+        %{
+          _id: "_design/include_ddocs",
+          views: %{some: %{map: map_fun1}},
+          options: %{include_design: true}
+        }
+      ]
+    }
+
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: body)
+    Enum.each(resp.body, &assert(&1["ok"]))
+
+    {:ok, [db_name: db_name]}
+  end
+
+  def get_reduce_result(resp) do
+    %{:body => %{"rows" => rows}} = resp
+    rows
+  end
+
+  test "query returns docs", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+
+    ids = get_ids(resp)
+
+    assert ids == [
+             "doc-id-1",
+             "doc-id-2",
+             "doc-id-3",
+             "doc-id-4",
+             "doc-id-5",
+             "doc-id-6",
+             "doc-id-7",
+             "doc-id-8",
+             "doc-id-9",
+             "doc-id-10"
+           ]
+
+    url = "/#{db_name}/_design/map/_view/map_some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+
+    ids = get_ids(resp)
+
+    assert ids == [
+             "doc-id-3",
+             "doc-id-6",
+             "doc-id-9",
+             "doc-id-1",
+             "doc-id-10",
+             "doc-id-2",
+             "doc-id-4",
+             "doc-id-5",
+             "doc-id-7",
+             "doc-id-8"
+           ]
+  end
+
+  test "updated docs rebuilds index", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+
+    assert ids == [
+             "doc-id-1",
+             "doc-id-2",
+             "doc-id-3",
+             "doc-id-4",
+             "doc-id-5",
+             "doc-id-6",
+             "doc-id-7",
+             "doc-id-8",
+             "doc-id-9",
+             "doc-id-10"
+           ]
+
+    update_doc_value(db_name, "doc-id-5", 0)
+    update_doc_value(db_name, "doc-id-6", 100)
+
+    resp = Couch.get("/#{db_name}/doc-id-3")
+    doc3 = convert(resp.body)
+    resp = Couch.delete("/#{db_name}/#{doc3["_id"]}", query: %{rev: doc3["_rev"]})
+    assert resp.status_code == 200
+    #
+    resp = Couch.get("/#{db_name}/doc-id-4")
+    doc4 = convert(resp.body)
+    doc4 = Map.delete(doc4, "some")
+    resp = Couch.put("/#{db_name}/#{doc4["_id"]}", body: doc4)
+    assert resp.status_code == 201
+    #
+    resp = Couch.get("/#{db_name}/doc-id-1")
+    doc1 = convert(resp.body)
+    doc1 = Map.put(doc1, "another", "value")
+    resp = Couch.put("/#{db_name}/#{doc1["_id"]}", body: doc1)
+    assert resp.status_code == 201
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+
+    assert ids == [
+             "doc-id-5",
+             "doc-id-1",
+             "doc-id-2",
+             "doc-id-7",
+             "doc-id-8",
+             "doc-id-9",
+             "doc-id-10",
+             "doc-id-6"
+           ]
+  end
+
+  test "can index design docs", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/include_ddocs/_view/some"
+    resp = Couch.get(url, query: %{limit: 3})
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+
+    assert ids == ["_design/include_ddocs", "_design/map", "doc-id-1"]
+  end
+
+  test "can use key in query string", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/map/_view/map_group"
+    resp = Couch.get(url, query: %{limit: 3, key: "\"one\""})
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+    assert ids == ["doc-id-3", "doc-id-6", "doc-id-9"]
+
+    resp =
+      Couch.get(url,
+        query: %{
+          limit: 3,
+          key: "\"one\"",
+          descending: true
+        }
+      )
+
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+    assert ids == ["doc-id-9", "doc-id-6", "doc-id-3"]
+  end
+
+  test "can use keys in query string", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.post(url, body: %{keys: [6, 3, 9]})
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+    assert ids == ["doc-id-6", "doc-id-3", "doc-id-9"]
+
+    # should ignore descending = true
+    resp = Couch.post(url, body: %{keys: [6, 3, 9], descending: true})
+    assert resp.status_code == 200
+    ids = get_ids(resp)
+    assert ids == ["doc-id-6", "doc-id-3", "doc-id-9"]
+  end
+
+  test "inclusive = false", context do
+    db_name = context[:db_name]
+
+    docs = [
+      %{key: "key1"},
+      %{key: "key2"},
+      %{key: "key3"},
+      %{key: "key4"},
+      %{key: "key4"},
+      %{key: "key5"},
+      %{
+        _id: "_design/inclusive",
+        views: %{
+          by_key: %{
+            map: """
+                function (doc) {
+                    if (doc.key) {
+                        emit(doc.key, doc);
+                    }
+                }
+            """
+          }
+        }
+      }
+    ]
+
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: %{:docs => docs, :w => 3})
+    assert resp.status_code == 201
+    url = "/#{db_name}/_design/inclusive/_view/by_key"
+
+    query = %{
+      endkey: "\"key4\"",
+      inclusive_end: false
+    }
+
+    resp = Couch.get(url, query: query)
+    assert resp.status_code == 200
+    keys = get_keys(resp)
+    assert keys == ["key1", "key2", "key3"]
+
+    query = %{
+      startkey: "\"key3\"",
+      endkey: "\"key4\"",
+      inclusive_end: false
+    }
+
+    resp = Couch.get(url, query: query)
+    assert resp.status_code == 200
+    keys = get_keys(resp)
+    assert keys == ["key3"]
+
+    query = %{
+      startkey: "\"key4\"",
+      endkey: "\"key1\"",
+      inclusive_end: false,
+      descending: true
+    }
+
+    resp = Couch.get(url, query: query)
+    assert resp.status_code == 200
+    keys = get_keys(resp)
+    assert keys == ["key4", "key4", "key3", "key2"]
+  end
+
+  test "supports linked documents", context do
+    db_name = context[:db_name]
+
+    docs = [
+      %{_id: "mydoc", foo: "bar"},
+      %{_id: "join-doc", doc_id: "mydoc"},
+      %{
+        _id: "_design/join",
+        views: %{
+          by_doc_id: %{
+            map: """
+                function (doc) {
+                    if (doc.doc_id) {
+                        emit(doc._id, {_id: doc.doc_id});
+                    }
+                }
+            """
+          }
+        }
+      }
+    ]
+
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: %{:docs => docs, :w => 3})
+    assert resp.status_code == 201
+
+    url = "/#{db_name}/_design/join/_view/by_doc_id"
+    resp = Couch.get(url)
+    assert resp.status_code == 200
+    %{:body => %{"rows" => [row]}} = resp
+
+    assert row == %{
+             "id" => "join-doc",
+             "key" => "join-doc",
+             "value" => %{"_id" => "mydoc"}
+           }
+
+    url = "/#{db_name}/_design/join/_view/by_doc_id"
+    resp = Couch.get(url, query: %{include_docs: true})
+    assert resp.status_code == 200
+    %{:body => %{"rows" => [doc]}} = resp
+
+    assert doc["id"] == "join-doc"
+    assert doc["doc"]["_id"] == "mydoc"
+  end
+
+  test "bad range returns error", context do
+    db_name = context[:db_name]
+
+    url = "/#{db_name}/_design/map/_view/some"
+    resp = Couch.get(url, query: %{startkey: "5", endkey: "4"})
+    assert resp.status_code == 400
+    %{:body => %{"error" => error}} = resp
+    assert error == "query_parse_error"
+  end
+
+  test "multiple emits in correct value order", context do
+    db_name = context[:db_name]
+
+    docs = [
+      %{_id: "doc1", foo: "foo", bar: "bar"},
+      %{_id: "doc2", foo: "foo", bar: "bar"},
+      %{
+        _id: "_design/emit",
+        views: %{
+          multiple_emit: %{
+            map: """
+                function (doc) {
+                  if (!doc.foo) {
+                    return;
+                  }
+                  emit(doc.foo);
+                  emit(doc.bar);
+                  emit(doc.foo);
+                  emit(doc.bar, 'multiple values!');
+                  emit(doc.bar, 'crayon!');
+                }
+            """
+          }
+        }
+      }
+    ]
+
+    resp = Couch.post("/#{db_name}/_bulk_docs", body: %{:docs => docs, :w => 3})
+    assert resp.status_code == 201
+
+    url = "/#{db_name}/_design/emit/_view/multiple_emit"
+    resp = Couch.post(url, body: %{keys: ["foo", "bar"]})
+    assert resp.status_code == 200
+    %{:body => %{"rows" => rows}} = resp
+
+    assert Enum.at(rows, 0)["key"] == "foo"
+    assert Enum.at(rows, 0)["id"] == "doc1"
+    assert Enum.at(rows, 1)["key"] == "foo"
+    assert Enum.at(rows, 1)["id"] == "doc1"
+
+    assert Enum.at(rows, 2)["key"] == "foo"
+    assert Enum.at(rows, 2)["id"] == "doc2"
+    assert Enum.at(rows, 3)["key"] == "foo"
+    assert Enum.at(rows, 3)["id"] == "doc2"
+
+    assert Enum.at(rows, 4)["key"] == "bar"
+    assert Enum.at(rows, 4)["id"] == "doc1"
+    assert Enum.at(rows, 4)["value"] == :null
+    assert Enum.at(rows, 5)["key"] == "bar"
+    assert Enum.at(rows, 5)["id"] == "doc1"
+    assert Enum.at(rows, 5)["value"] == "crayon!"
+    assert Enum.at(rows, 6)["key"] == "bar"
+    assert Enum.at(rows, 6)["id"] == "doc1"
+    assert Enum.at(rows, 6)["value"] == "multiple values!"
+
+    assert Enum.at(rows, 7)["key"] == "bar"
+    assert Enum.at(rows, 7)["id"] == "doc2"
+    assert Enum.at(rows, 7)["value"] == :null
+    assert Enum.at(rows, 8)["key"] == "bar"
+    assert Enum.at(rows, 8)["id"] == "doc2"
+    assert Enum.at(rows, 8)["value"] == "crayon!"
+    assert Enum.at(rows, 9)["key"] == "bar"
+    assert Enum.at(rows, 9)["id"] == "doc2"
+    assert Enum.at(rows, 9)["value"] == "multiple values!"
+  end
+
+  def update_doc_value(db_name, id, value) do
+    resp = Couch.get("/#{db_name}/#{id}")
+    doc = convert(resp.body)
+    doc = Map.put(doc, "value", value)
+    resp = Couch.put("/#{db_name}/#{id}", body: doc)
+    assert resp.status_code == 201
+  end
+
+  def convert(value) do
+    :jiffy.decode(:jiffy.encode(value), [:return_maps])
+  end
+end
diff --git a/test/elixir/test/view_collation_test.exs b/test/elixir/test/view_collation_test.exs
index 7563ba4..bf30031 100644
--- a/test/elixir/test/view_collation_test.exs
+++ b/test/elixir/test/view_collation_test.exs
@@ -70,34 +70,28 @@ defmodule ViewCollationTest do
   end
 
   test "ascending collation order", context do
-    retry_until(fn ->
-      resp = Couch.get(url(context))
-      pairs = Enum.zip(resp.body["rows"], @values)
+    resp = Couch.get(url(context))
+    pairs = Enum.zip(resp.body["rows"], @values)
 
-      Enum.each(pairs, fn {row, value} ->
-        assert row["key"] == convert(value)
-      end)
+    Enum.each(pairs, fn {row, value} ->
+      assert row["key"] == convert(value)
     end)
   end
 
   test "descending collation order", context do
-    retry_until(fn ->
-      resp = Couch.get(url(context), query: %{"descending" => "true"})
-      pairs = Enum.zip(resp.body["rows"], Enum.reverse(@values))
+    resp = Couch.get(url(context), query: %{"descending" => "true"})
+    pairs = Enum.zip(resp.body["rows"], Enum.reverse(@values))
 
-      Enum.each(pairs, fn {row, value} ->
-        assert row["key"] == convert(value)
-      end)
+    Enum.each(pairs, fn {row, value} ->
+      assert row["key"] == convert(value)
     end)
   end
 
   test "key query option", context do
     Enum.each(@values, fn value ->
-      retry_until(fn ->
-        resp = Couch.get(url(context), query: %{:key => :jiffy.encode(value)})
-        assert length(resp.body["rows"]) == 1
-        assert Enum.at(resp.body["rows"], 0)["key"] == convert(value)
-      end)
+      resp = Couch.get(url(context), query: %{:key => :jiffy.encode(value)})
+      assert length(resp.body["rows"]) == 1
+      assert Enum.at(resp.body["rows"], 0)["key"] == convert(value)
     end)
   end