You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/05/15 04:35:25 UTC
svn commit: r1103266 - /couchdb/trunk/src/couchdb/couch_work_queue.erl
Author: fdmanana
Date: Sun May 15 02:35:25 2011
New Revision: 1103266
URL: http://svn.apache.org/viewvc?rev=1103266&view=rev
Log:
Bug fix in couch_work_queue: state's size not decremented
When dequeuing only some items from the queue (that is, not taking all the queued items),
the size field of the gen_server's state was not being decremented as it should.
However, when all the queue items are delivered to a consumer, the size is set to 0.
This particular fix has a good impact on the new replicator because it uses work queues
and the consumers only dequeue 1 item at a time - producers will no longer be blocked
until the queue gets empty.
For a push replication that used to take about 15 minutes, it now takes about 13 minutes.
Modified:
couchdb/trunk/src/couchdb/couch_work_queue.erl
Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=1103266&r1=1103265&r2=1103266&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Sun May 15 02:35:25 2011
@@ -13,6 +13,8 @@
-module(couch_work_queue).
-behaviour(gen_server).
+-include("couch_db.hrl").
+
% public API
-export([new/1, queue/2, dequeue/1, dequeue/2, close/1]).
@@ -37,8 +39,10 @@ new(Options) ->
gen_server:start_link(couch_work_queue, Options, []).
+queue(Wq, Item) when is_binary(Item) ->
+ gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity);
queue(Wq, Item) ->
- gen_server:call(Wq, {queue, Item}, infinity).
+ gen_server:call(Wq, {queue, Item, byte_size(?term_to_bin(Item))}, infinity).
dequeue(Wq) ->
@@ -70,10 +74,10 @@ terminate(_Reason, #q{work_waiters=Worke
lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers).
-handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) ->
- Q = Q0#q{size = increment_queue_size(Q0, Item),
+handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
+ Q = Q0#q{size = Q0#q.size + Size, % increment_queue_size(Q0, Item),
items = Q0#q.items + 1,
- queue = queue:in(Item, Q0#q.queue)},
+ queue = queue:in({Item, Size}, Q0#q.queue)},
case (Q#q.size >= Q#q.max_size) orelse
(Q#q.items >= Q#q.max_items) of
true ->
@@ -82,7 +86,7 @@ handle_call({queue, Item}, From, #q{work
{reply, ok, Q}
end;
-handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
+handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
gen_server:reply(W, {ok, [Item]}),
{reply, ok, Q#q{work_waiters = Rest}};
@@ -107,38 +111,44 @@ deliver_queue_items(Max, Q) ->
#q{
queue = Queue,
items = Count,
+ size = Size,
close_on_dequeue = Close,
blocked = Blocked
} = Q,
case (Max =:= all) orelse (Max >= Count) of
false ->
- {Items, Queue2, Blocked2} = dequeue_items(Max, Queue, Blocked, []),
- Q2 = Q#q{items = Count - Max, blocked = Blocked2, queue = Queue2},
+ {Items, Size2, Queue2, Blocked2} = dequeue_items(
+ Max, Size, Queue, Blocked, []),
+ Q2 = Q#q{
+ items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
+ },
{reply, {ok, Items}, Q2};
true ->
lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
+ Items = [Item || {Item, _} <- queue:to_list(Queue)],
case Close of
false ->
- {reply, {ok, queue:to_list(Queue)}, Q2};
+ {reply, {ok, Items}, Q2};
true ->
- {stop, normal, {ok, queue:to_list(Queue)}, Q2}
+ {stop, normal, {ok, Items}, Q2}
end
end.
-dequeue_items(0, Queue, Blocked, DequeuedAcc) ->
- {lists:reverse(DequeuedAcc), Queue, Blocked};
+dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
+ {lists:reverse(DequeuedAcc), Size, Queue, Blocked};
-dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) ->
- {{value, Item}, Queue2} = queue:out(Queue),
+dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
+ {{value, {Item, ItemSize}}, Queue2} = queue:out(Queue),
case Blocked of
[] ->
Blocked2 = Blocked;
[From | Blocked2] ->
gen_server:reply(From, ok)
end,
- dequeue_items(NumItems - 1, Queue2, Blocked2, [Item | DequeuedAcc]).
+ dequeue_items(
+ NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]).
handle_cast(close, #q{items = 0} = Q) ->
@@ -153,10 +163,3 @@ code_change(_OldVsn, State, _Extra) ->
handle_info(X, Q) ->
{stop, X, Q}.
-
-increment_queue_size(#q{max_size = nil, size = Size}, _Item) ->
- Size;
-increment_queue_size(#q{size = Size}, Item) when is_binary(Item) ->
- Size + byte_size(Item);
-increment_queue_size(#q{size = Size}, Item) ->
- Size + byte_size(term_to_binary(Item)).