You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2009/04/05 22:34:35 UTC

svn commit: r762153 - in /couchdb/trunk: etc/couchdb/default.ini.tpl.in src/couchdb/Makefile.am src/couchdb/couch_httpd_db.erl src/couchdb/couch_view.erl src/couchdb/couch_view_compactor.erl src/couchdb/couch_view_group.erl

Author: kocolosk
Date: Sun Apr  5 20:34:35 2009
New Revision: 762153

URL: http://svn.apache.org/viewvc?rev=762153&view=rev
Log:
Added compaction for view indexes. See COUCHDB-92.

No tests or Futon interface for this feature yet.

Added:
    couchdb/trunk/src/couchdb/couch_view_compactor.erl
Modified:
    couchdb/trunk/etc/couchdb/default.ini.tpl.in
    couchdb/trunk/src/couchdb/Makefile.am
    couchdb/trunk/src/couchdb/couch_httpd_db.erl
    couchdb/trunk/src/couchdb/couch_view.erl
    couchdb/trunk/src/couchdb/couch_view_group.erl

Modified: couchdb/trunk/etc/couchdb/default.ini.tpl.in
URL: http://svn.apache.org/viewvc/couchdb/trunk/etc/couchdb/default.ini.tpl.in?rev=762153&r1=762152&r2=762153&view=diff
==============================================================================
--- couchdb/trunk/etc/couchdb/default.ini.tpl.in (original)
+++ couchdb/trunk/etc/couchdb/default.ini.tpl.in Sun Apr  5 20:34:35 2009
@@ -52,6 +52,7 @@
 _stats = {couch_httpd_stats_handlers, handle_stats_req}
 
 [httpd_db_handlers]
+_compact = {couch_httpd_db, handle_compact_req}
 _design = {couch_httpd_db, handle_design_req}
 _temp_view = {couch_httpd_view, handle_temp_view_req}
 

Modified: couchdb/trunk/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/Makefile.am?rev=762153&r1=762152&r2=762153&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/Makefile.am (original)
+++ couchdb/trunk/src/couchdb/Makefile.am Sun Apr  5 20:34:35 2009
@@ -73,6 +73,7 @@
     couch_task_status.erl \
     couch_util.erl \
     couch_view.erl \
+    couch_view_compactor.erl \
     couch_view_updater.erl \
     couch_view_group.erl \
     couch_db_updater.erl
@@ -114,6 +115,7 @@
     couch_task_status.beam \
     couch_util.beam \
     couch_view.beam \
+    couch_view_compactor.beam \
     couch_view_updater.beam \
     couch_view_group.beam \
     couch_db_updater.beam

Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=762153&r1=762152&r2=762153&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Sun Apr  5 20:34:35 2009
@@ -13,7 +13,8 @@
 -module(couch_httpd_db).
 -include("couch_db.hrl").
 
--export([handle_request/1, handle_design_req/2, db_req/2, couch_doc_open/4]).
+-export([handle_request/1, handle_compact_req/2, handle_design_req/2, 
+    db_req/2, couch_doc_open/4]).
 
 -import(couch_httpd,
     [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2,
@@ -41,6 +42,17 @@
         do_db_req(Req, Handler)
     end.
 
+handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) ->
+    ok = couch_view_compactor:start_compact(DbName, Id),
+    send_json(Req, 202, {[{ok, true}]});
+
+handle_compact_req(#httpd{method='POST'}=Req, Db) ->
+    ok = couch_db:start_compact(Db),
+    send_json(Req, 202, {[{ok, true}]});
+
+handle_compact_req(Req, _Db) ->
+    send_method_not_allowed(Req, "POST").
+
 handle_design_req(#httpd{
         path_parts=[_DbName,_Design,_DesName, <<"_",_/binary>> = Action | _Rest],
         design_url_handlers = DesignUrlHandlers
@@ -189,13 +201,6 @@
 db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) ->
     send_method_not_allowed(Req, "POST");
 
-db_req(#httpd{method='POST',path_parts=[_,<<"_compact">>]}=Req, Db) ->
-    ok = couch_db:start_compact(Db),
-    send_json(Req, 202, {[{ok, true}]});
-
-db_req(#httpd{path_parts=[_,<<"_compact">>]}=Req, _Db) ->
-    send_method_not_allowed(Req, "POST");
-
 db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) ->
     {IdsRevs} = couch_httpd:json_body(Req),
     IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],

Modified: couchdb/trunk/src/couchdb/couch_view.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view.erl?rev=762153&r1=762152&r2=762153&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view.erl Sun Apr  5 20:34:35 2009
@@ -17,7 +17,7 @@
     detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
     code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4,
     get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/7,
-    extract_map_view/1]).
+    extract_map_view/1,get_group_server/2]).
 
 -include("couch_db.hrl").
 

