You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2013/04/24 17:30:38 UTC

git commit: updated refs/heads/1775-feature-io-regulator to 1d1cd9a

Updated Branches:
  refs/heads/1775-feature-io-regulator [created] 1d1cd9ad0


Deprioritize compaction I/O

When there are compaction vs non-compaction I/O requests, service the
non-compaction requests with higher probability.

Note: For demonstration purposes at the moment, the code is likely too
slow for production use.

COUCHDB-1775


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/1d1cd9ad
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/1d1cd9ad
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/1d1cd9ad

Branch: refs/heads/1775-feature-io-regulator
Commit: 1d1cd9ad097fbe3dd0977857fb3c08de23d8f4e0
Parents: ae6f1eb
Author: Robert Newson <rn...@apache.org>
Authored: Sat Apr 20 15:49:43 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Apr 24 16:28:54 2013 +0100

----------------------------------------------------------------------
 src/couch_index/src/couch_index_compactor.erl |    1 +
 src/couchdb/Makefile.am                       |    2 +
 src/couchdb/couch_db_updater.erl              |    1 +
 src/couchdb/couch_file.erl                    |   10 +-
 src/couchdb/couch_io_regulator.erl            |  126 ++++++++++++++++++++
 src/couchdb/couch_primary_sup.erl             |    6 +
 6 files changed, 141 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/1d1cd9ad/src/couch_index/src/couch_index_compactor.erl
----------------------------------------------------------------------
diff --git a/src/couch_index/src/couch_index_compactor.erl b/src/couch_index/src/couch_index_compactor.erl
index 72bff51..2360e21 100644
--- a/src/couch_index/src/couch_index_compactor.erl
+++ b/src/couch_index/src/couch_index_compactor.erl
@@ -97,6 +97,7 @@ compact(Parent, Mod, IdxState) ->
     compact(Parent, Mod, IdxState, []).
 
 compact(Idx, Mod, IdxState, Opts) ->
+    erlang:put(io_class, compaction),
     DbName = Mod:get(db_name, IdxState),
     Args = [DbName, Mod:get(idx_name, IdxState)],
     ?LOG_INFO("Compaction started for db: ~s idx: ~s", Args),

http://git-wip-us.apache.org/repos/asf/couchdb/blob/1d1cd9ad/src/couchdb/Makefile.am
----------------------------------------------------------------------
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 9fe19bc..5b150f0 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -57,6 +57,7 @@ source_files = \
     couch_httpd_rewrite.erl \
     couch_httpd_stats_handlers.erl \
     couch_httpd_vhost.erl \
+    couch_io_regulator.erl \
     couch_key_tree.erl \
     couch_log.erl \
     couch_native_process.erl \
@@ -114,6 +115,7 @@ compiled_files = \
     couch_httpd_rewrite.beam \
     couch_httpd_stats_handlers.beam \
     couch_httpd_vhost.beam \
+    couch_io_regulator.beam \
     couch_key_tree.beam \
     couch_log.beam \
     couch_native_process.beam \

http://git-wip-us.apache.org/repos/asf/couchdb/blob/1d1cd9ad/src/couchdb/couch_db_updater.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 0ce3048..91a8b74 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -968,6 +968,7 @@ copy_compact(Db, NewDb0, Retry) ->
     commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
 
 start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=PurgeSeq}}=Db) ->
+    erlang:put(io_class, compaction),
     CompactFile = Filepath ++ ".compact",
     ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
     case couch_file:open(CompactFile) of

http://git-wip-us.apache.org/repos/asf/couchdb/blob/1d1cd9ad/src/couchdb/couch_file.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index ee5dafb..5b837cf 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -106,14 +106,14 @@ append_term_md5(Fd, Term, Options) ->
 %%----------------------------------------------------------------------
 
 append_binary(Fd, Bin) ->
-    gen_server:call(Fd, {append_bin, assemble_file_chunk(Bin)}, infinity).
+    couch_io_regulator:io(Fd, {append_bin, assemble_file_chunk(Bin)}).
     
 append_binary_md5(Fd, Bin) ->
-    gen_server:call(Fd,
-        {append_bin, assemble_file_chunk(Bin, couch_util:md5(Bin))}, infinity).
+    couch_io_regulator:io(Fd,
+        {append_bin, assemble_file_chunk(Bin, couch_util:md5(Bin))}).
 
 append_raw_chunk(Fd, Chunk) ->
-    gen_server:call(Fd, {append_bin, Chunk}, infinity).
+    couch_io_regulator:io(Fd, {append_bin, Chunk}).
 
 
 assemble_file_chunk(Bin) ->
@@ -148,7 +148,7 @@ pread_binary(Fd, Pos) ->
 
 
 pread_iolist(Fd, Pos) ->
