You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2014/07/22 01:57:20 UTC

[07/43] couchdb commit: updated refs/heads/1963-eunit-bigcouch to 424dca5

Port 042-work-queue.t etap test suite to eunit

The etap tests were written in a single-flow style, testing the same things
multiple times without real need. For eunit they are split into small test
cases, each focused on a specific testing goal.

The timeout on receive is decreased from 3000 ms to 100 ms.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/22a47bb5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/22a47bb5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/22a47bb5

Branch: refs/heads/1963-eunit-bigcouch
Commit: 22a47bb58a1008b9cce460f41d9a0ff48b709b0c
Parents: a7dc5c1
Author: Alexander Shorin <kx...@apache.org>
Authored: Sun May 18 14:04:08 2014 +0400
Committer: Russell Branca <ch...@apache.org>
Committed: Mon Jul 21 16:36:49 2014 -0700

----------------------------------------------------------------------
 test/couchdb/couch_work_queue_tests.erl | 393 +++++++++++++++++++++
 test/etap/042-work-queue.t              | 500 ---------------------------
 2 files changed, 393 insertions(+), 500 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/22a47bb5/test/couchdb/couch_work_queue_tests.erl
----------------------------------------------------------------------
diff --git a/test/couchdb/couch_work_queue_tests.erl b/test/couchdb/couch_work_queue_tests.erl
new file mode 100644
index 0000000..8a463b5
--- /dev/null
+++ b/test/couchdb/couch_work_queue_tests.erl
@@ -0,0 +1,393 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_work_queue_tests).
+
+-include("couch_eunit.hrl").
+
+-define(TIMEOUT, 100).
+
+
+setup(Opts) ->
+    {ok, Q} = couch_work_queue:new(Opts),
+    Producer = spawn_producer(Q),
+    Consumer = spawn_consumer(Q),
+    {Q, Producer, Consumer}.
+
+setup_max_items() ->
+    setup([{max_items, 3}]).
+
+setup_max_size() ->
+    setup([{max_size, 160}]).
+
+setup_max_items_and_size() ->
+    setup([{max_size, 160}, {max_items, 3}]).
+
+setup_multi_workers() ->
+    {Q, Producer, Consumer1} = setup([{max_size, 160},
+                                      {max_items, 3},
+                                      {multi_workers, true}]),
+    Consumer2 = spawn_consumer(Q),
+    Consumer3 = spawn_consumer(Q),
+    {Q, Producer, [Consumer1, Consumer2, Consumer3]}.
+
+teardown({Q, Producer, Consumers}) when is_list(Consumers) ->
+    % consume all to unblock and let producer/consumer stop without timeout
+    [consume(Consumer, all) || Consumer <- Consumers],
+
+    ok = close_queue(Q),
+    ok = stop(Producer, "producer"),
+    R = [stop(Consumer, "consumer") || Consumer <- Consumers],
+    R = [ok || _ <- Consumers],
+    ok;
+teardown({Q, Producer, Consumer}) ->
+    teardown({Q, Producer, [Consumer]}).
+
+
+single_consumer_test_() ->
+    {
+        "Single producer and consumer",
+        [
+            {
+                "Queue with 3 max items",
+                {
+                    foreach,
+                    fun setup_max_items/0, fun teardown/1,
+                    single_consumer_max_item_count() ++ common_cases()
+                }
+            },
+            {
+                "Queue with max size of 160 bytes",
+                {
+                    foreach,
+                    fun setup_max_size/0, fun teardown/1,
+                    single_consumer_max_size() ++ common_cases()
+                }
+            },
+            {
+                "Queue with max size of 160 bytes and 3 max items",
+                {
+                    foreach,
+                    fun setup_max_items_and_size/0, fun teardown/1,
+                    single_consumer_max_items_and_size() ++ common_cases()
+                }
+            }
+        ]
+    }.
+
+multiple_consumers_test_() ->
+    {
+        "Single producer and multiple consumers",
+        [
+            {
+                "Queue with max size of 160 bytes and 3 max items",
+                {
+                    foreach,
+                    fun setup_multi_workers/0, fun teardown/1,
+                    common_cases() ++ multiple_consumers()
+                }
+
+            }
+        ]
+    }.
+
+common_cases()->
+    [
+        fun should_block_consumer_on_dequeue_from_empty_queue/1,
+        fun should_consume_right_item/1,
+        fun should_timeout_on_close_non_empty_queue/1,
+        fun should_not_block_producer_for_non_empty_queue_after_close/1,
+        fun should_be_closed/1
+    ].
+
+single_consumer_max_item_count()->
+    [
+        fun should_have_no_items_for_new_queue/1,
+        fun should_block_producer_on_full_queue_count/1,
+        fun should_receive_first_queued_item/1,
+        fun should_consume_multiple_items/1,
+        fun should_consume_all/1
+    ].
+
+single_consumer_max_size()->
+    [
+        fun should_have_zero_size_for_new_queue/1,
+        fun should_block_producer_on_full_queue_size/1,
+        fun should_increase_queue_size_on_produce/1,
+        fun should_receive_first_queued_item/1,
+        fun should_consume_multiple_items/1,
+        fun should_consume_all/1
+    ].
+
+single_consumer_max_items_and_size() ->
+    single_consumer_max_item_count() ++ single_consumer_max_size().
+
+multiple_consumers() ->
+    [
+        fun should_have_zero_size_for_new_queue/1,
+        fun should_have_no_items_for_new_queue/1,
+        fun should_increase_queue_size_on_produce/1
+    ].
+
+
+should_have_no_items_for_new_queue({Q, _, _}) ->
+    ?_assertEqual(0, couch_work_queue:item_count(Q)).
+
+should_have_zero_size_for_new_queue({Q, _, _}) ->
+    ?_assertEqual(0, couch_work_queue:size(Q)).
+
+should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers}) when is_list(Consumers) ->
+    [consume(C, 2) || C <- Consumers],
+    Pongs = [ping(C) || C <- Consumers],
+    ?_assertEqual([timeout, timeout, timeout], Pongs);
+should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumer}) ->
+    consume(Consumer, 1),
+    Pong = ping(Consumer),
+    ?_assertEqual(timeout, Pong).
+
+should_consume_right_item({Q, Producer, Consumers}) when is_list(Consumers) ->
+    [consume(C, 3) || C <- Consumers],
+
+    Item1 = produce(Producer, 10),
+    ok = ping(Producer),
+    ?assertEqual(0, couch_work_queue:item_count(Q)),
+    ?assertEqual(0, couch_work_queue:size(Q)),
+
+    Item2 = produce(Producer, 10),
+    ok = ping(Producer),
+    ?assertEqual(0, couch_work_queue:item_count(Q)),
+    ?assertEqual(0, couch_work_queue:size(Q)),
+
+    Item3 = produce(Producer, 10),
+    ok = ping(Producer),
+    ?assertEqual(0, couch_work_queue:item_count(Q)),
+    ?assertEqual(0, couch_work_queue:size(Q)),
+
+    R = [{ping(C), Item}
+         || {C, Item} <- lists:zip(Consumers, [Item1, Item2, Item3])],
+
+    ?_assertEqual([{ok, Item1}, {ok, Item2}, {ok, Item3}], R);
+should_consume_right_item({_, Producer, Consumer}) ->
+    consume(Consumer, 1),
+    Item = produce(Producer, 10),
+    produce(Producer, 20),
+    ok = ping(Producer),
+    ok = ping(Consumer),
+    {ok, Items} = last_consumer_items(Consumer),
+    ?_assertEqual([Item], Items).
+
+should_increase_queue_size_on_produce({Q, Producer, _}) ->
+    produce(Producer, 50),
+    ok = ping(Producer),
+    Count1 = couch_work_queue:item_count(Q),
+    Size1 = couch_work_queue:size(Q),
+
+    produce(Producer, 10),
+    Count2 = couch_work_queue:item_count(Q),
+    Size2 = couch_work_queue:size(Q),
+
+    ?_assertEqual([{Count1, Size1}, {Count2, Size2}], [{1, 50}, {2, 60}]).
+
+should_block_producer_on_full_queue_count({Q, Producer, _}) ->
+    produce(Producer, 10),
+    ?assertEqual(1, couch_work_queue:item_count(Q)),
+    ok = ping(Producer),
+
+    produce(Producer, 15),
+    ?assertEqual(2, couch_work_queue:item_count(Q)),
+    ok = ping(Producer),
+
+    produce(Producer, 20),
+    ?assertEqual(3, couch_work_queue:item_count(Q)),
+    Pong = ping(Producer),
+
+    ?_assertEqual(timeout, Pong).
+
+should_block_producer_on_full_queue_size({Q, Producer, _}) ->
+    produce(Producer, 100),
+    ok = ping(Producer),
+    ?assertEqual(1, couch_work_queue:item_count(Q)),
+    ?assertEqual(100, couch_work_queue:size(Q)),
+
+    produce(Producer, 110),
+    Pong = ping(Producer),
+    ?assertEqual(2, couch_work_queue:item_count(Q)),
+    ?assertEqual(210, couch_work_queue:size(Q)),
+
+    ?_assertEqual(timeout, Pong).
+
+should_consume_multiple_items({_, Producer, Consumer}) ->
+    Item1 = produce(Producer, 10),
+    ok = ping(Producer),
+
+    Item2 = produce(Producer, 15),
+    ok = ping(Producer),
+
+    consume(Consumer, 2),
+
+    {ok, Items} = last_consumer_items(Consumer),
+    ?_assertEqual([Item1, Item2], Items).
+
+should_receive_first_queued_item({Q, Producer, Consumer}) ->
+    consume(Consumer, 100),
+    timeout = ping(Consumer),
+
+    Item = produce(Producer, 11),
+    ok = ping(Producer),
+
+    ok = ping(Consumer),
+    ?assertEqual(0, couch_work_queue:item_count(Q)),
+
+    {ok, Items} = last_consumer_items(Consumer),
+    ?_assertEqual([Item], Items).
+
+should_consume_all({_, Producer, Consumer}) ->
+    Item1 = produce(Producer, 10),
+    Item2 = produce(Producer, 15),
+    Item3 = produce(Producer, 20),
+
+    consume(Consumer, all),
+
+    {ok, Items} = last_consumer_items(Consumer),
+    ?_assertEqual([Item1, Item2, Item3], Items).
+
+should_timeout_on_close_non_empty_queue({Q, Producer, _}) ->
+    produce(Producer, 1),
+    Status = close_queue(Q),
+
+    ?_assertEqual(timeout, Status).
+
+should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) ->
+    produce(Producer, 1),
+    close_queue(Q),
+    Pong = ping(Producer),
+    Size = couch_work_queue:size(Q),
+    Count = couch_work_queue:item_count(Q),
+
+    ?_assertEqual({ok, 1, 1}, {Pong, Size, Count}).
+
+should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
+    ok = close_queue(Q),
+
+    [consume(C, 1) || C <- Consumers],
+
+    LastConsumerItems = [last_consumer_items(C) || C <- Consumers],
+    ItemsCount = couch_work_queue:item_count(Q),
+    Size = couch_work_queue:size(Q),
+
+    ?_assertEqual({[closed, closed, closed], closed, closed},
+                  {LastConsumerItems, ItemsCount, Size});
+should_be_closed({Q, _, Consumer}) ->
+    ok = close_queue(Q),
+
+    consume(Consumer, 1),
+
+    LastConsumerItems = last_consumer_items(Consumer),
+    ItemsCount = couch_work_queue:item_count(Q),
+    Size = couch_work_queue:size(Q),
+
+    ?_assertEqual({closed, closed, closed},
+                  {LastConsumerItems, ItemsCount, Size}).
+
+
+close_queue(Q) ->
+    ok = couch_work_queue:close(Q),
+    MonRef = erlang:monitor(process, Q),
+    receive
+        {'DOWN', MonRef, process, Q, _Reason} -> ok
+    after ?TIMEOUT ->
+        erlang:demonitor(MonRef),
+        timeout
+    end.
+
+spawn_consumer(Q) ->
+    Parent = self(),
+    spawn(fun() -> consumer_loop(Parent, Q, nil) end).
+
+consumer_loop(Parent, Q, PrevItem) ->
+    receive
+        {stop, Ref} ->
+            Parent ! {ok, Ref};
+        {ping, Ref} ->
+            Parent ! {pong, Ref},
+            consumer_loop(Parent, Q, PrevItem);
+        {last_item, Ref} ->
+            Parent ! {item, Ref, PrevItem},
+            consumer_loop(Parent, Q, PrevItem);
+        {consume, N} ->
+            Result = couch_work_queue:dequeue(Q, N),
+            consumer_loop(Parent, Q, Result)
+    end.
+
+spawn_producer(Q) ->
+    Parent = self(),
+    spawn(fun() -> producer_loop(Parent, Q) end).
+
+producer_loop(Parent, Q) ->
+    receive
+        {stop, Ref} ->
+            Parent ! {ok, Ref};
+        {ping, Ref} ->
+            Parent ! {pong, Ref},
+            producer_loop(Parent, Q);
+        {produce, Ref, Size} ->
+            Item = crypto:rand_bytes(Size),
+            Parent ! {item, Ref, Item},
+            ok = couch_work_queue:queue(Q, Item),
+            producer_loop(Parent, Q)
+    end.
+
+consume(Consumer, N) ->
+    Consumer ! {consume, N}.
+
+last_consumer_items(Consumer) ->
+    Ref = make_ref(),
+    Consumer ! {last_item, Ref},
+    receive
+        {item, Ref, Items} ->
+            Items
+    after ?TIMEOUT ->
+        timeout
+    end.
+
+produce(Producer, Size) ->
+    Ref = make_ref(),
+    Producer ! {produce, Ref, Size},
+    receive
+        {item, Ref, Item} ->
+            Item
+    after ?TIMEOUT ->
+        erlang:error({assertion_failed,
+                      [{module, ?MODULE},
+                       {line, ?LINE},
+                       {reason, "Timeout asking producer to produce an item"}]})
+    end.
+
+ping(Pid) ->
+    Ref = make_ref(),
+    Pid ! {ping, Ref},
+    receive
+        {pong, Ref} ->
+            ok
+    after ?TIMEOUT ->
+        timeout
+    end.
+
+stop(Pid, Name) ->
+    Ref = make_ref(),
+    Pid ! {stop, Ref},
+    receive
+        {ok, Ref} -> ok
+    after ?TIMEOUT ->
+        ?debugMsg("Timeout stopping " ++ Name),
+        timeout
+    end.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/22a47bb5/test/etap/042-work-queue.t
----------------------------------------------------------------------
diff --git a/test/etap/042-work-queue.t b/test/etap/042-work-queue.t
deleted file mode 100755
index 8594a6f..0000000
--- a/test/etap/042-work-queue.t
+++ /dev/null
@@ -1,500 +0,0 @@
-#!/usr/bin/env escript
-%% -*- erlang -*-
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-main(_) ->
-    test_util:init_code_path(),
-
-    etap:plan(155),
-    case (catch test()) of
-        ok ->
-            etap:end_tests();
-        Other ->
-            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
-            etap:bail(Other)
-    end,
-    ok.
-
-
-test() ->
-    ok = crypto:start(),
-    test_single_consumer_max_item_count(),
-    test_single_consumer_max_size(),
-    test_single_consumer_max_item_count_and_size(),
-    test_multiple_consumers(),
-    ok.
-
-
-test_single_consumer_max_item_count() ->
-    etap:diag("Spawning a queue with 3 max items, 1 producer and 1 consumer"),
-
-    {ok, Q} = couch_work_queue:new([{max_items, 3}]),
-    Producer = spawn_producer(Q),
-    Consumer = spawn_consumer(Q),
-
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-
-    consume(Consumer, 1),
-    etap:is(ping(Consumer), timeout,
-        "Consumer blocked when attempting to dequeue 1 item from empty queue"),
-
-    Item1 = produce(Producer, 10),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-
-    etap:is(ping(Consumer), ok, "Consumer unblocked"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item1]},
-        "Consumer received the right item"),
-
-    Item2 = produce(Producer, 20),
-    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
-    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
-
-    Item3 = produce(Producer, 15),
-    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-
-    Item4 = produce(Producer, 3),
-    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
-    etap:is(ping(Producer), timeout, "Producer blocked with full queue"),
-
-    consume(Consumer, 2),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue 2 items from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item2, Item3]},
-        "Consumer received the right items"),
-    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
-
-    consume(Consumer, 2),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue 2 items from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item4]},
-        "Consumer received the right item"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-
-    consume(Consumer, 100),
-    etap:is(ping(Consumer), timeout,
-        "Consumer blocked when attempting to dequeue 100 items from empty queue"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-
-    Item5 = produce(Producer, 11),
-    etap:is(ping(Producer), ok, "Producer not blocked with empty queue"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-
-    Item6 = produce(Producer, 19),
-    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
-    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
-
-    Item7 = produce(Producer, 2),
-    etap:is(ping(Producer), ok, "Producer not blocked with non full queue"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-
-    Item8 = produce(Producer, 33),
-    etap:is(ping(Producer), timeout, "Producer blocked with full queue"),
-    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
-
-    etap:is(ping(Consumer), ok, "Consumer unblocked"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item5]},
-        "Consumer received the first queued item"),
-    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
-
-    consume(Consumer, all),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue all items from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item6, Item7, Item8]},
-        "Consumer received all queued items"),
-
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-
-    etap:is(close_queue(Q), ok, "Closed queue"),
-    consume(Consumer, 1),
-    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
-    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
-    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
-
-    stop(Producer, "producer"),
-    stop(Consumer, "consumer").
-
-
-
-test_single_consumer_max_size() ->
-    etap:diag("Spawning a queue with max size of 160 bytes, "
-        "1 producer and 1 consumer"),
-
-    {ok, Q} = couch_work_queue:new([{max_size, 160}]),
-    Producer = spawn_producer(Q),
-    Consumer = spawn_consumer(Q),
-
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    consume(Consumer, 1),
-    etap:is(ping(Consumer), timeout,
-        "Consumer blocked when attempting to dequeue 1 item from empty queue"),
-
-    Item1 = produce(Producer, 50),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-
-    etap:is(ping(Consumer), ok, "Consumer unblocked"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item1]},
-        "Consumer received the right item"),
-
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    Item2 = produce(Producer, 50),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
-    etap:is(couch_work_queue:size(Q), 50, "Queue size is 50 bytes"),
-
-    Item3 = produce(Producer, 50),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-    etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),
-
-    Item4 = produce(Producer, 61),
-    etap:is(ping(Producer), timeout, "Producer blocked"),
-    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
-    etap:is(couch_work_queue:size(Q), 161, "Queue size is 161 bytes"),
-
-    consume(Consumer, 1),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue 1 item from full queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item2]},
-        "Consumer received the right item"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-    etap:is(couch_work_queue:size(Q), 111, "Queue size is 111 bytes"),
-
-    Item5 = produce(Producer, 20),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
-    etap:is(couch_work_queue:size(Q), 131, "Queue size is 131 bytes"),
-
-    Item6 = produce(Producer, 40),
-    etap:is(ping(Producer), timeout, "Producer blocked"),
-    etap:is(couch_work_queue:item_count(Q), 4, "Queue item count is 4"),
-    etap:is(couch_work_queue:size(Q), 171, "Queue size is 171 bytes"),
-
-    etap:is(close_queue(Q), timeout,
-        "Timeout when trying to close non-empty queue"),
-
-    consume(Consumer, 2),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue 2 items from full queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item3, Item4]},
-        "Consumer received the right items"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-    etap:is(couch_work_queue:size(Q), 60, "Queue size is 60 bytes"),
-
-    etap:is(close_queue(Q), timeout,
-        "Timeout when trying to close non-empty queue"),
-
-    consume(Consumer, all),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue all items from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
-        "Consumer received the right items"),
-
-    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
-    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
-
-    consume(Consumer, all),
-    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
-
-    stop(Producer, "producer"),
-    stop(Consumer, "consumer").
-
-
-test_single_consumer_max_item_count_and_size() ->
-    etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
-        "1 producer and 1 consumer"),
-
-    {ok, Q} = couch_work_queue:new([{max_items, 3}, {max_size, 200}]),
-    Producer = spawn_producer(Q),
-    Consumer = spawn_consumer(Q),
-
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    Item1 = produce(Producer, 100),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
-    etap:is(couch_work_queue:size(Q), 100, "Queue size is 100 bytes"),
-
-    Item2 = produce(Producer, 110),
-    etap:is(ping(Producer), timeout,
-        "Producer blocked when queue size >= max_size"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-    etap:is(couch_work_queue:size(Q), 210, "Queue size is 210 bytes"),
-
-    consume(Consumer, all),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue all items from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item1, Item2]},
-        "Consumer received the right items"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    etap:is(ping(Producer), ok, "Producer not blocked anymore"),
-
-    Item3 = produce(Producer, 10),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
-    etap:is(couch_work_queue:size(Q), 10, "Queue size is 10 bytes"),
-
-    Item4 = produce(Producer, 4),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-    etap:is(couch_work_queue:size(Q), 14, "Queue size is 14 bytes"),
-
-    Item5 = produce(Producer, 2),
-    etap:is(ping(Producer), timeout,
-        "Producer blocked when queue item count = max_items"),
-    etap:is(couch_work_queue:item_count(Q), 3, "Queue item count is 3"),
-    etap:is(couch_work_queue:size(Q), 16, "Queue size is 16 bytes"),
-
-    consume(Consumer, 1),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue 1 item from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item3]},
-       "Consumer received 1 item"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-    etap:is(couch_work_queue:size(Q), 6, "Queue size is 6 bytes"),
-
-    etap:is(close_queue(Q), timeout,
-        "Timeout when trying to close non-empty queue"),
-
-    consume(Consumer, 1),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue 1 item from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item4]},
-       "Consumer received 1 item"),
-    etap:is(couch_work_queue:item_count(Q), 1, "Queue item count is 1"),
-    etap:is(couch_work_queue:size(Q), 2, "Queue size is 2 bytes"),
-
-    Item6 = produce(Producer, 50),
-    etap:is(ping(Producer), ok,
-        "Producer not blocked when queue is not full and already received"
-        " a close request"),
-    etap:is(couch_work_queue:item_count(Q), 2, "Queue item count is 2"),
-    etap:is(couch_work_queue:size(Q), 52, "Queue size is 52 bytes"),
-
-    consume(Consumer, all),
-    etap:is(ping(Consumer), ok,
-        "Consumer not blocked when attempting to dequeue all items from queue"),
-    etap:is(last_consumer_items(Consumer), {ok, [Item5, Item6]},
-       "Consumer received all queued items"),
-
-    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
-    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
-
-    consume(Consumer, 1),
-    etap:is(last_consumer_items(Consumer), closed, "Consumer got closed queue"),
-
-    stop(Producer, "producer"),
-    stop(Consumer, "consumer").
-
-
-test_multiple_consumers() ->
-    etap:diag("Spawning a queue with 3 max items, max size of 200 bytes, "
-        "1 producer and 3 consumers"),
-
-    {ok, Q} = couch_work_queue:new(
-        [{max_items, 3}, {max_size, 200}, {multi_workers, true}]),
-    Producer = spawn_producer(Q),
-    Consumer1 = spawn_consumer(Q),
-    Consumer2 = spawn_consumer(Q),
-    Consumer3 = spawn_consumer(Q),
-
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    consume(Consumer1, 1),
-    etap:is(ping(Consumer1), timeout,
-        "Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
-    consume(Consumer2, 2),
-    etap:is(ping(Consumer2), timeout,
-        "Consumer 2 blocked when attempting to dequeue 2 items from empty queue"),
-    consume(Consumer3, 1),
-    etap:is(ping(Consumer3), timeout,
-        "Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),
-
-    Item1 = produce(Producer, 50),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    Item2 = produce(Producer, 50),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    Item3 = produce(Producer, 50),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
-    etap:is(last_consumer_items(Consumer1), {ok, [Item1]},
-       "Consumer 1 received 1 item"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
-    etap:is(last_consumer_items(Consumer2), {ok, [Item2]},
-       "Consumer 2 received 1 item"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
-    etap:is(last_consumer_items(Consumer3), {ok, [Item3]},
-       "Consumer 3 received 1 item"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    consume(Consumer1, 1),
-    etap:is(ping(Consumer1), timeout,
-        "Consumer 1 blocked when attempting to dequeue 1 item from empty queue"),
-    consume(Consumer2, 2),
-    etap:is(ping(Consumer2), timeout,
-        "Consumer 2 blocked when attempting to dequeue 1 item from empty queue"),
-    consume(Consumer3, 1),
-    etap:is(ping(Consumer3), timeout,
-        "Consumer 3 blocked when attempting to dequeue 1 item from empty queue"),
-
-    Item4 = produce(Producer, 50),
-    etap:is(ping(Producer), ok, "Producer not blocked"),
-    etap:is(couch_work_queue:item_count(Q), 0, "Queue item count is 0"),
-    etap:is(couch_work_queue:size(Q), 0, "Queue size is 0 bytes"),
-
-    etap:is(close_queue(Q), ok, "Closed queue"),
-
-    etap:is(ping(Consumer1), ok, "Consumer 1 unblocked"),
-    etap:is(last_consumer_items(Consumer1), {ok, [Item4]},
-       "Consumer 1 received 1 item"),
-
-    etap:is(couch_work_queue:item_count(Q), closed, "Queue closed"),
-    etap:is(couch_work_queue:size(Q), closed, "Queue closed"),
-
-    etap:is(ping(Consumer2), ok, "Consumer 2 unblocked"),
-    etap:is(last_consumer_items(Consumer2), closed,
-        "Consumer 2 received 'closed' atom"),
-
-    etap:is(ping(Consumer3), ok, "Consumer 3 unblocked"),
-    etap:is(last_consumer_items(Consumer3), closed,
-        "Consumer 3 received 'closed' atom"),
-
-    stop(Producer, "producer"),
-    stop(Consumer1, "consumer 1"),
-    stop(Consumer2, "consumer 2"),
-    stop(Consumer3, "consumer 3").
-
-
-close_queue(Q) ->
-    ok = couch_work_queue:close(Q),
-    MonRef = erlang:monitor(process, Q),
-    receive
-    {'DOWN', MonRef, process, Q, _Reason} ->
-         etap:diag("Queue closed")
-    after 3000 ->
-         erlang:demonitor(MonRef),
-         timeout
-    end.
-
-
-spawn_consumer(Q) ->
-    Parent = self(),
-    spawn(fun() -> consumer_loop(Parent, Q, nil) end).
-
-
-consumer_loop(Parent, Q, PrevItem) ->
-    receive
-    {stop, Ref} ->
-        Parent ! {ok, Ref};
-    {ping, Ref} ->
-        Parent ! {pong, Ref},
-        consumer_loop(Parent, Q, PrevItem);
-    {last_item, Ref} ->
-        Parent ! {item, Ref, PrevItem},
-        consumer_loop(Parent, Q, PrevItem);
-    {consume, N} ->
-        Result = couch_work_queue:dequeue(Q, N),
-        consumer_loop(Parent, Q, Result)
-    end.
-
-
-spawn_producer(Q) ->
-    Parent = self(),
-    spawn(fun() -> producer_loop(Parent, Q) end).
-
-
-producer_loop(Parent, Q) ->
-    receive
-    {stop, Ref} ->
-        Parent ! {ok, Ref};
-    {ping, Ref} ->
-        Parent ! {pong, Ref},
-        producer_loop(Parent, Q);
-    {produce, Ref, Size} ->
-        Item = crypto:rand_bytes(Size),
-        Parent ! {item, Ref, Item},
-        ok = couch_work_queue:queue(Q, Item),
-        producer_loop(Parent, Q)
-    end.
-
-
-consume(Consumer, N) ->
-    Consumer ! {consume, N}.
-
-
-last_consumer_items(Consumer) ->
-    Ref = make_ref(),
-    Consumer ! {last_item, Ref},
-    receive
-    {item, Ref, Items} ->
-        Items
-    after 3000 ->
-        timeout
-    end.
-
-
-produce(Producer, Size) ->
-    Ref = make_ref(),
-    Producer ! {produce, Ref, Size},
-    receive
-    {item, Ref, Item} ->
-        Item
-    after 3000 ->
-        etap:bail("Timeout asking producer to produce an item")
-    end.
-
-
-ping(Pid) ->
-    Ref = make_ref(),
-    Pid ! {ping, Ref},
-    receive
-    {pong, Ref} ->
-        ok
-    after 3000 ->
-        timeout
-    end.
-
-
-stop(Pid, Name) ->
-    Ref = make_ref(),
-    Pid ! {stop, Ref},
-    receive
-    {ok, Ref} ->
-        etap:diag("Stopped " ++ Name)
-    after 3000 ->
-        etap:bail("Timeout stopping " ++ Name)
-    end.