You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/08/21 22:23:06 UTC
svn commit: r987824 - in /couchdb/trunk/src/couchdb: couch_rep_writer.erl
couch_view_updater.erl couch_work_queue.erl
Author: fdmanana
Date: Sat Aug 21 20:23:06 2010
New Revision: 987824
URL: http://svn.apache.org/viewvc?rev=987824&view=rev
Log:
Small refactoring of the work queue module to accommodate incoming options (multiple workers)
Modified:
couchdb/trunk/src/couchdb/couch_rep_writer.erl
couchdb/trunk/src/couchdb/couch_view_updater.erl
couchdb/trunk/src/couchdb/couch_work_queue.erl
Modified: couchdb/trunk/src/couchdb/couch_rep_writer.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_writer.erl?rev=987824&r1=987823&r2=987824&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_rep_writer.erl (original)
+++ couchdb/trunk/src/couchdb/couch_rep_writer.erl Sat Aug 21 20:23:06 2010
@@ -147,7 +147,8 @@ streamer_fun(Boundary, JsonBytes, Atts)
{start, From} ->
% better use a brand new queue, to ensure there's no garbage from
% a previous (failed) iteration
- {ok, DataQueue} = couch_work_queue:new(1024 * 1024, 1000),
+ {ok, DataQueue} = couch_work_queue:new(
+ [{max_size, 1024 * 1024}, {max_items, 1000}]),
From ! {queue, DataQueue},
couch_doc:doc_to_multi_part_stream(
Boundary,
Modified: couchdb/trunk/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=987824&r1=987823&r2=987824&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_updater.erl Sat Aug 21 20:23:06 2010
@@ -38,8 +38,10 @@ update(Owner, Group) ->
couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
exit(reset)
end,
- {ok, MapQueue} = couch_work_queue:new(100000, 500),
- {ok, WriteQueue} = couch_work_queue:new(100000, 500),
+ {ok, MapQueue} = couch_work_queue:new(
+ [{max_size, 100000}, {max_items, 500}]),
+ {ok, WriteQueue} = couch_work_queue:new(
+ [{max_size, 100000}, {max_items, 500}]),
Self = self(),
ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
spawn_link(fun() -> do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) end),
Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=987824&r1=987823&r2=987824&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Sat Aug 21 20:23:06 2010
@@ -13,7 +13,7 @@
-module(couch_work_queue).
-behaviour(gen_server).
--export([new/2,queue/2,dequeue/1,dequeue/2,close/1]).
+-export([new/1,queue/2,dequeue/1,dequeue/2,close/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
-record(q, {
@@ -27,8 +27,8 @@
close_on_dequeue=false
}).
-new(MaxSize, MaxItems) ->
- gen_server:start_link(couch_work_queue, {MaxSize, MaxItems}, []).
+new(Options) ->
+ gen_server:start_link(couch_work_queue, Options, []).
queue(Wq, Item) ->
gen_server:call(Wq, {queue, Item}, infinity).
@@ -46,8 +46,12 @@ close(Wq) ->
gen_server:cast(Wq, close).
-init({MaxSize,MaxItems}) ->
- {ok, #q{max_size=MaxSize, max_items=MaxItems}}.
+init(Options) ->
+ Q = #q{
+ max_size = couch_util:get_value(max_size, Options),
+ max_items = couch_util:get_value(max_items, Options)
+ },
+ {ok, Q}.
terminate(_Reason, #q{work_waiter=nil}) ->
ok;