You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2020/02/19 08:22:23 UTC

[couchdb] 11/23: background indexing for mango

This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch fdb-mango-indexes
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0d7b4b7aab1314cb4bd029f254f19c610767bb85
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Feb 5 14:06:18 2020 +0200

    background indexing for mango
---
 src/couch_views/src/couch_views_indexer.erl        |   3 +-
 src/fabric/src/fabric2_fdb.erl                     |  12 +-
 src/mango/src/mango.hrl                            |  10 +-
 src/mango/src/mango_fdb.erl                        | 128 ++++++--
 src/mango/src/mango_idx.erl                        |  19 +-
 src/mango/src/mango_idx.hrl                        |   3 +-
 src/mango/src/mango_idx_view.erl                   |   3 +-
 src/mango/src/mango_indexer.erl                    |  24 +-
 src/mango/src/mango_indexer_server.erl             | 103 ++++++
 src/mango/src/mango_jobs.erl                       |  53 +++
 src/mango/src/mango_jobs_indexer.erl               | 358 +++++++++++++++++++++
 src/mango/src/mango_sup.erl                        |  14 +-
 src/mango/test/01-index-crud-test.py               |   1 +
 src/mango/test/eunit/mango_indexer_test.erl        |   5 +-
 ...ndexer_test.erl => mango_jobs_indexer_test.erl} | 108 ++++---
 src/mango/test/mango.py                            |   8 +-
 16 files changed, 754 insertions(+), 98 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 31cd8e6..1e9da99 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -18,7 +18,8 @@
 
 
 -export([
-    init/0
+    init/0,
+    fetch_docs/2
 ]).
 
 -include("couch_views.hrl").
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index 130901a..0a5bf9b 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -64,6 +64,8 @@
     seq_to_vs/1,
     next_vs/1,
 
+    new_versionstamp/1,
+
     debug_cluster/0,
     debug_cluster/2
 ]).
@@ -974,6 +976,11 @@ next_vs({versionstamp, VS, Batch, TxId}) ->
     {versionstamp, V, B, T}.
 
 