Added: couchdb/trunk/src/couchdb/couch_view_compactor.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_compactor.erl?rev=762153&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_compactor.erl (added)
+++ couchdb/trunk/src/couchdb/couch_view_compactor.erl Sun Apr  5 20:34:35 2009
@@ -0,0 +1,98 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_view_compactor).
+
+-include ("couch_db.hrl").
+
+-export([start_compact/2]).
+
+%% @spec start_compact(DbName::binary(), GroupId::binary()) -> ok
+%% @doc Asynchronously starts compacting a view group. GroupId must not include the _design/ prefix
+start_compact(DbName, GroupId) ->
+    Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>),
+    gen_server:cast(Pid, {start_compact, fun compact_group/2}). % cast: returns ok before compaction runs
+
+%%=============================================================================
+%% internal functions
+%%=============================================================================
+
+%% @spec compact_group(Group, EmptyGroup) -> ok
+compact_group(Group, EmptyGroup) ->
+    #group{
+        current_seq = Seq,
+        id_btree = IdBtree,
+        name = GroupId,
+        views = Views
+    } = Group,
+    %% EmptyGroup is built by the group server on the ".compact" index file.
+    #group{
+        db = Db,
+        id_btree = EmptyIdBtree,
+        views = EmptyViews
+    } = EmptyGroup,
+    %% Count is db-wide, only feeds progress output, and may be 0 here.
+    {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
+    %% GroupId is <<"_design/...">>, so ShortName keeps the leading "/".
+    <<"_design", ShortName/binary>> = GroupId,
+    DbName = couch_db:name(Db),
+    TaskName = <<DbName/binary, ShortName/binary>>,
+    couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
+    %% Copy id entries in order, adding to the new btree in 10000-entry batches.
+    Fun = fun(KV, {Bt, Acc, TotalCopied}) ->
+        if TotalCopied rem 10000 == 0 ->
+            couch_task_status:update("Copied ~p of ~p Ids (~p%)",
+                [TotalCopied, Count, (TotalCopied*100) div lists:max([Count, 1])]),
+            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
+            {ok, {Bt2, [], TotalCopied+1}};
+        true ->
+            {ok, {Bt, [KV|Acc], TotalCopied+1}}
+        end
+    end,
+    {ok, {Bt3, Uncopied, _Total}} = couch_btree:foldl(IdBtree, Fun,
+        {EmptyIdBtree, [], 0}),
+    {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+    %% lists:zip requires Views and EmptyViews to pair up one-to-one.
+    NewViews = lists:map(fun({View, EmptyView}) ->
+        compact_view(View, EmptyView)
+    end, lists:zip(Views, EmptyViews)),
+    %% The group server compares this seq with its own to decide on a catch-up.
+    NewGroup = EmptyGroup#group{
+        id_btree=NewIdBtree,
+        views=NewViews,
+        current_seq=Seq
+    },
+    %% Report back; the group server swaps the index files on compact_done.
+    Pid = couch_view:get_group_server(DbName, GroupId),
+    gen_server:cast(Pid, {compact_done, NewGroup}).
+
+%% @spec compact_view(View, EmptyView) -> CompactView
+compact_view(View, EmptyView) ->
+    {ok, Count} = couch_view:get_row_count(View),
+    %% Count only feeds progress output; assumed > 0 whenever Fun runs (same btree).
+    %% Keys are {Key,DocId}; KVs are buffered and added every 10000 copies.
+    Fun = fun(KV, {Bt, Acc, TotalCopied}) ->
+        if TotalCopied rem 10000 == 0 ->
+            couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)",
+                [View#view.id_num, TotalCopied, Count, (TotalCopied*100) div Count]),
+            {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
+            {ok, {Bt2, [], TotalCopied + 1}};
+        true ->    
+            {ok, {Bt, [KV|Acc], TotalCopied + 1}}
+        end
+    end,
+    %% Fold View's btree into EmptyView's, then add whatever is still buffered.
+    {ok, {Bt3, Uncopied, _Total}} = couch_btree:foldl(View#view.btree, Fun, 
+        {EmptyView#view.btree, [], 0}),
+    {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+    EmptyView#view{btree = NewBt}.
+

Modified: couchdb/trunk/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=762153&r1=762152&r2=762153&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_group.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_group.erl Sun Apr  5 20:34:35 2009
@@ -29,15 +29,18 @@
     init_args,
     group,
     updater_pid=nil,
+    compactor_pid=nil,
     waiting_commit=false,
-    waiting_list=[]
+    waiting_list=[],
+    ref_counter=nil
 }).
 
 % api methods
 request_group(Pid, Seq) ->
     ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
     case gen_server:call(Pid, {request_group, Seq}, infinity) of
-    {ok, Group} ->
+    {ok, Group, RefCounter} ->
+        couch_ref_counter:add(RefCounter),
         {ok, Group};
     Else ->
         ?LOG_DEBUG("get_updated_group replied with _Else ~p", [Else]),
@@ -70,14 +73,16 @@
 init({InitArgs, ReturnPid, Ref}) ->
     process_flag(trap_exit, true),
     case prepare_group(InitArgs, false) of
-    {ok, #group{db=Db}=Group} ->
+    {ok, #group{db=Db, fd=Fd}=Group} ->
         couch_db:monitor(Db),
         Pid = spawn_link(fun()-> couch_view_updater:update(Group) end),
+        {ok, RefCounter} = couch_ref_counter:start([Fd]),
         {ok, #group_state{
                 db_name=couch_db:name(Db),
                 init_args=InitArgs,
                 updater_pid = Pid,
-                group=Group}};
+                group=Group,
+                ref_counter=RefCounter}};
     Error ->
         ReturnPid ! {Ref, self(), Error},
         ignore
@@ -120,10 +125,11 @@
 
 % If the request seqence is less than or equal to the seq_id of a known Group,
 % we respond with that Group.
-handle_call({request_group, RequestSeq}, _From, 
-        #group_state{group=#group{current_seq=GroupSeq}=Group}=State)
-        when RequestSeq =< GroupSeq  ->
-    {reply, {ok, Group}, State};
+handle_call({request_group, RequestSeq}, _From, #group_state{
+            group = #group{current_seq=GroupSeq} = Group,
+            ref_counter = RefCounter
+        } = State) when RequestSeq =< GroupSeq  ->
+    {reply, {ok, Group, RefCounter}, State};
 
 % Otherwise: TargetSeq => RequestSeq > GroupSeq
 % We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
@@ -134,9 +140,63 @@
         }, infinity}.
 
 
