You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by be...@apache.org on 2014/02/13 17:35:52 UTC

[19/23] goldrush commit: updated refs/heads/import-master to 71e6321

Make data recovery impregnable


Project: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/commit/0f1f848b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/tree/0f1f848b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/diff/0f1f848b

Branch: refs/heads/import-master
Commit: 0f1f848bd96f4d580358ef7c2365f8bb957b8ee0
Parents: 1b77f97
Author: Pedram Nimreezi <de...@deadzen.com>
Authored: Fri Nov 8 01:46:11 2013 -0500
Committer: Pedram Nimreezi <de...@deadzen.com>
Committed: Fri Nov 8 01:46:11 2013 -0500

----------------------------------------------------------------------
 src/glc.erl        |  20 +++++++
 src/glc_code.erl   |   4 +-
 src/gr_counter.erl | 109 +++++++++++++++++++++++++++-------
 src/gr_manager.erl |   6 +-
 src/gr_param.erl   | 152 +++++++++++++++++++++++++++++++++++++-----------
 5 files changed, 230 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/glc.erl
----------------------------------------------------------------------
diff --git a/src/glc.erl b/src/glc.erl
index 95e3ebf..e99c861 100644
--- a/src/glc.erl
+++ b/src/glc.erl
@@ -496,6 +496,26 @@ events_test_() ->
                     ?assertEqual(0, Mod:info(filter)),
                     ?assertEqual(0, Mod:info(output))
                 end
+            },
+            {"ets data recovery test",
+                fun() ->
+                    Self = self(),
+                    {compiled, Mod} = setup_query(testmod15,
+                        glc:with(glc:eq(a, 1), fun(Event) -> Self ! gre:fetch(a, Event) end)),
+                    glc:handle(Mod, gre:make([{a,1}], [list])),
+                    ?assertEqual(1, Mod:info(output)),
+                    ?assertEqual(1, receive Msg -> Msg after 0 -> notcalled end),
+                    ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
+                    ?assertEqual(3, length(gr_param:list(Mod:table(counters)))),
+                    true = exit(whereis(Mod:table(params)), kill),
+                    true = exit(whereis(Mod:table(counters)), kill),
+                    ?assertEqual(1, Mod:info(input)),
+                    glc:handle(Mod, gre:make([{'a', 1}], [list])),
+                    ?assertEqual(2, Mod:info(input)),
+                    ?assertEqual(2, Mod:info(output)),
+                    ?assertEqual(1, length(gr_param:list(Mod:table(params)))),
+                    ?assertEqual(3, length(gr_counter:list(Mod:table(counters))))
+                end
             }
         ]
     }.

