You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by be...@apache.org on 2014/02/13 17:35:52 UTC
[19/23] goldrush commit: updated refs/heads/import-master to 71e6321
Make data recovery impregnable
Project: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/commit/0f1f848b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/tree/0f1f848b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/diff/0f1f848b
Branch: refs/heads/import-master
Commit: 0f1f848bd96f4d580358ef7c2365f8bb957b8ee0
Parents: 1b77f97
Author: Pedram Nimreezi <de...@deadzen.com>
Authored: Fri Nov 8 01:46:11 2013 -0500
Committer: Pedram Nimreezi <de...@deadzen.com>
Committed: Fri Nov 8 01:46:11 2013 -0500
----------------------------------------------------------------------
src/glc.erl | 20 +++++++
src/glc_code.erl | 4 +-
src/gr_counter.erl | 109 +++++++++++++++++++++++++++-------
src/gr_manager.erl | 6 +-
src/gr_param.erl | 152 +++++++++++++++++++++++++++++++++++++-----------
5 files changed, 230 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/glc.erl
----------------------------------------------------------------------
diff --git a/src/glc.erl b/src/glc.erl
index 95e3ebf..e99c861 100644
--- a/src/glc.erl
+++ b/src/glc.erl
@@ -496,6 +496,26 @@ events_test_() ->
?assertEqual(0, Mod:info(filter)),
?assertEqual(0, Mod:info(output))
end
+ },
+ {"ets data recovery test",
+ fun() ->
+ Self = self(),
+ {compiled, Mod} = setup_query(testmod15,
+ glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)),
+ glc:handle(Mod, gre:make([{a,1}], [list])),
+ ?assertEqual(1, Mod:info(output)),
+ ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end),
+ ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
+ ?assertEqual(3, length(gr_param:list(Mod:table(counters)))),
+ true = exit(whereis(Mod:table(params)), kill),
+ true = exit(whereis(Mod:table(counters)), kill),
+ ?assertEqual(1, Mod:info(input)),
+ glc:handle(Mod, gre:make([{'a', 1}], [list])),
+ ?assertEqual(2, Mod:info(input)),
+ ?assertEqual(2, Mod:info(output)),
+ ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
+ ?assertEqual(3, length(gr_counter:list(Mod:table(counters))))
+ end
}
]
}.
http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/glc_code.erl
----------------------------------------------------------------------
diff --git a/src/glc_code.erl b/src/glc_code.erl
index e3d69fa..be75b9f 100644
--- a/src/glc_code.erl
+++ b/src/glc_code.erl
@@ -287,7 +287,7 @@ abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable,
[{_, Key2}] ->
Key2;
[] ->
- Key2 = gr_param:size(ParamsTable),
+ Key2 = gr_param:info_size(ParamsTable),
gr_param:insert(ParamsTable, {Term, Key2}),
Key2
end,
@@ -338,7 +338,7 @@ param_variable(Key) ->
%% @todo Pass state record. Only Generate code if `statistics' is enabled.
-spec abstract_count(atom()) -> syntaxTree().
abstract_count(Counter) ->
- abstract_apply(gr_counter, update,
+ abstract_apply(gr_counter, update_counter,
[abstract_apply(table, [?erl:atom(counters)]),
?erl:abstract(Counter),
?erl:abstract({2,1})]).
http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/gr_counter.erl
----------------------------------------------------------------------
diff --git a/src/gr_counter.erl b/src/gr_counter.erl
index 82d99e8..60662b9 100644
--- a/src/gr_counter.erl
+++ b/src/gr_counter.erl
@@ -19,7 +19,7 @@
%% API
-export([start_link/1,
list/1, lookup_element/2,
- update/3, reset_counters/2]).
+ update_counter/3, reset_counters/2]).
%% gen_server callbacks
-export([init/1,
@@ -29,22 +29,47 @@
terminate/2,
code_change/3]).
--record(state, {init=true, table_id}).
+-record(state, {table_id, waiting=[]}).
%%%===================================================================
%%% API
%%%===================================================================
list(Server) ->
- gen_server:call(Server, list).
-
-lookup_element(Server, Counter) ->
- gen_server:call(Server, {lookup_element, Counter}).
-
-update(Server, Counter, Value) ->
+ case (catch gen_server:call(Server, list)) of
+ {'EXIT', _Reason} ->
+ list(gr_manager:wait_for_pid(Server));
+ Else -> Else
+ end.
+
+lookup_element(Server, Term) ->
+ case (catch gen_server:call(Server, {lookup_element, Term})) of
+ {'EXIT', _Reason} ->
+ lookup_element(gr_manager:wait_for_pid(Server), Term);
+ Else -> Else
+ end.
+
+update_counter(Server, Counter, Value) when is_atom(Server) ->
+ case whereis(Server) of
+ undefined ->
+ update_counter(gr_manager:wait_for_pid(Server), Counter, Value);
+ Pid ->
+ case erlang:is_process_alive(Pid) of
+ true ->
+ update_counter(Pid, Counter, Value);
+ false ->
+ ServerPid = gr_manager:wait_for_pid(Server),
+ update_counter(ServerPid, Counter, Value)
+ end
+ end;
+update_counter(Server, Counter, Value) when is_pid(Server) ->
gen_server:cast(Server, {update, Counter, Value}).
reset_counters(Server, Counter) ->
- gen_server:call(Server, {reset_counters, Counter}).
+ case (catch gen_server:call(Server, {reset_counters, Counter})) of
+ {'EXIT', _Reason} ->
+ reset_counters(gr_manager:wait_for_pid(Server), Counter);
+ Else -> Else
+ end.
%%--------------------------------------------------------------------
%% @doc
@@ -88,21 +113,34 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_call(list, _From, State) ->
+handle_call(list=Call, From, State) ->
TableId = State#state.table_id,
- {reply, {ok, ets:tab2list(TableId)}, State};
-handle_call({lookup_element, Counter}, _From, State) ->
+ Waiting = State#state.waiting,
+ case TableId of
+ undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+ _ -> {reply, handle_list(TableId), State}
+ end;
+handle_call({lookup_element, Term}=Call, From, State) ->
TableId = State#state.table_id,
- {reply, ets:lookup_element(TableId, Counter, 2), State};
-handle_call({reset_counters, Counter}, _From, State) ->
- TableId = State#state.table_id,
- Reset = case Counter of
+ Waiting = State#state.waiting,
+ case TableId of
+ undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+ _ -> {reply, handle_lookup_element(TableId, Term), State}
+ end;
+handle_call({reset_counters, Counter}, From, State) ->
+ Term = case Counter of
_ when is_list(Counter) ->
[{Item, 0} || Item <- Counter];
_ when is_atom(Counter) ->
[{Counter, 0}]
end,
- {reply, ets:insert(TableId, Reset), State};
+ Call = {insert, Term},
+ TableId = State#state.table_id,
+ Waiting = State#state.waiting,
+ case TableId of
+ undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+ _ -> {reply, handle_insert(TableId, Term), State}
+ end;
handle_call(_Request, _From, State) ->
Reply = {error, unhandled_message},
{reply, Reply, State}.
@@ -117,10 +155,15 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({update, Counter, Value}, State) ->
+handle_cast({update, Counter, Value}=Call, State) ->
TableId = State#state.table_id,
- ets:update_counter(TableId, Counter, Value),
- {noreply, State};
+ Waiting = State#state.waiting,
+ State2 = case TableId of
+ undefined -> State#state{waiting=[Call|Waiting]};
+ _ -> handle_update_counter(TableId, Counter, Value),
+ State
+ end,
+ {noreply, State2};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -135,7 +178,11 @@ handle_cast(_Msg, State) ->
%% @end
%%--------------------------------------------------------------------
handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
- {noreply, State#state{table_id=TableId}};
+ [ gen_server:reply(From, perform_call(TableId, Call))
+ || {Call, From} <- State#state.waiting ],
+ [ handle_update_counter(TableId, Counter, Value)
+ || {update, Counter, Value} <- State#state.waiting ],
+ {noreply, State#state{table_id=TableId, waiting=[]}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -168,4 +215,24 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
+perform_call(TableId, Call) ->
+ case Call of
+ list ->
+ handle_list(TableId);
+ {insert, Term} ->
+ handle_insert(TableId, Term);
+ {lookup_element, Term} ->
+ handle_lookup_element(TableId, Term)
+ end.
+
+handle_list(TableId) ->
+ ets:tab2list(TableId).
+
+handle_update_counter(TableId, Counter, Value) ->
+ ets:update_counter(TableId, Counter, Value).
+
+handle_insert(TableId, Term) ->
+ ets:insert(TableId, Term).
+handle_lookup_element(TableId, Term) ->
+ ets:lookup_element(TableId, Term, 2).
http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/gr_manager.erl
----------------------------------------------------------------------
diff --git a/src/gr_manager.erl b/src/gr_manager.erl
index c64f74e..93bf735 100644
--- a/src/gr_manager.erl
+++ b/src/gr_manager.erl
@@ -17,7 +17,7 @@
-behaviour(gen_server).
%% API
--export([start_link/3]).
+-export([start_link/3, wait_for_pid/1]).
%% gen_server callbacks
-export([init/1,
@@ -125,12 +125,12 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage
ets:give_away(TableId, ManageePid, Data),
{noreply, State#state{table_id=TableId}}.
-wait_for_pid(Managee) ->
+wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined ->
case whereis(Managee) of
undefined ->
timer:sleep(1),
wait_for_pid(Managee);
- Pid -> Pid
+ ManageePid -> ManageePid
end.
http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/gr_param.erl
----------------------------------------------------------------------
diff --git a/src/gr_param.erl b/src/gr_param.erl
index e7ac2a2..6fc9ec4 100644
--- a/src/gr_param.erl
+++ b/src/gr_param.erl
@@ -18,9 +18,9 @@
%% API
-export([start_link/1,
- list/1, size/1, insert/2,
+ list/1, insert/2,
lookup/2, lookup_element/2,
- info/1, transform/1]).
+ info/1, info_size/1, transform/1]).
%% gen_server callbacks
-export([init/1,
@@ -30,32 +30,60 @@
terminate/2,
code_change/3]).
--record(state, {init=true, table_id}).
+-record(state, {table_id, waiting=[]}).
%%%===================================================================
%%% API
%%%===================================================================
list(Server) ->
- gen_server:call(Server, list).
+ case (catch gen_server:call(Server, list)) of
+ {'EXIT', _Reason} ->
+ list(gr_manager:wait_for_pid(Server));
+ Else -> Else
+ end.
-size(Server) ->
- gen_server:call(Server, size).
+info_size(Server) ->
+ case (catch gen_server:call(Server, info_size)) of
+ {'EXIT', _Reason} ->
+ info_size(gr_manager:wait_for_pid(Server));
+ Else -> Else
+ end.
-insert(Server, Data) ->
- gen_server:call(Server, {insert, Data}).
+insert(Server, Term) ->
+ case (catch gen_server:call(Server, {insert, Term})) of
+ {'EXIT', _Reason} ->
+ insert(gr_manager:wait_for_pid(Server), Term);
+ Else -> Else
+ end.
lookup(Server, Term) ->
- gen_server:call(Server, {lookup, Term}).
+ case (catch gen_server:call(Server, {lookup, Term})) of
+ {'EXIT', _Reason} ->
+ lookup(gr_manager:wait_for_pid(Server), Term);
+ Else -> Else
+ end.
lookup_element(Server, Term) ->
- gen_server:call(Server, {lookup_element, Term}).
+ case (catch gen_server:call(Server, {lookup_element, Term})) of
+ {'EXIT', _Reason} ->
+ lookup_element(gr_manager:wait_for_pid(Server), Term);
+ Else -> Else
+ end.
info(Server) ->
- gen_server:call(Server, info).
+ case (catch gen_server:call(Server, info)) of
+ {'EXIT', _Reason} ->
+ info(gr_manager:wait_for_pid(Server));
+ Else -> Else
+ end.
%% @doc Transform Term -> Key to Key -> Term
transform(Server) ->
- gen_server:call(Server, transform).
+ case (catch gen_server:call(Server, transform)) of
+ {'EXIT', _Reason} ->
+ transform(gr_manager:wait_for_pid(Server));
+ Else -> Else
+ end.
%%--------------------------------------------------------------------
%% @doc
@@ -99,30 +127,39 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_call(list, _From, State) ->
+handle_call(Call, From, State) when is_atom(Call), Call =:= list;
+ Call =:= info; Call =:= info_size;
+ Call =:= transform ->
TableId = State#state.table_id,
- {reply, ets:tab2list(TableId), State};
-handle_call(size, _From, State) ->
- TableId = State#state.table_id,
- {reply, ets:info(TableId, size), State};
-handle_call({insert, Data}, _From, State) ->
- TableId = State#state.table_id,
- {reply, ets:insert(TableId, Data), State};
-handle_call({lookup, Term}, _From, State) ->
- TableId = State#state.table_id,
- {reply, ets:lookup(TableId, Term), State};
-handle_call({lookup_element, Term}, _From, State) ->
- TableId = State#state.table_id,
- {reply, ets:lookup_element(TableId, Term, 2), State};
-handle_call(info, _From, State) ->
- TableId = State#state.table_id,
- {reply, ets:info(TableId), State};
-handle_call(transform, _From, State) ->
+ Waiting = State#state.waiting,
+ case TableId of
+ undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+ _ when Call =:= list ->
+ {reply, handle_list(TableId), State};
+ _ when Call =:= info ->
+ {reply, handle_info(TableId), State};
+ _ when Call =:= info_size ->
+ {reply, handle_info_size(TableId), State};
+ _ when Call =:= transform ->
+ {reply, handle_transform(TableId), State}
+ end;
+
+handle_call({Call, Term}, From, State) when is_atom(Call), Call =:= insert;
+ Call =:= lookup;
+ Call =:= lookup_element ->
TableId = State#state.table_id,
- ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)],
- ets:delete_all_objects(TableId),
- ets:insert(TableId, ParamsList),
- {reply, ok, State};
+ Waiting = State#state.waiting,
+ case TableId of
+ undefined ->
+ {noreply, State#state{waiting=[{{Call, Term}, From}|Waiting]}};
+ _ when Call =:= insert ->
+ {reply, handle_insert(TableId, Term), State};
+ _ when Call =:= lookup ->
+ {reply, handle_lookup(TableId, Term), State};
+ _ when Call =:= lookup_element ->
+ {reply, handle_lookup_element(TableId, Term), State}
+ end;
+
handle_call(_Request, _From, State) ->
Reply = {error, unhandled_message},
{reply, Reply, State}.
@@ -151,10 +188,14 @@ handle_cast(_Msg, State) ->
%% @end
%%--------------------------------------------------------------------
handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
- {noreply, State#state{table_id=TableId}};
+ [ gen_server:reply(From, perform_call(TableId, Call))
+ || {Call, From} <- State#state.waiting ],
+ {noreply, State#state{table_id=TableId, waiting=[]}};
handle_info(_Info, State) ->
{noreply, State}.
+
+
%%--------------------------------------------------------------------
%% @private
%% @doc
@@ -184,4 +225,45 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
+perform_call(TableId, Call) ->
+ case Call of
+ list ->
+ handle_list(TableId);
+ info ->
+ handle_info(TableId);
+ info_size ->
+ handle_info_size(TableId);
+ transform ->
+ handle_transform(TableId);
+ {insert, Term} ->
+ handle_insert(TableId, Term);
+ {lookup, Term} ->
+ handle_lookup(TableId, Term);
+ {lookup_element, Term} ->
+ handle_lookup_element(TableId, Term)
+ end.
+
+
+handle_list(TableId) ->
+ ets:tab2list(TableId).
+
+handle_info(TableId) ->
+ ets:info(TableId).
+
+handle_info_size(TableId) ->
+ ets:info(TableId, size).
+
+handle_transform(TableId) ->
+ ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)],
+ ets:delete_all_objects(TableId),
+ ets:insert(TableId, ParamsList).
+
+handle_insert(TableId, Term) ->
+ ets:insert(TableId, Term).
+
+handle_lookup(TableId, Term) ->
+ ets:lookup(TableId, Term).
+
+handle_lookup_element(TableId, Term) ->
+ ets:lookup_element(TableId, Term, 2).