+new_versionstamp(Tx) ->
+    TxId = erlfdb:get_next_tx_id(Tx),
+    {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+
+
 debug_cluster() ->
     debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
 
@@ -1708,11 +1715,6 @@ get_transaction_id(Tx, LayerPrefix) ->
     end.
 
 
-new_versionstamp(Tx) ->
-    TxId = erlfdb:get_next_tx_id(Tx),
-    {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
-
-
 on_commit(Tx, Fun) when is_function(Fun, 0) ->
     % Here we rely on Tx objects matching. However they contain a nif resource
     % object. Before Erlang 20.0 those would have been represented as empty
diff --git a/src/mango/src/mango.hrl b/src/mango/src/mango.hrl
index d3445a8..a1f9325 100644
--- a/src/mango/src/mango.hrl
+++ b/src/mango/src/mango.hrl
@@ -12,5 +12,11 @@
 
 -define(MANGO_ERROR(R), throw({mango_error, ?MODULE, R})).
 
--define(MANGO_IDX_BUILD_STATUS, 0).
--define(MANGO_IDX_RANGE, 1).
+-define(MANGO_IDX_BUILD_STATUS, 1).
+-define(MANGO_UPDATE_SEQ, 2).
+-define(MANGO_IDX_RANGE, 3).
+
+-define(MANGO_INDEX_JOB_TYPE, <<"mango">>).
+
+-define(MANGO_INDEX_BUILDING, <<"building">>).
+-define(MANGO_INDEX_READY, <<"ready">>).
diff --git a/src/mango/src/mango_fdb.erl b/src/mango/src/mango_fdb.erl
index def942f..a54d658 100644
--- a/src/mango/src/mango_fdb.erl
+++ b/src/mango/src/mango_fdb.erl
@@ -22,13 +22,104 @@
 
 
 -export([
-    query_all_docs/4,
+    create_build_vs/2,
+    set_build_vs/4,
+    get_build_vs/2,
+    get_build_status/2,
+    get_update_seq/2,
+    set_update_seq/3,
     remove_doc/3,
     write_doc/3,
+    query_all_docs/4,
     query/4
 ]).
 
 
+create_build_vs(TxDb, #idx{} = Idx) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    Key = build_vs_key(TxDb, Idx#idx.ddoc),
+    VS = fabric2_fdb:new_versionstamp(Tx),
+    Value = erlfdb_tuple:pack_vs({VS, ?MANGO_INDEX_BUILDING}),
+    erlfdb:set_versionstamped_value(Tx, Key, Value).
+
+
+set_build_vs(TxDb, #idx{} = Idx, VS, State) ->
+    #{
+        tx := Tx
+    } = TxDb,
+
+    Key = build_vs_key(TxDb, Idx#idx.ddoc),
+    Value = erlfdb_tuple:pack({VS, State}),
+    ok = erlfdb:set(Tx, Key, Value).
+
+
+% Fetch the stored {Versionstamp, BuildState} tuple for an index (or
+% its design doc id); returns not_found when nothing has been written.
+get_build_vs(TxDb, #idx{} = Idx) ->
+    get_build_vs(TxDb, Idx#idx.ddoc);
+
+get_build_vs(TxDb, DDoc) ->
+    #{
+        tx := Tx
+    } = TxDb,
+    Key = build_vs_key(TxDb, DDoc),
+    case erlfdb:wait(erlfdb:get(Tx, Key)) of
+        not_found -> not_found;
+        Packed -> erlfdb_tuple:unpack(Packed)
+    end.
+
+
+get_build_status(TxDb, DDoc) ->
+    case get_build_vs(TxDb, DDoc) of
+        not_found -> ?MANGO_INDEX_BUILDING;
+        {_, BuildState} -> BuildState
+    end.
+
+
+get_update_seq(TxDb, #idx{ddoc = DDoc}) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+
+    case erlfdb:wait(erlfdb:get(Tx, seq_key(DbPrefix, DDoc))) of
+        not_found -> <<>>;
+        UpdateSeq -> UpdateSeq
+    end.
+
+
+set_update_seq(TxDb, #idx{ddoc = DDoc}, Seq) ->
+    #{
+        tx := Tx,
+        db_prefix := DbPrefix
+    } = TxDb,
+    ok = erlfdb:set(Tx, seq_key(DbPrefix, DDoc), Seq).
+
+
+remove_doc(TxDb, DocId, IdxResults) ->
+    lists:foreach(fun (IdxResult) ->
+        #{
+            ddoc_id := DDocId,
+            results := Results
+        } = IdxResult,
+        MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
+        clear_key(TxDb, MangoIdxPrefix, Results, DocId)
+    end, IdxResults).
+
+
+write_doc(TxDb, DocId, IdxResults) ->
+    lists:foreach(fun (IdxResult) ->
+        #{
+            ddoc_id := DDocId,
+            results := Results
+        } = IdxResult,
+        MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
+        add_key(TxDb, MangoIdxPrefix, Results, DocId)
+    end, IdxResults).
+
+
 query_all_docs(Db, CallBack, Cursor, Args) ->
     Opts = args_to_fdb_opts(Args) ++ [{include_docs, true}],
     fabric2_db:fold_docs(Db, CallBack, Cursor, Opts).
@@ -133,7 +224,7 @@ fold_cb({Key, _}, Acc) ->
     {{_, DocId}} = erlfdb_tuple:unpack(Key, MangoIdxPrefix),
     {ok, Doc} = fabric2_db:open_doc(Db, DocId),
     JSONDoc = couch_doc:to_json_obj(Doc, []),
-    io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]),
+%%    io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]),
     case Callback({doc, JSONDoc}, Cursor) of
         {ok, Cursor1} ->
             Acc#{
@@ -144,33 +235,24 @@ fold_cb({Key, _}, Acc) ->
     end.
 
 
-remove_doc(TxDb, DocId, IdxResults) ->
-    lists:foreach(fun (IdxResult) ->
-        #{
-            ddoc_id := DDocId,
-            results := Results
-        } = IdxResult,
-        MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
-        clear_key(TxDb, MangoIdxPrefix, Results, DocId)
-    end, IdxResults).
+mango_idx_prefix(TxDb, Id) ->
+    #{
+        db_prefix := DbPrefix
+    } = TxDb,
+    Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE},
+    erlfdb_tuple:pack(Key, DbPrefix).
 
 
-write_doc(TxDb, DocId, IdxResults) ->
-    lists:foreach(fun (IdxResult) ->
-        #{
-            ddoc_id := DDocId,
-            results := Results
-        } = IdxResult,
-        MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
-        add_key(TxDb, MangoIdxPrefix, Results, DocId)
-        end, IdxResults).
+seq_key(DbPrefix, DDoc) ->
+    Key = {?DB_MANGO, DDoc, ?MANGO_UPDATE_SEQ},
+    erlfdb_tuple:pack(Key, DbPrefix).
 
 
-mango_idx_prefix(TxDb, Id) ->
+build_vs_key(Db, DDoc) ->
     #{
         db_prefix := DbPrefix
-    } = TxDb,
-    Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE},
+    } = Db,
+    Key = {?DB_MANGO, DDoc, ?MANGO_IDX_BUILD_STATUS},
     erlfdb_tuple:pack(Key, DbPrefix).
 
 
diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl
index cf3f507..3aadd49 100644
--- a/src/mango/src/mango_idx.erl
+++ b/src/mango/src/mango_idx.erl
@@ -58,7 +58,7 @@ list(Db) ->
         rows => []
     },
     {ok, Indexes} = fabric2_db:fold_design_docs(Db, fun ddoc_fold_cb/2, Acc0, []),
-    io:format("INDEXES ~p ~n", [Indexes]),
+%%    io:format("INDEXES ~p ~n", [Indexes]),
     Indexes ++ special(Db).
 
 
@@ -237,13 +237,16 @@ from_ddoc(Db, {Props}) ->
 %%            [mango_idx_view]
 %%    end,
     Idxs = lists:flatmap(fun(Mod) -> Mod:from_ddoc({Props}) end, IdxMods),
-    lists:map(fun(Idx) ->
-        Idx#idx{
-            dbname = DbName,
-            ddoc = DDoc,
-            partitioned = get_idx_partitioned(Db, Props)
-        }
-    end, Idxs).
+    fabric2_fdb:transactional(Db, fun(TxDb) ->
+        lists:map(fun(Idx) ->
+            Idx#idx{
+                dbname = DbName,
+                ddoc = DDoc,
+                partitioned = get_idx_partitioned(Db, Props),
+                build_status = mango_fdb:get_build_status(TxDb, DDoc)
+            }
+        end, Idxs)
+    end).
 
 
 special(Db) ->
diff --git a/src/mango/src/mango_idx.hrl b/src/mango/src/mango_idx.hrl
index 9725950..f5f827b 100644
--- a/src/mango/src/mango_idx.hrl
+++ b/src/mango/src/mango_idx.hrl
@@ -17,5 +17,6 @@
     type,
     def,
     partitioned,
-    opts
+    opts,
+    build_status
 }).