http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/glc_code.erl
----------------------------------------------------------------------
diff --git a/src/glc_code.erl b/src/glc_code.erl
index e3d69fa..be75b9f 100644
--- a/src/glc_code.erl
+++ b/src/glc_code.erl
@@ -287,7 +287,7 @@ abstract_getparam_(Term, OnBound, #state{paramstab=ParamsTable,
         [{_, Key2}] ->
             Key2;
         [] ->
-            Key2 = gr_param:size(ParamsTable),
+            Key2 = gr_param:info_size(ParamsTable),
             gr_param:insert(ParamsTable, {Term, Key2}),
             Key2
     end,
@@ -338,7 +338,7 @@ param_variable(Key) ->
 %% @todo Pass state record. Only Generate code if `statistics' is enabled.
 -spec abstract_count(atom()) -> syntaxTree().
 abstract_count(Counter) ->
-    abstract_apply(gr_counter, update,
+    abstract_apply(gr_counter, update_counter,
         [abstract_apply(table, [?erl:atom(counters)]),
          ?erl:abstract(Counter),
          ?erl:abstract({2,1})]).

http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/gr_counter.erl
----------------------------------------------------------------------
diff --git a/src/gr_counter.erl b/src/gr_counter.erl
index 82d99e8..60662b9 100644
--- a/src/gr_counter.erl
+++ b/src/gr_counter.erl
@@ -19,7 +19,7 @@
 %% API
 -export([start_link/1, 
          list/1, lookup_element/2,
-         update/3, reset_counters/2]).
+         update_counter/3, reset_counters/2]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -29,22 +29,47 @@
          terminate/2,
          code_change/3]).
 
--record(state, {init=true, table_id}).
+-record(state, {table_id, waiting=[]}).
 
 %%%===================================================================
 %%% API
 %%%===================================================================
 list(Server) ->
-    gen_server:call(Server, list).
-
-lookup_element(Server, Counter) ->
-    gen_server:call(Server, {lookup_element, Counter}).
-
-update(Server, Counter, Value) ->
+    case (catch gen_server:call(Server, list)) of
+        {'EXIT', _Reason} ->
+            list(gr_manager:wait_for_pid(Server));
+        Else -> Else
+    end.
+
+lookup_element(Server, Term) ->
+    case (catch gen_server:call(Server, {lookup_element, Term})) of
+        {'EXIT', _Reason} ->
+            lookup_element(gr_manager:wait_for_pid(Server), Term);
+        Else -> Else
+    end.
+
+update_counter(Server, Counter, Value) when is_atom(Server) ->
+    case whereis(Server) of
+        undefined -> 
+            update_counter(gr_manager:wait_for_pid(Server), Counter, Value);
+        Pid -> 
+            case erlang:is_process_alive(Pid) of
+                true ->
+                    update_counter(Pid, Counter, Value);
+                false ->
+                    ServerPid = gr_manager:wait_for_pid(Server),
+                    update_counter(ServerPid, Counter, Value)
+            end
+    end;
+update_counter(Server, Counter, Value) when is_pid(Server) ->
     gen_server:cast(Server, {update, Counter, Value}).
 
 reset_counters(Server, Counter) ->
-    gen_server:call(Server, {reset_counters, Counter}).
+    case (catch gen_server:call(Server, {reset_counters, Counter})) of
+        {'EXIT', _Reason} ->
+            reset_counters(gr_manager:wait_for_pid(Server), Counter);
+        Else -> Else
+    end.
 
 %%--------------------------------------------------------------------
 %% @doc
@@ -88,21 +113,34 @@ init([]) ->
 %%                                   {stop, Reason, State}
 %% @end
 %%--------------------------------------------------------------------
-handle_call(list, _From, State) ->
+handle_call(list=Call, From, State) ->
     TableId = State#state.table_id,
-    {reply, {ok, ets:tab2list(TableId)}, State};
-handle_call({lookup_element, Counter}, _From, State) ->
+    Waiting = State#state.waiting,
+    case TableId of
+        undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+        _ -> {reply, handle_list(TableId), State}
+    end;
+handle_call({lookup_element, Term}=Call, From, State) ->
     TableId = State#state.table_id,
-    {reply, ets:lookup_element(TableId, Counter, 2), State};
-handle_call({reset_counters, Counter}, _From, State) ->
-    TableId = State#state.table_id,
-    Reset = case Counter of
+    Waiting = State#state.waiting,
+    case TableId of
+        undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+        _ -> {reply, handle_lookup_element(TableId, Term), State}
+    end;
+handle_call({reset_counters, Counter}, From, State) ->
+    Term = case Counter of
         _ when is_list(Counter) -> 
             [{Item, 0} || Item <- Counter];
         _ when is_atom(Counter) -> 
             [{Counter, 0}]
     end,
-    {reply, ets:insert(TableId, Reset), State};
+    Call = {insert, Term},
+    TableId = State#state.table_id,
+    Waiting = State#state.waiting,
+    case TableId of
+        undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+        _ -> {reply, handle_insert(TableId, Term), State}
+    end;
 handle_call(_Request, _From, State) ->
     Reply = {error, unhandled_message},
     {reply, Reply, State}.
@@ -117,10 +155,15 @@ handle_call(_Request, _From, State) ->
 %%                                  {stop, Reason, State}
 %% @end
 %%--------------------------------------------------------------------
-handle_cast({update, Counter, Value}, State) ->
+handle_cast({update, Counter, Value}=Call, State) ->
     TableId = State#state.table_id,
-    ets:update_counter(TableId, Counter, Value),
-    {noreply, State};
+    Waiting = State#state.waiting,
+    State2 = case TableId of
+        undefined -> State#state{waiting=[Call|Waiting]};
+        _ -> handle_update_counter(TableId, Counter, Value), 
+             State
+    end,
+    {noreply, State2};
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
@@ -135,7 +178,11 @@ handle_cast(_Msg, State) ->
 %% @end
 %%--------------------------------------------------------------------
 handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
-    {noreply, State#state{table_id=TableId}};
+    [ gen_server:reply(From, perform_call(TableId, Call)) 
+      || {Call, From} <- State#state.waiting ],
+    [ handle_update_counter(TableId, Counter, Value) 
+      || {update, Counter, Value} <- State#state.waiting ],
+    {noreply, State#state{table_id=TableId, waiting=[]}};
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -168,4 +215,24 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 
+perform_call(TableId, Call) ->
+    case Call of
+        list ->
+            handle_list(TableId);
+        {insert, Term} ->
+            handle_insert(TableId, Term);
+        {lookup_element, Term} ->
+            handle_lookup_element(TableId, Term)
+    end.
+
+handle_list(TableId) ->
+    ets:tab2list(TableId).
+
+handle_update_counter(TableId, Counter, Value) ->
+    ets:update_counter(TableId, Counter, Value).
+
+handle_insert(TableId, Term) ->
+    ets:insert(TableId, Term).
 
+handle_lookup_element(TableId, Term) ->
+    ets:lookup_element(TableId, Term, 2).

http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/gr_manager.erl
----------------------------------------------------------------------
diff --git a/src/gr_manager.erl b/src/gr_manager.erl
index c64f74e..93bf735 100644
--- a/src/gr_manager.erl
+++ b/src/gr_manager.erl
@@ -17,7 +17,7 @@
 -behaviour(gen_server).
 
 %% API
--export([start_link/3]).
+-export([start_link/3, wait_for_pid/1]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -125,12 +125,12 @@ handle_info({'ETS-TRANSFER', TableId, _Pid, Data}, State = #state{managee=Manage
     ets:give_away(TableId, ManageePid, Data),
     {noreply, State#state{table_id=TableId}}.
 
-wait_for_pid(Managee) -> 
+wait_for_pid(Managee) when is_atom(Managee), Managee =/= undefined -> 
     case whereis(Managee) of
         undefined -> 
             timer:sleep(1),
             wait_for_pid(Managee);
-        Pid -> Pid
+        ManageePid -> ManageePid
     end.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb-goldrush/blob/0f1f848b/src/gr_param.erl
----------------------------------------------------------------------
diff --git a/src/gr_param.erl b/src/gr_param.erl
index e7ac2a2..6fc9ec4 100644
--- a/src/gr_param.erl
+++ b/src/gr_param.erl
@@ -18,9 +18,9 @@
 
 %% API
 -export([start_link/1, 
-         list/1, size/1, insert/2, 
+         list/1, insert/2, 
          lookup/2, lookup_element/2,
-         info/1, transform/1]).
+         info/1, info_size/1, transform/1]).
 
 %% gen_server callbacks
 -export([init/1,
@@ -30,32 +30,60 @@
          terminate/2,
          code_change/3]).
 
--record(state, {init=true, table_id}).
+-record(state, {table_id, waiting=[]}).
 
 %%%===================================================================
 %%% API
 %%%===================================================================
 list(Server) ->
-    gen_server:call(Server, list).
+    case (catch gen_server:call(Server, list)) of
+        {'EXIT', _Reason} ->
+            list(gr_manager:wait_for_pid(Server));
+        Else -> Else
+    end.
 
-size(Server) ->
-    gen_server:call(Server, size).
+info_size(Server) ->
+    case (catch gen_server:call(Server, info_size)) of
+        {'EXIT', _Reason} ->
+            info_size(gr_manager:wait_for_pid(Server));
+        Else -> Else
+    end.
 
-insert(Server, Data) ->
-    gen_server:call(Server, {insert, Data}).
+insert(Server, Term) ->
+    case (catch gen_server:call(Server, {insert, Term})) of
+        {'EXIT', _Reason} ->
+            insert(gr_manager:wait_for_pid(Server), Term);
+        Else -> Else
+    end.
 
 lookup(Server, Term) ->
-    gen_server:call(Server, {lookup, Term}).
+    case (catch gen_server:call(Server, {lookup, Term})) of
+        {'EXIT', _Reason} ->
+            lookup(gr_manager:wait_for_pid(Server), Term);
+        Else -> Else
+    end.
 
 lookup_element(Server, Term) ->
-    gen_server:call(Server, {lookup_element, Term}).
+    case (catch gen_server:call(Server, {lookup_element, Term})) of
+        {'EXIT', _Reason} ->
+            lookup_element(gr_manager:wait_for_pid(Server), Term);
+        Else -> Else
+    end.
 
 info(Server) ->
-    gen_server:call(Server, info).
+    case (catch gen_server:call(Server, info)) of
+        {'EXIT', _Reason} ->
+            info(gr_manager:wait_for_pid(Server));
+        Else -> Else
+    end.
 
 %% @doc Transform Term -> Key to Key -> Term
 transform(Server) ->
-    gen_server:call(Server, transform).
+    case (catch gen_server:call(Server, transform)) of
+        {'EXIT', _Reason} ->
+            transform(gr_manager:wait_for_pid(Server));
+        Else -> Else
+    end.
 
 %%--------------------------------------------------------------------
 %% @doc
@@ -99,30 +127,39 @@ init([]) ->
 %%                                   {stop, Reason, State}
 %% @end
 %%--------------------------------------------------------------------
-handle_call(list, _From, State) ->
+handle_call(Call, From, State) when is_atom(Call), Call =:= list; 
+                                     Call =:= info; Call =:= info_size;
+                                     Call =:= transform ->
     TableId = State#state.table_id,
-    {reply, ets:tab2list(TableId), State};
-handle_call(size, _From, State) ->
-    TableId = State#state.table_id,
-    {reply, ets:info(TableId, size), State};
-handle_call({insert, Data}, _From, State) ->
-    TableId = State#state.table_id,
-    {reply, ets:insert(TableId, Data), State};
-handle_call({lookup, Term}, _From, State) ->
-    TableId = State#state.table_id,
-    {reply, ets:lookup(TableId, Term), State};
-handle_call({lookup_element, Term}, _From, State) ->
-    TableId = State#state.table_id,
-    {reply, ets:lookup_element(TableId, Term, 2), State};
-handle_call(info, _From, State) ->
-    TableId = State#state.table_id,
-    {reply, ets:info(TableId), State};
-handle_call(transform, _From, State) ->
+    Waiting = State#state.waiting,
+    case TableId of
+        undefined -> {noreply, State#state{waiting=[{Call, From}|Waiting]}};
+        _ when Call =:= list -> 
+            {reply, handle_list(TableId), State};
+        _ when Call =:= info -> 
+            {reply, handle_info(TableId), State};
+        _ when Call =:= info_size -> 
+            {reply, handle_info_size(TableId), State};
+        _ when Call =:= transform -> 
+            {reply, handle_transform(TableId), State}
+    end;
+
+handle_call({Call, Term}, From, State) when is_atom(Call), Call =:= insert; 
+                                              Call =:= lookup; 
+                                              Call =:= lookup_element ->
     TableId = State#state.table_id,
-    ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)],
-    ets:delete_all_objects(TableId),
-    ets:insert(TableId, ParamsList),
-    {reply, ok, State};
+    Waiting = State#state.waiting,
+    case TableId of
+        undefined -> 
+            {noreply, State#state{waiting=[{{Call, Term}, From}|Waiting]}};
+        _ when Call =:= insert -> 
+            {reply, handle_insert(TableId, Term), State};
+        _ when Call =:= lookup -> 
+            {reply, handle_lookup(TableId, Term), State};
+        _ when Call =:= lookup_element -> 
+            {reply, handle_lookup_element(TableId, Term), State}
+    end;
+
 handle_call(_Request, _From, State) ->
     Reply = {error, unhandled_message},
     {reply, Reply, State}.
@@ -151,10 +188,14 @@ handle_cast(_Msg, State) ->
 %% @end
 %%--------------------------------------------------------------------
 handle_info({'ETS-TRANSFER', TableId, _Pid, _Data}, State) ->
-    {noreply, State#state{table_id=TableId}};
+    [ gen_server:reply(From, perform_call(TableId, Call)) 
+      || {Call, From} <- State#state.waiting ],
+    {noreply, State#state{table_id=TableId, waiting=[]}};
 handle_info(_Info, State) ->
     {noreply, State}.
 
+
+
 %%--------------------------------------------------------------------
 %% @private
 %% @doc
@@ -184,4 +225,45 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%%===================================================================
 
+perform_call(TableId, Call) ->
+    case Call of
+        list ->
+            handle_list(TableId);
+        info ->
+            handle_info(TableId);
+        info_size ->
+            handle_info_size(TableId);
+        transform ->
+            handle_transform(TableId);
+        {insert, Term} ->
+            handle_insert(TableId, Term);
+        {lookup, Term} ->
+            handle_lookup(TableId, Term);
+        {lookup_element, Term} ->
+            handle_lookup_element(TableId, Term)
+    end.
+
+
+handle_list(TableId) ->
+    ets:tab2list(TableId).
+
+handle_info(TableId) ->
+    ets:info(TableId).
+
+handle_info_size(TableId) ->
+    ets:info(TableId, size).
+
+handle_transform(TableId) ->
+    ParamsList = [{K, V} || {V, K} <- ets:tab2list(TableId)],
+    ets:delete_all_objects(TableId),
+    ets:insert(TableId, ParamsList).
+
+handle_insert(TableId, Term) ->
+    ets:insert(TableId, Term).
+
+handle_lookup(TableId, Term) ->
+    ets:lookup(TableId, Term).
+
+handle_lookup_element(TableId, Term) ->
+    ets:lookup_element(TableId, Term, 2).