You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/07/17 19:02:34 UTC

git commit: Initial commit

Repository: couchdb-ioq
Updated Branches:
  refs/heads/master [created] 6293a8b88


Initial commit

This is substantively the work from branch 1775-feature-io-regulator
but with erlang application paraphernalia.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-ioq/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-ioq/commit/6293a8b8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-ioq/tree/6293a8b8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-ioq/diff/6293a8b8

Branch: refs/heads/master
Commit: 6293a8b88d1cd67c254ac764a046a1c3a0905be4
Parents: 
Author: Robert Newson <rn...@apache.org>
Authored: Thu Jul 17 17:53:45 2014 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 17 18:01:53 2014 +0100

----------------------------------------------------------------------
 .gitignore      |   2 +
 src/ioq.app.src |  21 +++++++++
 src/ioq.erl     | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/ioq_app.erl |  21 +++++++++
 src/ioq_sup.erl |  24 ++++++++++
 5 files changed, 194 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-ioq/blob/6293a8b8/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..21cf3d3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.rebar
+ebin/

http://git-wip-us.apache.org/repos/asf/couchdb-ioq/blob/6293a8b8/src/ioq.app.src
----------------------------------------------------------------------
diff --git a/src/ioq.app.src b/src/ioq.app.src
new file mode 100644
index 0000000..65ea50d
--- /dev/null
+++ b/src/ioq.app.src
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application,ioq, [  % OTP application resource file for ioq
+    {description, "I/O prioritizing engine"},
+    {vsn, git},
+    {registered,[ioq,ioq_sup]},  % both processes are started with {local, ?MODULE}
+    {applications,[kernel,stdlib,config]},  % ioq:init/1 calls config:get/3
+    {mod,{ioq_app,[]}},  % ioq_app:start/2 starts the ioq_sup supervisor
+    {env, []},
+    {modules,[ioq,ioq_app,ioq_sup]}
+]}.