diff --git a/src/mango/src/mango_idx_view.erl b/src/mango/src/mango_idx_view.erl
index 5ec2a10..949c69b 100644
--- a/src/mango/src/mango_idx_view.erl
+++ b/src/mango/src/mango_idx_view.erl
@@ -105,7 +105,8 @@ to_json(Idx) ->
         {name, Idx#idx.name},
         {type, Idx#idx.type},
         {partitioned, Idx#idx.partitioned},
-        {def, {def_to_json(Idx#idx.def)}}
+        {def, {def_to_json(Idx#idx.def)}},
+        {build_status, Idx#idx.build_status}
     ]}.
 
 
diff --git a/src/mango/src/mango_indexer.erl b/src/mango/src/mango_indexer.erl
index c22b9cf..c7632a7 100644
--- a/src/mango/src/mango_indexer.erl
+++ b/src/mango/src/mango_indexer.erl
@@ -17,11 +17,14 @@
 -export([
     create_doc/2,
     update_doc/3,
-    delete_doc/2
+    delete_doc/2,
+
+    write_doc/3
 ]).
 
 
 -include_lib("couch/include/couch_db.hrl").
+-include("mango.hrl").
 -include("mango_idx.hrl").
 
 
@@ -42,7 +45,7 @@ modify(Db, Change, Doc, PrevDoc) ->
         modify_int(Db, Change, Doc, PrevDoc)
     catch
         Error:Reason ->
-            io:format("ERROR ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
+            io:format("ERROR INDEXER ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
             #{
                 name := DbName
             } = Db,
@@ -66,9 +69,16 @@ doc_id(#doc{id = DocId}, _) ->
 % Design doc
 % Todo: Check if design doc is mango index and kick off background worker
 % to build new index
-modify_int(_Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc,
+modify_int(Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc,
         _PrevDoc) ->
-    ok;
+    {Props} = JSONDoc = couch_doc:to_json_obj(Doc, []),
+    case proplists:get_value(<<"language">>, Props) of
+        <<"query">> ->
+            [Idx] = mango_idx:from_ddoc(Db, JSONDoc),
+            {ok, _} = mango_jobs:build_index(Db, Idx);
+        _ ->
+            ok
+    end;
 
 modify_int(Db, delete, _, PrevDoc)  ->
     remove_doc(Db, PrevDoc, json_indexes(Db));
@@ -138,15 +148,13 @@ get_index_entries(IdxDef, Doc) ->
 
 
 get_index_values(Fields, Doc) ->
-    Out1 = lists:map(fun({Field, _Dir}) ->
+    lists:map(fun({Field, _Dir}) ->
         case mango_doc:get_field(Doc, Field) of
             not_found -> not_found;
             bad_path -> not_found;
             Value -> Value
         end
-    end, Fields),
-    io:format("OUT ~p ~p ~n", [Fields, Out1]),
-    Out1.
+    end, Fields).
 
 
 get_index_partial_filter_selector(IdxDef) ->
diff --git a/src/mango/src/mango_indexer_server.erl b/src/mango/src/mango_indexer_server.erl
new file mode 100644
index 0000000..29530bb
--- /dev/null
+++ b/src/mango/src/mango_indexer_server.erl
@@ -0,0 +1,103 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mango_indexer_server).
+
+
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0
+]).
+
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(MAX_WORKERS, 1).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+    process_flag(trap_exit, true),
+    mango_jobs:set_timeout(),
+    St = #{
+        workers => #{},
+        max_workers => max_workers()
+    },
+    {ok, spawn_workers(St)}.
+
+
+terminate(_, _St) ->
+    ok.
+
+
+handle_call(Msg, _From, St) ->
+    {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+    {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'EXIT', Pid, Reason}, St) ->
+    #{workers := Workers} = St,
+    case maps:is_key(Pid, Workers) of
+        true ->
+            if Reason == normal -> ok; true ->
+                LogMsg = "~p : indexer process ~p exited with ~p",
+                couch_log:error(LogMsg, [?MODULE, Pid, Reason])
+            end,
+            NewWorkers = maps:remove(Pid, Workers),
+            {noreply, spawn_workers(St#{workers := NewWorkers})};
+        false ->
+            LogMsg = "~p : unknown process ~p exited with ~p",
+            couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+            {stop, {unknown_pid_exit, Pid}, St}
+    end;
+
+handle_info(Msg, St) ->
+    {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+spawn_workers(St) ->
+    #{
+        workers := Workers,
+        max_workers := MaxWorkers
+    } = St,
+    case maps:size(Workers) < MaxWorkers of
+        true ->
+            Pid = mango_jobs_indexer:spawn_link(),
+            NewSt = St#{workers := Workers#{Pid => true}},
+            spawn_workers(NewSt);
+        false ->
+            St
+    end.
+
+
+max_workers() ->
+    config:get_integer("mango", "max_workers", ?MAX_WORKERS).
diff --git a/src/mango/src/mango_jobs.erl b/src/mango/src/mango_jobs.erl
new file mode 100644
index 0000000..6739d62
--- /dev/null
+++ b/src/mango/src/mango_jobs.erl
@@ -0,0 +1,53 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mango_jobs).
+
+-include("mango_idx.hrl").
+-include("mango.hrl").
+
+
+-export([
+    set_timeout/0,
+    build_index/2
+]).
+
+
+set_timeout() ->
+    couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 6).
+
+
+% Record the build versionstamp and enqueue the background indexing job.
+build_index(TxDb, #idx{} = Idx) ->
+    mango_fdb:create_build_vs(TxDb, Idx),
+
+    JobId = job_id(TxDb, Idx),
+    JobData = job_data(TxDb, Idx),
+    % Add the job in the caller's transaction (not a separate one via
+    % `undefined`) so a worker cannot accept it before the design doc
+    % and build versionstamp are committed.
+    ok = couch_jobs:add(TxDb, ?MANGO_INDEX_JOB_TYPE, JobId, JobData),
+    {ok, JobId}.
+
+
+job_id(#{name := DbName}, #idx{ddoc = DDoc}) ->
+    <<DbName/binary, "-",DDoc/binary>>.
+
+
+job_data(Db, Idx) ->
+    #{
+        db_name => fabric2_db:name(Db),
+        ddoc_id => mango_idx:ddoc(Idx),
+        columns => mango_idx:columns(Idx),
+        retries => 0
+    }.
+
diff --git a/src/mango/src/mango_jobs_indexer.erl b/src/mango/src/mango_jobs_indexer.erl
new file mode 100644
index 0000000..ce6b850
--- /dev/null
+++ b/src/mango/src/mango_jobs_indexer.erl
@@ -0,0 +1,358 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Todo: this is a copy-pasta of couch_views_indexer
+% We need to make the indexing generic and have only the specific mango
+% logic here
+-module(mango_jobs_indexer).
+
+-export([
+    spawn_link/0
+]).
+
+
+-export([
+    init/0
+]).
+
+-include("mango.hrl").
+-include("mango_idx.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+spawn_link() ->
+    proc_lib:spawn_link(?MODULE, init, []).
+
+
+init() ->
+    {ok, Job, Data} = couch_jobs:accept(?MANGO_INDEX_JOB_TYPE, #{}),
+    #{
+        <<"db_name">> := DbName,
+        <<"ddoc_id">> := DDocId,
+        <<"columns">> := JobColumns,
+        <<"retries">> := Retries
+    } = Data,
+
+    {ok, Db} = try
+        fabric2_db:open(DbName, [?ADMIN_CTX])
+    catch error:database_does_not_exist ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => db_deleted,
+            reason => "Database was deleted"
+        }),
+        exit(normal)
+    end,
+
+    [Idx] = case fabric2_db:open_doc(Db, DDocId) of
+        {ok, DDoc} ->
+            JSONDDoc = couch_doc:to_json_obj(DDoc, []),
+            mango_idx:from_ddoc(Db, JSONDDoc);
+        {not_found, _} ->
+            couch_jobs:finish(undefined, Job, Data#{
+                error => ddoc_deleted,
+                reason => "Design document was deleted"
+            }),
+            exit(normal)
+    end,
+
+    Columns = mango_idx:columns(Idx),
+
+    if  JobColumns == Columns -> ok; true ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => index_changed,
+            reason => <<"Design document was modified">>
+        }),
+        exit(normal)
+    end,
+
+
+    State = #{
+        tx_db => undefined,
+        idx_vs => undefined,
+        idx_seq => undefined,
+        last_seq => undefined,
+        job => Job,
+        job_data => Data,
+        count => 0,
+        limit => num_changes(),
+        doc_acc => []
+    },
+
+    try
+        update(Db, Idx, State)
+    catch
+        exit:normal ->
+            ok;
+        Error:Reason  ->
+            io:format("ERROR in index worker ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
+            NewRetry = Retries + 1,
+            RetryLimit = retry_limit(),
+
+            case should_retry(NewRetry, RetryLimit, Reason) of
+                true ->
+                    DataErr = Data#{<<"retries">> := NewRetry},
+                    StateErr = State#{job_data := DataErr},
+                    report_progress(StateErr, update);
+                false ->
+                    NewData = add_error(Error, Reason, Data),
+                    couch_jobs:finish(undefined, Job, NewData),
+                    exit(normal)
+            end
+    end.
+
+
+% Transaction limit exceeded don't retry
+should_retry(_, _, {erlfdb_error, 2101}) ->
+    false;
+
+should_retry(Retries, RetryLimit, _) when Retries < RetryLimit ->
+    true;
+
+should_retry(_, _, _) ->
+    false.
+
+
+add_error(error, {erlfdb_error, Code}, Data) ->
+    CodeBin = couch_util:to_binary(Code),
+    CodeString = erlfdb:get_error_string(Code),
+    Data#{
+        error => foundationdb_error,
+        reason => list_to_binary([CodeBin, <<"-">>, CodeString])
+    };
+
+add_error(Error, Reason, Data) ->
+    Data#{
+        error => couch_util:to_binary(Error),
+        reason => couch_util:to_binary(Reason)
+    }.
+
+
+update(#{} = Db, #idx{} = Idx, State0) ->
+    {Idx2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        % In the first iteration of update we need
+        % to populate our db and view sequences
+        State1 = case State0 of
+            #{idx_vs := undefined} ->
+                #{
+                    job := Job,
+                    job_data := Data
+                } = State0,
+
+                {IdxVS, BuildState} = mango_fdb:get_build_vs(TxDb, Idx),
+                if BuildState == ?MANGO_INDEX_BUILDING -> ok; true ->
+                    couch_jobs:finish(undefined, Job, Data#{
+                        error => index_built,
+                        reason => <<"Index is already built">>
+                    }),
+                    exit(normal)
+                end,
+
+                IdxSeq = mango_fdb:get_update_seq(TxDb, Idx),
+
+                State0#{
+                    tx_db := TxDb,
+                    idx_vs := IdxVS,
+                    idx_seq := IdxSeq
+                };
+            _ ->
+                State0#{
+                    tx_db := TxDb
+                }
+        end,
+
+        {ok, State2} = fold_changes(State1),
+
+        #{
+            idx_vs := IdxVS1,
+            count := Count,
+            limit := Limit,
+            doc_acc := DocAcc,
+            idx_seq := IdxSeq1
+        } = State2,
+
+        DocAcc1 = couch_views_indexer:fetch_docs(TxDb, DocAcc),
+        index_docs(TxDb, Idx, DocAcc1),
+        mango_fdb:set_update_seq(TxDb, Idx, IdxSeq1),
+        case Count < Limit of
+            true ->
+                mango_fdb:set_build_vs(TxDb, Idx, IdxVS1, ?MANGO_INDEX_READY),
+                report_progress(State2, finished),
+                {Idx, finished};
+            false ->
+                State3 = report_progress(State2, update),
+                {Idx, State3#{
+                    tx_db := undefined,
+                    count := 0,
+                    doc_acc := [],
+                    idx_seq := IdxSeq1
+                }}
+        end
+    end),
+
+    case State4 of
+        finished ->
+            ok;
+        _ ->
+            update(Db, Idx2, State4)
+    end.
+
+
+fold_changes(State) ->
+    #{
+        idx_seq := SinceSeq,
+        limit := Limit,
+        tx_db := TxDb
+    } = State,
+
+    Fun = fun process_changes/2,
+    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+
+
+process_changes(Change, Acc) ->
+    #{
+        doc_acc := DocAcc,
+        count := Count,
+        idx_vs := IdxVS
+    } = Acc,
+
+    #{
+        id := Id,
+        sequence := LastSeq
+    } = Change,
+
+    DocVS = fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(LastSeq)),
+
+    case IdxVS =< DocVS of
+        true ->
+            {stop, Acc};
+        false ->
+            Acc1 = case Id of
+                <<?DESIGN_DOC_PREFIX, _/binary>> ->
+                    maps:merge(Acc, #{
+                        count => Count + 1,
+                        idx_seq => LastSeq
+                    });
+                _ ->
+                    Acc#{
+                        doc_acc := DocAcc ++ [Change],
+                        count := Count + 1,
+                        idx_seq := LastSeq
+                    }
+            end,
+            {ok, Acc1}
+    end.
+
+
+index_docs(Db, Idx, Docs) ->
+    lists:foreach(fun (Doc) ->
+        index_doc(Db, Idx, Doc)
+    end, Docs).
+
+
+index_doc(_Db, _Idx, #{deleted := true}) ->
+    ok;
+
+index_doc(Db, Idx, #{doc := Doc}) ->
+    mango_indexer:write_doc(Db, Doc, [Idx]).
+
+
+%%fetch_docs(Db, Changes) ->
+%%    {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
+%%        #{deleted := Deleted} = Doc,
+%%        Deleted
+%%    end, Changes),
+%%
+%%    RevState = lists:foldl(fun(Change, Acc) ->
+%%        #{id := Id} = Change,
+%%        RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1),
+%%        Acc#{
+%%            RevFuture => {Id, Change}
+%%        }
+%%    end, #{}, NotDeleted),
+%%
+%%    RevFutures = maps:keys(RevState),
+%%    BodyState = lists:foldl(fun(RevFuture, Acc) ->
+%%        {Id, Change} = maps:get(RevFuture, RevState),
+%%        Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture),
+%%
+%%        % I'm assuming that in this changes transaction that the winning
+%%        % doc body exists since it is listed in the changes feed as not deleted
+%%        #{winner := true} = RevInfo = lists:last(Revs),
+%%        BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo),
+%%        Acc#{
+%%            BodyFuture => {Id, RevInfo, Change}
+%%        }
+%%    end, #{}, erlfdb:wait_for_all(RevFutures)),
+%%
+%%    BodyFutures = maps:keys(BodyState),
+%%    ChangesWithDocs = lists:map(fun (BodyFuture) ->
+%%        {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState),
+%%        Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture),
+%%        Change#{doc => Doc}
+%%    end, erlfdb:wait_for_all(BodyFutures)),
+%%
+%%    % This combines the deleted changes with the changes that contain docs
+%%    % Important to note that this is now unsorted. Which is fine for now
+%%    % But later could be an issue if we split this across transactions
+%%    Deleted ++ ChangesWithDocs.
+
+
+report_progress(State, UpdateType) ->
+    #{
+        tx_db := TxDb,
+        job := Job1,
+        job_data := JobData
+    } = State,
+
+    #{
+        <<"db_name">> := DbName,
+        <<"ddoc_id">> := DDocId,
+        <<"columns">> := Columns,
+        <<"retries">> := Retries
+    } = JobData,
+
+    % Reconstruct from scratch to remove any
+    % possible existing error state.
+    NewData = #{
+        <<"db_name">> => DbName,
+        <<"ddoc_id">> => DDocId,
+        <<"columns">> => Columns,
+        <<"retries">> => Retries
+    },
+
+    case UpdateType of
+        update ->
+            case couch_jobs:update(TxDb, Job1, NewData) of
+                {ok, Job2} ->
+                    State#{job := Job2};
+                {error, halt} ->
+                    couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
+                    exit(normal)
+            end;
+        finished ->
+            case couch_jobs:finish(TxDb, Job1, NewData) of
+                ok ->
+                    State;
+                {error, halt} ->
+                    couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
+                    exit(normal)
+            end
+    end.
+
+
+num_changes() ->
+    config:get_integer("mango", "change_limit", 100).
+
+
+retry_limit() ->
+    config:get_integer("mango", "retry_limit", 3).
diff --git a/src/mango/src/mango_sup.erl b/src/mango/src/mango_sup.erl
index b0dedf1..fc12dfe 100644
--- a/src/mango/src/mango_sup.erl
+++ b/src/mango/src/mango_sup.erl
@@ -21,4 +21,16 @@ start_link(Args) ->
     supervisor:start_link({local,?MODULE}, ?MODULE, Args).
 
 init([]) ->