-handle_cast(foo, State) ->
-    {ok, State}.
+handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil, 
+        group=Group, init_args={view, RootDir, DbName, GroupId} } = State) ->
+    ?LOG_INFO("Starting view group compaction", []),
+    {ok, Db} = couch_db:open(DbName, []),
+    {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>),
+    NewGroup = reset_file(Db, Fd, DbName, Group),
+    Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
+    {noreply, State#group_state{compactor_pid = Pid}};
+handle_cast({start_compact, _}, State) ->
+    %% compact already running, this is a no-op
+    {noreply, State};
 
+handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, 
+        #group_state{ 
+            group = #group{current_seq=OldSeq} = Group,
+            init_args = {view, RootDir, DbName, GroupId}, 
+            updater_pid = nil,
+            ref_counter = RefCounter
+        } = State) when NewSeq >= OldSeq ->
+    ?LOG_INFO("View Group compaction complete", []),
+    BaseName = RootDir ++ "/." ++ ?b2l(DbName) ++ ?b2l(GroupId),
+    FileName = BaseName ++ ".view",
+    CompactName = BaseName ++".compact.view",
+    file:delete(FileName),
+    ok = file:rename(CompactName, FileName),
+    
+    %% cleanup old group
+    couch_ref_counter:drop(RefCounter),
+    {ok, NewRefCounter} = couch_ref_counter:start([NewFd]),
+    case Group#group.db of
+        nil -> ok;
+        Else -> couch_db:close(Else)
+    end,
+    
+    erlang:send_after(1000, self(), delayed_commit),
+    {noreply, State#group_state{
+        group=NewGroup, 
+        ref_counter=NewRefCounter,
+        compactor_pid=nil
+    }};
+handle_cast({compact_done, NewGroup}, #group_state{ 
+        init_args={view, _RootDir, DbName, GroupId} } = State) ->
+    ?LOG_INFO("View index compaction still behind main file", []),
+    couch_db:close(NewGroup#group.db),
+    {ok, Db} = couch_db:open(DbName, []),
+    Pid = spawn_link(fun() -> 
+        {_,Ref} = erlang:spawn_monitor(fun() -> 
+            couch_view_updater:update(NewGroup#group{db = Db})
+        end),
+        receive
+            {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
+                #group{name=GroupId} = NewGroup2,
+                Pid2 = couch_view:get_group_server(DbName, GroupId),
+                gen_server:cast(Pid2, {compact_done, NewGroup2})
+        end
+    end),
+    {noreply, State#group_state{compactor_pid = Pid}}.
 
 handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
     {ok, Db} = couch_db:open(DbName, []),
@@ -160,6 +220,7 @@
 handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
         #group_state{db_name=DbName,
             updater_pid=UpPid,
+            ref_counter=RefCounter,
             waiting_list=WaitList,
             waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
     ok = couch_db:close(Db),
@@ -168,7 +229,7 @@
         erlang:send_after(1000, self(), delayed_commit);
     true -> ok
     end,
-    case reply_with_group(Group, WaitList, []) of
+    case reply_with_group(Group, WaitList, [], RefCounter) of
     [] ->
         {noreply, State#group_state{waiting_commit=true, waiting_list=[],
                 group=Group#group{db=nil}, updater_pid=nil}};
@@ -221,17 +282,18 @@
 % reply_with_group/3
 % for each item in the WaitingList {Pid, Seq}
 % if the Seq is =< GroupSeq, reply
-reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], StillWaiting) when Seq =< GroupSeq ->
-    gen_server:reply(Pid, {ok, Group}),
-    reply_with_group(Group, WaitList, StillWaiting);
+reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList], 
+        StillWaiting, RefCounter) when Seq =< GroupSeq ->
+    gen_server:reply(Pid, {ok, Group, RefCounter}),
+    reply_with_group(Group, WaitList, StillWaiting, RefCounter);
 
 % else
 % put it in the continuing waiting list    
-reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting) ->
-    reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting]);
+reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting, RefCounter) ->
+    reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting], RefCounter);
 
 % return the still waiting list
-reply_with_group(_Group, [], StillWaiting) ->
+reply_with_group(_Group, [], StillWaiting, _RefCounter) ->
     StillWaiting.
 
 reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->