-    case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of
+    case couch_io_regulator:io(Fd, {pread_iolist, Pos}) of
     {ok, IoList, <<>>} ->
         {ok, IoList};
     {ok, IoList, Md5} ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/1d1cd9ad/src/couchdb/couch_io_regulator.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_io_regulator.erl b/src/couchdb/couch_io_regulator.erl
new file mode 100644
index 0000000..a740756
--- /dev/null
+++ b/src/couchdb/couch_io_regulator.erl
@@ -0,0 +1,126 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_io_regulator).
+-behaviour(gen_server).
+
+-export([start_link/0, io/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
+
+-record(state, {
+    concurrency=10,
+    ratio,
+    interactive=queue:new(),
+    compaction=queue:new(),
+    running=[]
+}).
+
+-record(request, {
+    fd,
+    msg,
+    class,
+    from,
+    ref
+}).
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+io(Fd, Msg) ->
+    Request = #request{fd=Fd, msg=Msg, class=get(io_class), from=self()},
+    gen_server:call(?MODULE, Request, infinity).
+
+init(_) ->
+    Ratio = list_to_float(couch_config:get("io", "ratio", "0.01")),
+    {ok, #state{ratio=Ratio}}.
+
+handle_call(#request{}=Request, From, State) ->
+    {noreply, enqueue_request(Request#request{from=From}, State), 0}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({Ref, Reply}, State) ->
+    case lists:keytake(Ref, #request.ref, State#state.running) of
+        {value, Request, Remaining} ->
+            erlang:demonitor(Ref, [flush]),
+            gen_server:reply(Request#request.from, Reply),
+            {noreply, State#state{running=Remaining}, 0};
+        false ->
+            {noreply, State, 0}
+    end;
+
+handle_info({'DOWN', Ref, _, _, Reason}, State) ->
+    case lists:keytake(Ref, #request.ref, State#state.running) of
+        {value, Request, Remaining} ->
+            gen_server:reply(Request#request.from, {'EXIT', Reason}),
+            {noreply, State#state{running=Remaining}, 0};
+        false ->
+            {noreply, State, 0}
+    end;
+
+handle_info(timeout, State) ->
+    {noreply, maybe_submit_request(State)}.
+
+code_change(_Vsn, State, _Extra) ->
+    {ok, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+enqueue_request(#request{class=compaction}=Request, #state{}=State) ->
+    State#state{compaction=queue:in(Request, State#state.compaction)};
+enqueue_request(#request{}=Request, #state{}=State) ->
+    State#state{interactive=queue:in(Request, State#state.interactive)}.
+
+maybe_submit_request(#state{concurrency=Concurrency, running=Running}=State)
+  when length(Running) < Concurrency ->
+    case make_next_request(State) of
+        State ->
+            State;
+        NewState when length(Running) >= Concurrency - 1 ->
+            NewState;
+        NewState ->
+            maybe_submit_request(NewState)
+    end;
+maybe_submit_request(State) ->
+    State.
+
+make_next_request(#state{}=State) ->
+    case {queue:is_empty(State#state.compaction), queue:is_empty(State#state.interactive)} of
+        {true, true} ->
+            State;
+        {true, false} ->
+            choose_next_request(#state.interactive, State);
+        {false, true} ->
+            choose_next_request(#state.compaction, State);
+        {false, false} ->
+            case random:uniform() < State#state.ratio of
+                true ->
+                    choose_next_request(#state.compaction, State);
+                false ->
+                    choose_next_request(#state.interactive, State)
+            end
+    end.
+
+choose_next_request(Index, State) ->
+    case queue:out(element(Index, State)) of
+        {empty, _} ->
+            State;
+        {{value, Request}, Q} ->
+            submit_request(Request, setelement(Index, State, Q))
+    end.
+
+submit_request(#request{}=Request, #state{}=State) ->
+    Ref = erlang:monitor(process, Request#request.fd),
+    Request#request.fd ! {'$gen_call', {self(), Ref}, Request#request.msg},
+    State#state{running = [Request#request{ref=Ref} | State#state.running]}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/1d1cd9ad/src/couchdb/couch_primary_sup.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_primary_sup.erl b/src/couchdb/couch_primary_sup.erl
index 150b92e..63315c3 100644
--- a/src/couchdb/couch_primary_sup.erl
+++ b/src/couchdb/couch_primary_sup.erl
@@ -31,6 +31,12 @@ init([]) ->
             brutal_kill,
             worker,
             [couch_task_status]},
+        {couch_io_regulator,
+            {couch_io_regulator, start_link, []},
+            permanent,
+            brutal_kill,
+            worker,
+            [couch_io_regulator]},
         {couch_server,
             {couch_server, sup_start_link, []},
             permanent,