-    {ok, {{one_for_one, 3, 10}, couch_epi:register_service(mango_epi, [])}}.
+    Flags = #{
+        strategy => one_for_one,
+        intensity => 3,
+        period => 10
+    },
+
+    Children = [
+        #{
+            id => mango_indexer_server,
+            start => {mango_indexer_server, start_link, []}
+        }
+    ] ++ couch_epi:register_service(mango_epi, []),
+    {ok, {Flags, Children}}.
diff --git a/src/mango/test/01-index-crud-test.py b/src/mango/test/01-index-crud-test.py
index dd9ab1a..3434c66 100644
--- a/src/mango/test/01-index-crud-test.py
+++ b/src/mango/test/01-index-crud-test.py
@@ -91,6 +91,7 @@ class IndexCrudTests(mango.DbPerClass):
         for idx in self.db.list_indexes():
             if idx["name"] != "idx_01":
                 continue
+            print(idx)
             self.assertEqual(idx["def"]["fields"], [{"foo": "asc"}, {"bar": "asc"}])
             return
         raise AssertionError("index not created")
diff --git a/src/mango/test/eunit/mango_indexer_test.erl b/src/mango/test/eunit/mango_indexer_test.erl
index 778caea..ee24b21 100644
--- a/src/mango/test/eunit/mango_indexer_test.erl
+++ b/src/mango/test/eunit/mango_indexer_test.erl
@@ -41,10 +41,7 @@ indexer_test_() ->
 
 setup() ->
     Ctx = test_util:start_couch([
-        fabric,
-        couch_jobs,
-        couch_js,
-        couch_views
+        fabric
     ]),
     Ctx.
 