http://git-wip-us.apache.org/repos/asf/couchdb-ioq/blob/6293a8b8/src/ioq.erl
----------------------------------------------------------------------
diff --git a/src/ioq.erl b/src/ioq.erl
new file mode 100644
index 0000000..686a696
--- /dev/null
+++ b/src/ioq.erl
@@ -0,0 +1,126 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq).
+-behaviour(gen_server).
+
+-export([start_link/0, call/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
+
+-record(state, {  % gen_server state of the ioq process
+    concurrency=10,          % max length of `running`, checked in maybe_submit_request/1
+    ratio,                   % float set in init/1; compared with random:uniform() when both queues have work
+    interactive=queue:new(), % queued requests whose class is anything but compaction
+    compaction=queue:new(),  % queued requests with class=compaction
+    running=[]               % requests already sent to their fd, awaiting a reply or 'DOWN'
+}).
+
+-record(request, {  % one call routed through ioq to the process in `fd`
+    fd,     % process the message is sent to and monitored (submit_request/2)
+    msg,    % payload forwarded to fd inside a '$gen_call' tuple
+    class,  % caller's get(io_class); compaction selects the compaction queue
+    from,   % gen_server From of the caller, used with gen_server:reply/2
+    ref     % monitor reference created at submit time; also tags the fd's reply
+}).
+
+start_link() ->  % start the ioq gen_server, registered locally under the module name
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% Queue Msg for process Fd under the caller's get(io_class) class; blocks until the reply arrives.
+call(Fd, Msg) ->
+    gen_server:call(?MODULE, #request{fd=Fd, msg=Msg, class=get(io_class), from=self()}, infinity).
+
+%% gen_server callback: read the "ioq"/"ratio" setting (default "0.01") as a float into the state.
+init(_) ->
+    {ok, #state{ratio=list_to_float(config:get("ioq", "ratio", "0.01"))}}.
+
+handle_call(#request{}=Request, From, State) ->  % queue now; the reply is sent later from handle_info/2
+    {noreply, enqueue_request(Request#request{from=From}, State), 0}.  % timeout 0 triggers dispatch
+
+handle_cast(_Msg, State) ->  % casts are ignored, but a cast consumes a pending zero timeout,
+    {noreply, State, 0}.     % so re-arm it or requests queued just before the cast would stall
+
+%% gen_server callback for the three kinds of message the server expects.
+%% {Ref, Reply}: answer from an fd; forward it to the waiting caller and drop the monitor.
+handle_info({Ref, Reply}, State) ->
+    case lists:keytake(Ref, #request.ref, State#state.running) of
+        false -> {noreply, State, 0};
+        {value, #request{from=Caller}, Rest} ->
+            erlang:demonitor(Ref, [flush]),
+            gen_server:reply(Caller, Reply),
+            {noreply, State#state{running=Rest}, 0}
+    end;
+%% 'DOWN': the monitored fd died first; its caller gets {'EXIT', Reason} as the reply.
+handle_info({'DOWN', Ref, _, _, Reason}, State) ->
+    case lists:keytake(Ref, #request.ref, State#state.running) of
+        false -> {noreply, State, 0};
+        {value, #request{from=Caller}, Rest} ->
+            gen_server:reply(Caller, {'EXIT', Reason}),
+            {noreply, State#state{running=Rest}, 0}
+    end;
+%% timeout: armed by the zero timeouts above; start queued requests while slots are free.
+handle_info(timeout, State) ->
+    {noreply, maybe_submit_request(State)}.
+
+code_change(_Vsn, State, _Extra) ->  % gen_server callback: the state is kept as-is
+    {ok, State}.
+
+terminate(_Reason, _State) ->  % gen_server callback: nothing is cleaned up here
+    ok.
+
+%% Append Request to the compaction queue when its class is compaction, else to interactive.
+enqueue_request(#request{class=Class}=Request, #state{}=State) ->
+    Index = case Class of compaction -> #state.compaction; _ -> #state.interactive end,
+    setelement(Index, State, queue:in(Request, element(Index, State))).
+
+%% Start queued requests until `concurrency` are running or make_next_request/1 finds none.
+maybe_submit_request(#state{concurrency=Limit, running=Active}=Current)
+  when length(Active) < Limit ->
+    case make_next_request(Current) of
+        Current ->
+            Current;
+        Next ->
+            %% One more request is running now; the guard re-checks the limit.
+            maybe_submit_request(Next)
+    end;
+maybe_submit_request(State) ->
+    State.
+
+%% Choose the queue to serve and start its head request via choose_next_request/2.
+%% Both queues empty: State comes back unchanged. Only one non-empty: that queue is used.
+%% Both non-empty: compaction wins when random:uniform() is below ratio, else interactive.
+make_next_request(#state{compaction=CQ, interactive=IQ, ratio=Ratio}=State) ->
+    case {queue:is_empty(CQ), queue:is_empty(IQ)} of
+        {true, true} -> State;
+        {true, false} -> choose_next_request(#state.interactive, State);
+        {false, true} -> choose_next_request(#state.compaction, State);
+        {false, false} ->
+            %% Weighted coin flip between the two classes.
+            Index = case random:uniform() < Ratio of
+                true -> #state.compaction;
+                false -> #state.interactive
+            end,
+            choose_next_request(Index, State)
+    end.
+
+%% Take the head of the queue held at tuple position Index of State and submit it;
+%% State is returned unchanged when that queue is empty.
+choose_next_request(Index, State) ->
+    case queue:out(element(Index, State)) of
+        {{value, Head}, Rest} -> submit_request(Head, setelement(Index, State, Rest));
+        {empty, _} -> State
+    end.
+
+submit_request(#request{fd=Fd, msg=Msg}=Request, #state{running=Running}=State) ->
+    Ref = erlang:monitor(process, Fd),  % Ref doubles as the reply tag matched in handle_info/2
+    Fd ! {'$gen_call', {self(), Ref}, Msg},  % raw gen_server call; ioq itself is the reply target
+    State#state{running = [Request#request{ref=Ref} | Running]}.

http://git-wip-us.apache.org/repos/asf/couchdb-ioq/blob/6293a8b8/src/ioq_app.erl
----------------------------------------------------------------------
diff --git a/src/ioq_app.erl b/src/ioq_app.erl
new file mode 100644
index 0000000..2e6d75a
--- /dev/null
+++ b/src/ioq_app.erl
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->  % application callback: start the top-level supervisor
+    ioq_sup:start_link().
+
+stop(_State) ->  % application callback: nothing to clean up
+    ok.

http://git-wip-us.apache.org/repos/asf/couchdb-ioq/blob/6293a8b8/src/ioq_sup.erl
----------------------------------------------------------------------
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
new file mode 100644
index 0000000..c4d04a9
--- /dev/null
+++ b/src/ioq_sup.erl
@@ -0,0 +1,24 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ioq_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+start_link() ->  % start the supervisor, registered locally under the module name
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->  % one_for_one, at most 5 restarts in 10 s; single permanent worker child: ioq
+    {ok, { {one_for_one, 5, 10}, [?CHILD(ioq, worker)]}}.