You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/05/15 04:35:25 UTC

svn commit: r1103266 - /couchdb/trunk/src/couchdb/couch_work_queue.erl

Author: fdmanana
Date: Sun May 15 02:35:25 2011
New Revision: 1103266

URL: http://svn.apache.org/viewvc?rev=1103266&view=rev
Log:
Bug fix in couch_work_queue: state's size not decremented

When dequeuing only some of the items from the queue (that is, not taking all the queued items),
the size field of the gen_server's state was not being decremented as it should have been.
However, when all the queued items are delivered to a consumer, the size is set to 0.

This particular fix has a positive impact on the new replicator because it uses work queues
and its consumers only dequeue 1 item at a time - producers will no longer be blocked
until the queue becomes empty.
For a push replication that used to take about 15 minutes, it now takes about 13 minutes.


Modified:
    couchdb/trunk/src/couchdb/couch_work_queue.erl

Modified: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=1103266&r1=1103265&r2=1103266&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (original)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Sun May 15 02:35:25 2011
@@ -13,6 +13,8 @@
 -module(couch_work_queue).
 -behaviour(gen_server).
 
+-include("couch_db.hrl").
+
 % public API
 -export([new/1, queue/2, dequeue/1, dequeue/2, close/1]).
 
@@ -37,8 +39,10 @@ new(Options) ->
     gen_server:start_link(couch_work_queue, Options, []).
 
 
+queue(Wq, Item) when is_binary(Item) ->
+    gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity);
 queue(Wq, Item) ->
-    gen_server:call(Wq, {queue, Item}, infinity).
+    gen_server:call(Wq, {queue, Item, byte_size(?term_to_bin(Item))}, infinity).
 
 
 dequeue(Wq) ->
@@ -70,10 +74,10 @@ terminate(_Reason, #q{work_waiters=Worke
     lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers).
 
     
-handle_call({queue, Item}, From, #q{work_waiters = []} = Q0) ->
-    Q = Q0#q{size = increment_queue_size(Q0, Item),
+handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
+    Q = Q0#q{size = Q0#q.size + Size, % increment_queue_size(Q0, Item),
                 items = Q0#q.items + 1,
-                queue = queue:in(Item, Q0#q.queue)},
+                queue = queue:in({Item, Size}, Q0#q.queue)},
     case (Q#q.size >= Q#q.max_size) orelse
             (Q#q.items >= Q#q.max_items) of
     true ->
@@ -82,7 +86,7 @@ handle_call({queue, Item}, From, #q{work
         {reply, ok, Q}
     end;
 
-handle_call({queue, Item}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
+handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
     gen_server:reply(W, {ok, [Item]}),
     {reply, ok, Q#q{work_waiters = Rest}};
 
@@ -107,38 +111,44 @@ deliver_queue_items(Max, Q) ->
     #q{
         queue = Queue,
         items = Count,
+        size = Size,
         close_on_dequeue = Close,
         blocked = Blocked
     } = Q,
     case (Max =:= all) orelse (Max >= Count) of
     false ->
-        {Items, Queue2, Blocked2} = dequeue_items(Max, Queue, Blocked, []),
-        Q2 = Q#q{items = Count - Max, blocked = Blocked2, queue = Queue2},
+        {Items, Size2, Queue2, Blocked2} = dequeue_items(
+            Max, Size, Queue, Blocked, []),
+        Q2 = Q#q{
+            items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
+        },
         {reply, {ok, Items}, Q2};
     true ->
         lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
         Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
+        Items = [Item || {Item, _} <- queue:to_list(Queue)],
         case Close of
         false ->
-            {reply, {ok, queue:to_list(Queue)}, Q2};
+            {reply, {ok, Items}, Q2};
         true ->
-            {stop, normal, {ok, queue:to_list(Queue)}, Q2}
+            {stop, normal, {ok, Items}, Q2}
         end
     end.
 
 
-dequeue_items(0, Queue, Blocked, DequeuedAcc) ->
-    {lists:reverse(DequeuedAcc), Queue, Blocked};
+dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
+    {lists:reverse(DequeuedAcc), Size, Queue, Blocked};
 
-dequeue_items(NumItems, Queue, Blocked, DequeuedAcc) ->
-    {{value, Item}, Queue2} = queue:out(Queue),
+dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
+    {{value, {Item, ItemSize}}, Queue2} = queue:out(Queue),
     case Blocked of
     [] ->
         Blocked2 = Blocked;
     [From | Blocked2] ->
         gen_server:reply(From, ok)
     end,
-    dequeue_items(NumItems - 1, Queue2, Blocked2, [Item | DequeuedAcc]).
+    dequeue_items(
+        NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]).
     
 
 handle_cast(close, #q{items = 0} = Q) ->
@@ -153,10 +163,3 @@ code_change(_OldVsn, State, _Extra) ->
 
 handle_info(X, Q) ->
     {stop, X, Q}.
-
-increment_queue_size(#q{max_size = nil, size = Size}, _Item) ->
-    Size;
-increment_queue_size(#q{size = Size}, Item) when is_binary(Item) ->
-    Size + byte_size(Item);
-increment_queue_size(#q{size = Size}, Item) ->
-    Size + byte_size(term_to_binary(Item)).