diff --git a/src/mango/test/eunit/mango_indexer_test.erl b/src/mango/test/eunit/mango_jobs_indexer_test.erl
similarity index 62%
copy from src/mango/test/eunit/mango_indexer_test.erl
copy to src/mango/test/eunit/mango_jobs_indexer_test.erl
index 778caea..7a8cb24 100644
--- a/src/mango/test/eunit/mango_indexer_test.erl
+++ b/src/mango/test/eunit/mango_jobs_indexer_test.erl
@@ -10,13 +10,15 @@
 % License for the specific language governing permissions and limitations under
 % the License.
 
--module(mango_indexer_test).
+-module(mango_jobs_indexer_test).
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch/include/couch_eunit.hrl").
+-include_lib("mango/src/mango.hrl").
 -include_lib("mango/src/mango_cursor.hrl").
--include_lib("fabric/test/fabric2_test.hrl").
+-include_lib("mango/src/mango_idx.hrl").
 
+-include_lib("fabric/test/fabric2_test.hrl").
 
 indexer_test_() ->
     {
@@ -29,11 +31,11 @@ indexer_test_() ->
                 foreach,
                 fun foreach_setup/0,
                 fun foreach_teardown/1,
-                [with([
-                    ?TDEF(index_docs),
-                    ?TDEF(update_doc),
-                    ?TDEF(delete_doc)
-                ])]
+                [
+                    with([?TDEF(index_docs)]),
+                    with([?TDEF(index_lots_of_docs, 10)]),
+                    with([?TDEF(index_can_recover_from_crash, 60)])
+                ]
             }
         }
     }.
