You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2020/02/05 12:06:37 UTC
[couchdb] branch fdb-mango-indexes updated: background indexing for mango
This is an automated email from the ASF dual-hosted git repository.
garren pushed a commit to branch fdb-mango-indexes
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/fdb-mango-indexes by this push:
new f337fc6 background indexing for mango
f337fc6 is described below
commit f337fc6df4f7d6123ed496170f62e053320ead59
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Feb 5 14:06:18 2020 +0200
background indexing for mango
---
src/couch_views/src/couch_views_indexer.erl | 3 +-
src/fabric/src/fabric2_fdb.erl | 12 +-
src/mango/src/mango.hrl | 10 +-
src/mango/src/mango_fdb.erl | 128 ++++++--
src/mango/src/mango_idx.erl | 19 +-
src/mango/src/mango_idx.hrl | 3 +-
src/mango/src/mango_idx_view.erl | 3 +-
src/mango/src/mango_indexer.erl | 24 +-
src/mango/src/mango_indexer_server.erl | 103 ++++++
src/mango/src/mango_jobs.erl | 53 +++
src/mango/src/mango_jobs_indexer.erl | 358 +++++++++++++++++++++
src/mango/src/mango_sup.erl | 14 +-
src/mango/test/01-index-crud-test.py | 1 +
src/mango/test/eunit/mango_indexer_test.erl | 5 +-
...ndexer_test.erl => mango_jobs_indexer_test.erl} | 108 ++++---
src/mango/test/mango.py | 8 +-
16 files changed, 754 insertions(+), 98 deletions(-)
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 75e4b36..745f7e0 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -18,7 +18,8 @@
-export([
- init/0
+ init/0,
+ fetch_docs/2
]).
-include("couch_views.hrl").
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index a814742..6d21ba1 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -60,6 +60,8 @@
seq_to_vs/1,
next_vs/1,
+ new_versionstamp/1,
+
debug_cluster/0,
debug_cluster/2
]).
@@ -872,6 +874,11 @@ next_vs({versionstamp, VS, Batch, TxId}) ->
{versionstamp, V, B, T}.
+new_versionstamp(Tx) ->
+ TxId = erlfdb:get_next_tx_id(Tx),
+ {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
@@ -1476,11 +1483,6 @@ get_transaction_id(Tx, LayerPrefix) ->
end.
-new_versionstamp(Tx) ->
- TxId = erlfdb:get_next_tx_id(Tx),
- {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
-
-
on_commit(Tx, Fun) when is_function(Fun, 0) ->
% Here we rely on Tx objects matching. However they contain a nif resource
% object. Before Erlang 20.0 those would have been represented as empty
diff --git a/src/mango/src/mango.hrl b/src/mango/src/mango.hrl
index d3445a8..a1f9325 100644
--- a/src/mango/src/mango.hrl
+++ b/src/mango/src/mango.hrl
@@ -12,5 +12,11 @@
-define(MANGO_ERROR(R), throw({mango_error, ?MODULE, R})).
--define(MANGO_IDX_BUILD_STATUS, 0).
--define(MANGO_IDX_RANGE, 1).
+-define(MANGO_IDX_BUILD_STATUS, 1).
+-define(MANGO_UPDATE_SEQ, 2).
+-define(MANGO_IDX_RANGE, 3).
+
+-define(MANGO_INDEX_JOB_TYPE, <<"mango">>).
+
+-define(MANGO_INDEX_BUILDING, <<"building">>).
+-define(MANGO_INDEX_READY, <<"ready">>).
diff --git a/src/mango/src/mango_fdb.erl b/src/mango/src/mango_fdb.erl
index def942f..a54d658 100644
--- a/src/mango/src/mango_fdb.erl
+++ b/src/mango/src/mango_fdb.erl
@@ -22,13 +22,104 @@
-export([
- query_all_docs/4,
+ create_build_vs/2,
+ set_build_vs/4,
+ get_build_vs/2,
+ get_build_status/2,
+ get_update_seq/2,
+ set_update_seq/3,
remove_doc/3,
write_doc/3,
+ query_all_docs/4,
query/4
]).
+create_build_vs(TxDb, #idx{} = Idx) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ Key = build_vs_key(TxDb, Idx#idx.ddoc),
+ VS = fabric2_fdb:new_versionstamp(Tx),
+ Value = erlfdb_tuple:pack_vs({VS, ?MANGO_INDEX_BUILDING}),
+ erlfdb:set_versionstamped_value(Tx, Key, Value).
+
+
+set_build_vs(TxDb, #idx{} = Idx, VS, State) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ Key = build_vs_key(TxDb, Idx#idx.ddoc),
+ Value = erlfdb_tuple:pack({VS, State}),
+ ok = erlfdb:set(Tx, Key, Value).
+
+
+get_build_vs(TxDb, #idx{} = Idx) ->
+    get_build_vs(TxDb, Idx#idx.ddoc);
+
+get_build_vs(TxDb, DDoc) ->
+    % Returns the {Versionstamp, BuildState} tuple written by
+    % create_build_vs/2 or set_build_vs/4 for this design doc, or
+    % not_found when no build has been recorded yet.
+    #{
+        tx := Tx
+    } = TxDb,
+    Key = build_vs_key(TxDb, DDoc),
+    case erlfdb:wait(erlfdb:get(Tx, Key)) of
+        not_found -> not_found;
+        Packed -> erlfdb_tuple:unpack(Packed)
+    end.
+
+
+get_build_status(TxDb, DDoc) ->
+ case get_build_vs(TxDb, DDoc) of
+ not_found -> ?MANGO_INDEX_BUILDING;
+ {_, BuildState} -> BuildState
+ end.
+
+
+get_update_seq(TxDb, #idx{ddoc = DDoc}) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ case erlfdb:wait(erlfdb:get(Tx, seq_key(DbPrefix, DDoc))) of
+ not_found -> <<>>;
+ UpdateSeq -> UpdateSeq
+ end.
+
+
+set_update_seq(TxDb, #idx{ddoc = DDoc}, Seq) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+ ok = erlfdb:set(Tx, seq_key(DbPrefix, DDoc), Seq).
+
+
+remove_doc(TxDb, DocId, IdxResults) ->
+ lists:foreach(fun (IdxResult) ->
+ #{
+ ddoc_id := DDocId,
+ results := Results
+ } = IdxResult,
+ MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
+ clear_key(TxDb, MangoIdxPrefix, Results, DocId)
+ end, IdxResults).
+
+
+write_doc(TxDb, DocId, IdxResults) ->
+ lists:foreach(fun (IdxResult) ->
+ #{
+ ddoc_id := DDocId,
+ results := Results
+ } = IdxResult,
+ MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
+ add_key(TxDb, MangoIdxPrefix, Results, DocId)
+ end, IdxResults).
+
+
query_all_docs(Db, CallBack, Cursor, Args) ->
Opts = args_to_fdb_opts(Args) ++ [{include_docs, true}],
fabric2_db:fold_docs(Db, CallBack, Cursor, Opts).
@@ -133,7 +224,7 @@ fold_cb({Key, _}, Acc) ->
{{_, DocId}} = erlfdb_tuple:unpack(Key, MangoIdxPrefix),
{ok, Doc} = fabric2_db:open_doc(Db, DocId),
JSONDoc = couch_doc:to_json_obj(Doc, []),
- io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]),
+%% io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]),
case Callback({doc, JSONDoc}, Cursor) of
{ok, Cursor1} ->
Acc#{
@@ -144,33 +235,24 @@ fold_cb({Key, _}, Acc) ->
end.
-remove_doc(TxDb, DocId, IdxResults) ->
- lists:foreach(fun (IdxResult) ->
- #{
- ddoc_id := DDocId,
- results := Results
- } = IdxResult,
- MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
- clear_key(TxDb, MangoIdxPrefix, Results, DocId)
- end, IdxResults).
+mango_idx_prefix(TxDb, Id) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+ Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE},
+ erlfdb_tuple:pack(Key, DbPrefix).
-write_doc(TxDb, DocId, IdxResults) ->
- lists:foreach(fun (IdxResult) ->
- #{
- ddoc_id := DDocId,
- results := Results
- } = IdxResult,
- MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
- add_key(TxDb, MangoIdxPrefix, Results, DocId)
- end, IdxResults).
+seq_key(DbPrefix, DDoc) ->
+ Key = {?DB_MANGO, DDoc, ?MANGO_UPDATE_SEQ},
+ erlfdb_tuple:pack(Key, DbPrefix).
-mango_idx_prefix(TxDb, Id) ->
+build_vs_key(Db, DDoc) ->
#{
db_prefix := DbPrefix
- } = TxDb,
- Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE},
+ } = Db,
+ Key = {?DB_MANGO, DDoc, ?MANGO_IDX_BUILD_STATUS},
erlfdb_tuple:pack(Key, DbPrefix).
diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl
index cf3f507..3aadd49 100644
--- a/src/mango/src/mango_idx.erl
+++ b/src/mango/src/mango_idx.erl
@@ -58,7 +58,7 @@ list(Db) ->
rows => []
},
{ok, Indexes} = fabric2_db:fold_design_docs(Db, fun ddoc_fold_cb/2, Acc0, []),
- io:format("INDEXES ~p ~n", [Indexes]),
+%% io:format("INDEXES ~p ~n", [Indexes]),
Indexes ++ special(Db).
@@ -237,13 +237,16 @@ from_ddoc(Db, {Props}) ->
%% [mango_idx_view]
%% end,
Idxs = lists:flatmap(fun(Mod) -> Mod:from_ddoc({Props}) end, IdxMods),
- lists:map(fun(Idx) ->
- Idx#idx{
- dbname = DbName,
- ddoc = DDoc,
- partitioned = get_idx_partitioned(Db, Props)
- }
- end, Idxs).
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:map(fun(Idx) ->
+ Idx#idx{
+ dbname = DbName,
+ ddoc = DDoc,
+ partitioned = get_idx_partitioned(Db, Props),
+ build_status = mango_fdb:get_build_status(TxDb, DDoc)
+ }
+ end, Idxs)
+ end).
special(Db) ->
diff --git a/src/mango/src/mango_idx.hrl b/src/mango/src/mango_idx.hrl
index 9725950..f5f827b 100644
--- a/src/mango/src/mango_idx.hrl
+++ b/src/mango/src/mango_idx.hrl
@@ -17,5 +17,6 @@
type,
def,
partitioned,
- opts
+ opts,
+ build_status
}).
diff --git a/src/mango/src/mango_idx_view.erl b/src/mango/src/mango_idx_view.erl
index 5ec2a10..949c69b 100644
--- a/src/mango/src/mango_idx_view.erl
+++ b/src/mango/src/mango_idx_view.erl
@@ -105,7 +105,8 @@ to_json(Idx) ->
{name, Idx#idx.name},
{type, Idx#idx.type},
{partitioned, Idx#idx.partitioned},
- {def, {def_to_json(Idx#idx.def)}}
+ {def, {def_to_json(Idx#idx.def)}},
+ {build_status, Idx#idx.build_status}
]}.
diff --git a/src/mango/src/mango_indexer.erl b/src/mango/src/mango_indexer.erl
index c22b9cf..c7632a7 100644
--- a/src/mango/src/mango_indexer.erl
+++ b/src/mango/src/mango_indexer.erl
@@ -17,11 +17,14 @@
-export([
create_doc/2,
update_doc/3,
- delete_doc/2
+ delete_doc/2,
+
+ write_doc/3
]).
-include_lib("couch/include/couch_db.hrl").
+-include("mango.hrl").
-include("mango_idx.hrl").
@@ -42,7 +45,7 @@ modify(Db, Change, Doc, PrevDoc) ->
modify_int(Db, Change, Doc, PrevDoc)
catch
Error:Reason ->
- io:format("ERROR ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
+ io:format("ERROR INDEXER ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
#{
name := DbName
} = Db,
@@ -66,9 +69,16 @@ doc_id(#doc{id = DocId}, _) ->
% Design doc
% Todo: Check if design doc is mango index and kick off background worker
% to build new index
-modify_int(_Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc,
+modify_int(Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc,
_PrevDoc) ->
- ok;
+ {Props} = JSONDoc = couch_doc:to_json_obj(Doc, []),
+ case proplists:get_value(<<"language">>, Props) of
+ <<"query">> ->
+ [Idx] = mango_idx:from_ddoc(Db, JSONDoc),
+ {ok, _} = mango_jobs:build_index(Db, Idx);
+ _ ->
+ ok
+ end;
modify_int(Db, delete, _, PrevDoc) ->
remove_doc(Db, PrevDoc, json_indexes(Db));
@@ -138,15 +148,13 @@ get_index_entries(IdxDef, Doc) ->
get_index_values(Fields, Doc) ->
- Out1 = lists:map(fun({Field, _Dir}) ->
+ lists:map(fun({Field, _Dir}) ->
case mango_doc:get_field(Doc, Field) of
not_found -> not_found;
bad_path -> not_found;
Value -> Value
end
- end, Fields),
- io:format("OUT ~p ~p ~n", [Fields, Out1]),
- Out1.
+ end, Fields).
get_index_partial_filter_selector(IdxDef) ->
diff --git a/src/mango/src/mango_indexer_server.erl b/src/mango/src/mango_indexer_server.erl
new file mode 100644
index 0000000..29530bb
--- /dev/null
+++ b/src/mango/src/mango_indexer_server.erl
@@ -0,0 +1,103 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mango_indexer_server).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0
+]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(MAX_WORKERS, 1).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ process_flag(trap_exit, true),
+ mango_jobs:set_timeout(),
+ St = #{
+ workers => #{},
+ max_workers => max_workers()
+ },
+ {ok, spawn_workers(St)}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'EXIT', Pid, Reason}, St) ->
+ #{workers := Workers} = St,
+ case maps:is_key(Pid, Workers) of
+ true ->
+ if Reason == normal -> ok; true ->
+ LogMsg = "~p : indexer process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason])
+ end,
+ NewWorkers = maps:remove(Pid, Workers),
+ {noreply, spawn_workers(St#{workers := NewWorkers})};
+ false ->
+ LogMsg = "~p : unknown process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unknown_pid_exit, Pid}, St}
+ end;
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+spawn_workers(St) ->
+ #{
+ workers := Workers,
+ max_workers := MaxWorkers
+ } = St,
+ case maps:size(Workers) < MaxWorkers of
+ true ->
+ Pid = mango_jobs_indexer:spawn_link(),
+ NewSt = St#{workers := Workers#{Pid => true}},
+ spawn_workers(NewSt);
+ false ->
+ St
+ end.
+
+
+max_workers() ->
+ config:get_integer("mango", "max_workers", ?MAX_WORKERS).
diff --git a/src/mango/src/mango_jobs.erl b/src/mango/src/mango_jobs.erl
new file mode 100644
index 0000000..6739d62
--- /dev/null
+++ b/src/mango/src/mango_jobs.erl
@@ -0,0 +1,53 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mango_jobs).
+
+-include("mango_idx.hrl").
+-include("mango.hrl").
+
+
+-export([
+ set_timeout/0,
+ build_index/2
+]).
+
+
+set_timeout() ->
+ couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 6).
+
+
+build_index(TxDb, #idx{} = Idx) ->
+    % Record the index's build versionstamp and enqueue the background
+    % build job in the caller's transaction, so both are committed (or
+    % rolled back) together instead of the job being added in a separate,
+    % independent transaction.
+    mango_fdb:create_build_vs(TxDb, Idx),
+
+    JobId = job_id(TxDb, Idx),
+    JobData = job_data(TxDb, Idx),
+    ok = couch_jobs:add(TxDb, ?MANGO_INDEX_JOB_TYPE, JobId, JobData),
+    {ok, JobId}.
+
+
+job_id(#{name := DbName}, #idx{ddoc = DDoc}) ->
+ <<DbName/binary, "-",DDoc/binary>>.
+
+
+job_data(Db, Idx) ->
+ #{
+ db_name => fabric2_db:name(Db),
+ ddoc_id => mango_idx:ddoc(Idx),
+ columns => mango_idx:columns(Idx),
+ retries => 0
+ }.
+
diff --git a/src/mango/src/mango_jobs_indexer.erl b/src/mango/src/mango_jobs_indexer.erl
new file mode 100644
index 0000000..ce6b850
--- /dev/null
+++ b/src/mango/src/mango_jobs_indexer.erl
@@ -0,0 +1,358 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Todo: this is a copy-pasta of couch_views_indexer
+% We need to make the indexing generic and have only the specific mango
+% logic here
+-module(mango_jobs_indexer).
+
+-export([
+ spawn_link/0
+]).
+
+
+-export([
+ init/0
+]).
+
+-include("mango.hrl").
+-include("mango_idx.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+spawn_link() ->
+ proc_lib:spawn_link(?MODULE, init, []).
+
+
+% Worker entry point: accepts one mango index job, validates that its
+% database, design doc and index definition still match, then builds it.
+init() ->
+    {ok, Job, Data} = couch_jobs:accept(?MANGO_INDEX_JOB_TYPE, #{}),
+    #{
+        <<"db_name">> := DbName,
+        <<"ddoc_id">> := DDocId,
+        <<"columns">> := JobColumns,
+        <<"retries">> := Retries
+    } = Data,
+
+    % Job data comes back with binary keys (it was queued with atom keys),
+    % i.e. it round-trips through JSON; use binary reasons so they are
+    % stored as strings, matching the other finish calls below.
+    {ok, Db} = try
+        fabric2_db:open(DbName, [?ADMIN_CTX])
+    catch error:database_does_not_exist ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => db_deleted,
+            reason => <<"Database was deleted">>
+        }),
+        exit(normal)
+    end,
+
+    [Idx] = case fabric2_db:open_doc(Db, DDocId) of
+        {ok, DDoc} ->
+            JSONDDoc = couch_doc:to_json_obj(DDoc, []),
+            mango_idx:from_ddoc(Db, JSONDDoc);
+        {not_found, _} ->
+            couch_jobs:finish(undefined, Job, Data#{
+                error => ddoc_deleted,
+                reason => <<"Design document was deleted">>
+            }),
+            exit(normal)
+    end,
+
+    Columns = mango_idx:columns(Idx),
+
+    % The columns stored when the job was queued no longer match the
+    % current index definition.
+    if JobColumns == Columns -> ok; true ->
+        couch_jobs:finish(undefined, Job, Data#{
+            error => index_changed,
+            reason => <<"Design document was modified">>
+        }),
+        exit(normal)
+    end,
+
+    State = #{
+        tx_db => undefined,
+        idx_vs => undefined,
+        idx_seq => undefined,
+        last_seq => undefined,
+        job => Job,
+        job_data => Data,
+        count => 0,
+        limit => num_changes(),
+        doc_acc => []
+    },
+
+    try
+        update(Db, Idx, State)
+    catch
+        exit:normal ->
+            ok;
+        Error:Reason ->
+            Stack = erlang:get_stacktrace(),
+            couch_log:error("~s : index worker error ~p:~p ~p",
+                [?MODULE, Error, Reason, Stack]),
+            NewRetry = Retries + 1,
+            RetryLimit = retry_limit(),
+
+            case should_retry(NewRetry, RetryLimit, Reason) of
+                true ->
+                    DataErr = Data#{<<"retries">> := NewRetry},
+                    StateErr = State#{job_data := DataErr},
+                    report_progress(StateErr, update);
+                false ->
+                    NewData = add_error(Error, Reason, Data),
+                    couch_jobs:finish(undefined, Job, NewData),
+                    exit(normal)
+            end
+    end.
+
+
+% Transaction limit exceeded don't retry
+should_retry(_, _, {erlfdb_error, 2101}) ->
+ false;
+
+should_retry(Retries, RetryLimit, _) when Retries < RetryLimit ->
+ true;
+
+should_retry(_, _, _) ->
+ false.
+
+
+add_error(error, {erlfdb_error, Code}, Data) ->
+ CodeBin = couch_util:to_binary(Code),
+ CodeString = erlfdb:get_error_string(Code),
+ Data#{
+ error => foundationdb_error,
+ reason => list_to_binary([CodeBin, <<"-">>, CodeString])
+ };
+
+add_error(Error, Reason, Data) ->
+ Data#{
+ error => couch_util:to_binary(Error),
+ reason => couch_util:to_binary(Reason)
+ }.
+
+
+update(#{} = Db, #idx{} = Idx, State0) ->
+ {Idx2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ % In the first iteration of update we need
+ % to populate our db and view sequences
+ State1 = case State0 of
+ #{idx_vs := undefined} ->
+ #{
+ job := Job,
+ job_data := Data
+ } = State0,
+
+ {IdxVS, BuildState} = mango_fdb:get_build_vs(TxDb, Idx),
+ if BuildState == ?MANGO_INDEX_BUILDING -> ok; true ->
+ couch_jobs:finish(undefined, Job, Data#{
+ error => index_built,
+ reason => <<"Index is already built">>
+ }),
+ exit(normal)
+ end,
+
+ IdxSeq = mango_fdb:get_update_seq(TxDb, Idx),
+
+ State0#{
+ tx_db := TxDb,
+ idx_vs := IdxVS,
+ idx_seq := IdxSeq
+ };
+ _ ->
+ State0#{
+ tx_db := TxDb
+ }
+ end,
+
+ {ok, State2} = fold_changes(State1),
+
+ #{
+ idx_vs := IdxVS1,
+ count := Count,
+ limit := Limit,
+ doc_acc := DocAcc,
+ idx_seq := IdxSeq1
+ } = State2,
+
+ DocAcc1 = couch_views_indexer:fetch_docs(TxDb, DocAcc),
+ index_docs(TxDb, Idx, DocAcc1),
+ mango_fdb:set_update_seq(TxDb, Idx, IdxSeq1),
+ case Count < Limit of
+ true ->
+ mango_fdb:set_build_vs(TxDb, Idx, IdxVS1, ?MANGO_INDEX_READY),
+ report_progress(State2, finished),
+ {Idx, finished};
+ false ->
+ State3 = report_progress(State2, update),
+ {Idx, State3#{
+ tx_db := undefined,
+ count := 0,
+ doc_acc := [],
+ idx_seq := IdxSeq1
+ }}
+ end
+ end),
+
+ case State4 of
+ finished ->
+ ok;
+ _ ->
+ update(Db, Idx2, State4)
+ end.
+
+
+fold_changes(State) ->
+ #{
+ idx_seq := SinceSeq,
+ limit := Limit,
+ tx_db := TxDb
+ } = State,
+
+ Fun = fun process_changes/2,
+ fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+
+
+process_changes(Change, Acc) ->
+ #{
+ doc_acc := DocAcc,
+ count := Count,
+ idx_vs := IdxVS
+ } = Acc,
+
+ #{
+ id := Id,
+ sequence := LastSeq
+ } = Change,
+
+ DocVS = fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(LastSeq)),
+
+ case IdxVS =< DocVS of
+ true ->
+ {stop, Acc};
+ false ->
+ Acc1 = case Id of
+ <<?DESIGN_DOC_PREFIX, _/binary>> ->
+ maps:merge(Acc, #{
+ count => Count + 1,
+ idx_seq => LastSeq
+ });
+ _ ->
+ Acc#{
+ doc_acc := DocAcc ++ [Change],
+ count := Count + 1,
+ idx_seq := LastSeq
+ }
+ end,
+ {ok, Acc1}
+ end.
+
+
+index_docs(Db, Idx, Docs) ->
+ lists:foreach(fun (Doc) ->
+ index_doc(Db, Idx, Doc)
+ end, Docs).
+
+
+index_doc(_Db, _Idx, #{deleted := true}) ->
+ ok;
+
+index_doc(Db, Idx, #{doc := Doc}) ->
+ mango_indexer:write_doc(Db, Doc, [Idx]).
+
+
+%%fetch_docs(Db, Changes) ->
+%% {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
+%% #{deleted := Deleted} = Doc,
+%% Deleted
+%% end, Changes),
+%%
+%% RevState = lists:foldl(fun(Change, Acc) ->
+%% #{id := Id} = Change,
+%% RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1),
+%% Acc#{
+%% RevFuture => {Id, Change}
+%% }
+%% end, #{}, NotDeleted),
+%%
+%% RevFutures = maps:keys(RevState),
+%% BodyState = lists:foldl(fun(RevFuture, Acc) ->
+%% {Id, Change} = maps:get(RevFuture, RevState),
+%% Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture),
+%%
+%% % I'm assuming that in this changes transaction that the winning
+%% % doc body exists since it is listed in the changes feed as not deleted
+%% #{winner := true} = RevInfo = lists:last(Revs),
+%% BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo),
+%% Acc#{
+%% BodyFuture => {Id, RevInfo, Change}
+%% }
+%% end, #{}, erlfdb:wait_for_all(RevFutures)),
+%%
+%% BodyFutures = maps:keys(BodyState),
+%% ChangesWithDocs = lists:map(fun (BodyFuture) ->
+%% {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState),
+%% Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture),
+%% Change#{doc => Doc}
+%% end, erlfdb:wait_for_all(BodyFutures)),
+%%
+%% % This combines the deleted changes with the changes that contain docs
+%% % Important to note that this is now unsorted. Which is fine for now
+%% % But later could be an issue if we split this across transactions
+%% Deleted ++ ChangesWithDocs.
+
+
+report_progress(State, UpdateType) ->
+ #{
+ tx_db := TxDb,
+ job := Job1,
+ job_data := JobData
+ } = State,
+
+ #{
+ <<"db_name">> := DbName,
+ <<"ddoc_id">> := DDocId,
+ <<"columns">> := Columns,
+ <<"retries">> := Retries
+ } = JobData,
+
+ % Reconstruct from scratch to remove any
+ % possible existing error state.
+ NewData = #{
+ <<"db_name">> => DbName,
+ <<"ddoc_id">> => DDocId,
+ <<"columns">> => Columns,
+ <<"retries">> => Retries
+ },
+
+ case UpdateType of
+ update ->
+ case couch_jobs:update(TxDb, Job1, NewData) of
+ {ok, Job2} ->
+ State#{job := Job2};
+ {error, halt} ->
+ couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
+ exit(normal)
+ end;
+ finished ->
+ case couch_jobs:finish(TxDb, Job1, NewData) of
+ ok ->
+ State;
+ {error, halt} ->
+ couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
+ exit(normal)
+ end
+ end.
+
+
+num_changes() ->
+ config:get_integer("mango", "change_limit", 100).
+
+
+retry_limit() ->
+ config:get_integer("mango", "retry_limit", 3).
diff --git a/src/mango/src/mango_sup.erl b/src/mango/src/mango_sup.erl
index b0dedf1..fc12dfe 100644
--- a/src/mango/src/mango_sup.erl
+++ b/src/mango/src/mango_sup.erl
@@ -21,4 +21,16 @@ start_link(Args) ->
supervisor:start_link({local,?MODULE}, ?MODULE, Args).
init([]) ->
- {ok, {{one_for_one, 3, 10}, couch_epi:register_service(mango_epi, [])}}.
+ Flags = #{
+ strategy => one_for_one,
+ intensity => 3,
+ period => 10
+ },
+
+ Children = [
+ #{
+ id => mango_indexer_server,
+ start => {mango_indexer_server, start_link, []}
+ }
+ ] ++ couch_epi:register_service(mango_epi, []),
+ {ok, {Flags, Children}}.
diff --git a/src/mango/test/01-index-crud-test.py b/src/mango/test/01-index-crud-test.py
index dd9ab1a..3434c66 100644
--- a/src/mango/test/01-index-crud-test.py
+++ b/src/mango/test/01-index-crud-test.py
@@ -91,6 +91,7 @@ class IndexCrudTests(mango.DbPerClass):
for idx in self.db.list_indexes():
if idx["name"] != "idx_01":
continue
+ print(idx)
self.assertEqual(idx["def"]["fields"], [{"foo": "asc"}, {"bar": "asc"}])
return
raise AssertionError("index not created")
diff --git a/src/mango/test/eunit/mango_indexer_test.erl b/src/mango/test/eunit/mango_indexer_test.erl
index 778caea..ee24b21 100644
--- a/src/mango/test/eunit/mango_indexer_test.erl
+++ b/src/mango/test/eunit/mango_indexer_test.erl
@@ -41,10 +41,7 @@ indexer_test_() ->
setup() ->
Ctx = test_util:start_couch([
- fabric,
- couch_jobs,
- couch_js,
- couch_views
+ fabric
]),
Ctx.
diff --git a/src/mango/test/eunit/mango_indexer_test.erl b/src/mango/test/eunit/mango_jobs_indexer_test.erl
similarity index 62%
copy from src/mango/test/eunit/mango_indexer_test.erl
copy to src/mango/test/eunit/mango_jobs_indexer_test.erl
index 778caea..7a8cb24 100644
--- a/src/mango/test/eunit/mango_indexer_test.erl
+++ b/src/mango/test/eunit/mango_jobs_indexer_test.erl
@@ -10,13 +10,15 @@
% License for the specific language governing permissions and limitations under
% the License.
--module(mango_indexer_test).
+-module(mango_jobs_indexer_test).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("mango/src/mango.hrl").
-include_lib("mango/src/mango_cursor.hrl").
--include_lib("fabric/test/fabric2_test.hrl").
+-include_lib("mango/src/mango_idx.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
indexer_test_() ->
{
@@ -29,11 +31,11 @@ indexer_test_() ->
foreach,
fun foreach_setup/0,
fun foreach_teardown/1,
- [with([
- ?TDEF(index_docs),
- ?TDEF(update_doc),
- ?TDEF(delete_doc)
- ])]
+ [
+ with([?TDEF(index_docs)]),
+ with([?TDEF(index_lots_of_docs, 10)]),
+ with([?TDEF(index_can_recover_from_crash, 60)])
+ ]
}
}
}.
@@ -43,9 +45,9 @@ setup() ->
Ctx = test_util:start_couch([
fabric,
couch_jobs,
- couch_js,
- couch_views
+ mango
]),
+%% couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 1),
Ctx.
@@ -54,57 +56,70 @@ cleanup(Ctx) ->
foreach_setup() ->
- {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
-
- DDoc = create_idx_ddoc(Db),
- fabric2_db:update_docs(Db, [DDoc]),
-
- Docs = make_docs(3),
- fabric2_db:update_docs(Db, Docs),
- {Db, couch_doc:to_json_obj(DDoc, [])}.
+ DbName = ?tempdb(),
+ {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
+ Db.
-foreach_teardown({Db, _}) ->
+foreach_teardown(Db) ->
+ meck:unload(),
ok = fabric2_db:delete(fabric2_db:name(Db), []).
-index_docs({Db, DDoc}) ->
+index_docs(Db) ->
+ DDoc = generate_docs(Db, 5),
+ wait_while_ddoc_builds(Db),
Docs = run_query(Db, DDoc),
?assertEqual([
[{id, <<"1">>}, {value, 1}],
[{id, <<"2">>}, {value, 2}],
- [{id, <<"3">>}, {value, 3}]
- ], Docs).
-
-update_doc({Db, DDoc}) ->
- {ok, Doc} = fabric2_db:open_doc(Db, <<"2">>),
- JsonDoc = couch_doc:to_json_obj(Doc, []),
- JsonDoc2 = couch_util:json_apply_field({<<"value">>, 4}, JsonDoc),
- Doc2 = couch_doc:from_json_obj(JsonDoc2),
- fabric2_db:update_doc(Db, Doc2),
-
- Docs = run_query(Db, DDoc),
- ?assertEqual([
- [{id, <<"1">>}, {value, 1}],
[{id, <<"3">>}, {value, 3}],
- [{id, <<"2">>}, {value, 4}]
- ], Docs).
-
+ [{id, <<"4">>}, {value, 4}],
+ [{id, <<"5">>}, {value, 5}]
+], Docs).
-delete_doc({Db, DDoc}) ->
- {ok, Doc} = fabric2_db:open_doc(Db, <<"2">>),
- JsonDoc = couch_doc:to_json_obj(Doc, []),
- JsonDoc2 = couch_util:json_apply_field({<<"_deleted">>, true}, JsonDoc),
- Doc2 = couch_doc:from_json_obj(JsonDoc2),
- fabric2_db:update_doc(Db, Doc2),
+index_lots_of_docs(Db) ->
+    % More docs than the default "change_limit" (100), so the build spans
+    % several update batches.
+    DDoc = generate_docs(Db, 150),
+    wait_while_ddoc_builds(Db),
+    Docs = run_query(Db, DDoc),
+    % Expected value first, so eunit failure reports read correctly.
+    ?assertEqual(150, length(Docs)).
+
+
+index_can_recover_from_crash(Db) ->
+ meck:new(mango_indexer, [passthrough]),
+ meck:expect(mango_indexer, write_doc, fun (Db, Doc, Idxs) ->
+ ?debugFmt("doc ~p ~p ~n", [Doc, Idxs]),
+ Id = Doc#doc.id,
+ case Id == <<"2">> of
+ true ->
+ meck:unload(mango_indexer),
+ throw({fake_crash, test_jobs_restart});
+ false ->
+ meck:passthrough([Db, Doc, Idxs])
+ end
+ end),
+ DDoc = generate_docs(Db, 3),
+ wait_while_ddoc_builds(Db),
Docs = run_query(Db, DDoc),
?assertEqual([
[{id, <<"1">>}, {value, 1}],
+ [{id, <<"2">>}, {value, 2}],
[{id, <<"3">>}, {value, 3}]
], Docs).
+wait_while_ddoc_builds(Db) ->
+    % Read the build status in a short transaction and poll outside of it,
+    % so no FoundationDB transaction is held open (or nested) across the
+    % sleep/retry loop.
+    IsReady = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        Idxs = mango_idx:list(TxDb),
+        [Idx] = [I || I <- Idxs, I#idx.type == <<"json">>],
+        Idx#idx.build_status == ?MANGO_INDEX_READY
+    end),
+    case IsReady of
+        true ->
+            ok;
+        false ->
+            timer:sleep(100),
+            wait_while_ddoc_builds(Db)
+    end.
+
+
run_query(Db, DDoc) ->
Args = #{
start_key => [],
@@ -131,6 +146,16 @@ run_query(Db, DDoc) ->
end, Acc).
+generate_docs(Db, Count) ->
+ Docs = make_docs(Count),
+ fabric2_db:update_docs(Db, Docs),
+
+
+ DDoc = create_idx_ddoc(Db),
+ fabric2_db:update_docs(Db, [DDoc]),
+ couch_doc:to_json_obj(DDoc, []).
+
+
create_idx_ddoc(Db) ->
Opts = [
{def, {[{<<"fields">>,{[{<<"value">>,<<"asc">>}]}}]}},
@@ -162,4 +187,3 @@ query_cb({doc, Doc}, #cursor{user_acc = Acc} = Cursor) ->
{ok, Cursor#cursor{
user_acc = Acc ++ [Doc]
}}.
-
diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py
index a39476d..92cf211 100644
--- a/src/mango/test/mango.py
+++ b/src/mango/test/mango.py
@@ -161,8 +161,12 @@ class Database(object):
created = r.json()["result"] == "created"
if created:
- # wait until the database reports the index as available
- while len(self.get_index(r.json()["id"], r.json()["name"])) < 1:
+ # wait until the database reports the index as available and build
+ while len([
+ i
+ for i in self.get_index(r.json()["id"], r.json()["name"])
+ if i["build_status"] == "ready"
+ ]) < 1:
delay(t=0.1)
return created