@@ -43,9 +45,9 @@ setup() ->
     Ctx = test_util:start_couch([
         fabric,
         couch_jobs,
-        couch_js,
-        couch_views
+        mango
     ]),
+%%    couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 1),
     Ctx.
 
 
@@ -54,57 +56,70 @@ cleanup(Ctx) ->
 
 
 foreach_setup() ->
-    {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
-
-    DDoc = create_idx_ddoc(Db),
-    fabric2_db:update_docs(Db, [DDoc]),
-
-    Docs = make_docs(3),
-    fabric2_db:update_docs(Db, Docs),
-    {Db, couch_doc:to_json_obj(DDoc, [])}.
+    DbName = ?tempdb(),
+    {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
+    Db.
 
 
-foreach_teardown({Db, _}) ->
+foreach_teardown(Db) ->
+    meck:unload(),
     ok = fabric2_db:delete(fabric2_db:name(Db), []).
 
 
-index_docs({Db, DDoc}) ->
+index_docs(Db) ->
+    DDoc = generate_docs(Db, 5),
+    wait_while_ddoc_builds(Db),
     Docs = run_query(Db, DDoc),
     ?assertEqual([
         [{id, <<"1">>}, {value, 1}],
         [{id, <<"2">>}, {value, 2}],
-        [{id, <<"3">>}, {value, 3}]
-    ], Docs).
-
-update_doc({Db, DDoc}) ->
-    {ok, Doc} = fabric2_db:open_doc(Db, <<"2">>),
-    JsonDoc = couch_doc:to_json_obj(Doc, []),
-    JsonDoc2 = couch_util:json_apply_field({<<"value">>, 4}, JsonDoc),
-    Doc2 = couch_doc:from_json_obj(JsonDoc2),
-    fabric2_db:update_doc(Db, Doc2),
-
-    Docs = run_query(Db, DDoc),
-    ?assertEqual([
-        [{id, <<"1">>}, {value, 1}],
         [{id, <<"3">>}, {value, 3}],
-        [{id, <<"2">>}, {value, 4}]
-    ], Docs).
-
+        [{id, <<"4">>}, {value, 4}],
+        [{id, <<"5">>}, {value, 5}]
+], Docs).
 
-delete_doc({Db, DDoc}) ->
-    {ok, Doc} = fabric2_db:open_doc(Db, <<"2">>),
-    JsonDoc = couch_doc:to_json_obj(Doc, []),
-    JsonDoc2 = couch_util:json_apply_field({<<"_deleted">>, true}, JsonDoc),
-    Doc2 = couch_doc:from_json_obj(JsonDoc2),
-    fabric2_db:update_doc(Db, Doc2),
 
+index_lots_of_docs(Db) ->
+    DDoc = generate_docs(Db, 150),
+    wait_while_ddoc_builds(Db),
+    Docs = run_query(Db, DDoc),
+    ?assertEqual(length(Docs), 150).
+
+
+index_can_recover_from_crash(Db) ->
+    meck:new(mango_indexer, [passthrough]),
+    meck:expect(mango_indexer, write_doc, fun (Db, Doc, Idxs) ->
+        ?debugFmt("doc ~p ~p ~n", [Doc, Idxs]),
+        Id = Doc#doc.id,
+        case Id == <<"2">> of
+            true ->
+                meck:unload(mango_indexer),
+                throw({fake_crash, test_jobs_restart});
+            false ->
+                meck:passthrough([Db, Doc, Idxs])
+        end
+    end),
+    DDoc = generate_docs(Db, 3),
+    wait_while_ddoc_builds(Db),
     Docs = run_query(Db, DDoc),
     ?assertEqual([
         [{id, <<"1">>}, {value, 1}],
+        [{id, <<"2">>}, {value, 2}],
         [{id, <<"3">>}, {value, 3}]
     ], Docs).
 
 
+% Poll (one short transaction per attempt) until the json index is ready.
+wait_while_ddoc_builds(Db) ->
+    Idxs = fabric2_fdb:transactional(Db, fun mango_idx:list/1),
+    [Idx] = lists:filter(fun (I) -> I#idx.type == <<"json">> end, Idxs),
+    if Idx#idx.build_status == ?MANGO_INDEX_READY -> ok; true ->
+        % Sleep outside the transaction so it is not held open.
+        timer:sleep(100),
+        wait_while_ddoc_builds(Db)
+    end.
+
+
 run_query(Db, DDoc) ->
     Args = #{
         start_key => [],
@@ -131,6 +146,16 @@ run_query(Db, DDoc) ->
     end, Acc).
 
 
+generate_docs(Db, Count) ->
+    Docs = make_docs(Count),
+    fabric2_db:update_docs(Db, Docs),
+
+
+    DDoc = create_idx_ddoc(Db),
+    fabric2_db:update_docs(Db, [DDoc]),
+    couch_doc:to_json_obj(DDoc, []).
+
+
 create_idx_ddoc(Db) ->
     Opts = [
         {def, {[{<<"fields">>,{[{<<"value">>,<<"asc">>}]}}]}},
@@ -162,4 +187,3 @@ query_cb({doc, Doc}, #cursor{user_acc = Acc} = Cursor) ->
     {ok, Cursor#cursor{
         user_acc =  Acc ++ [Doc]
     }}.
-
diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py
index a39476d..92cf211 100644
--- a/src/mango/test/mango.py
+++ b/src/mango/test/mango.py
@@ -161,8 +161,12 @@ class Database(object):
 
         created = r.json()["result"] == "created"
         if created:
-            # wait until the database reports the index as available
-            while len(self.get_index(r.json()["id"], r.json()["name"])) < 1:
+            # wait until the database reports the index as available and built
+            while len([
+                    i
+                    for i in self.get_index(r.json()["id"], r.json()["name"])
+                    if i["build_status"] == "ready"
+                    ]) < 1:
                 delay(t=0.1)
 
         return created