You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ga...@apache.org on 2018/08/08 17:24:42 UTC

[couchdb] branch master updated (d98e3e1 -> a6bc72e)

This is an automated email from the ASF dual-hosted git repository.

garren pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    from d98e3e1  Merge pull request #1432 from cloudant/support-callback-module-data-provider
     new a7f2aa5  Add rexi ping message
     new a6bc72e  Move mango selector matching to the shard level

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/fabric/src/fabric_rpc.erl           |  19 ++++--
 src/mango/src/mango_cursor_view.erl     | 112 +++++++++++++++++++++++++++-----
 src/mango/src/mango_execution_stats.erl |   7 +-
 src/mango/test/20-no-timeout-test.py    |  38 +++++++++++
 src/rexi/src/rexi.erl                   |   9 +++
 src/rexi/src/rexi_utils.erl             |   2 +
 6 files changed, 167 insertions(+), 20 deletions(-)
 create mode 100644 src/mango/test/20-no-timeout-test.py


[couchdb] 02/02: Move mango selector matching to the shard level

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a6bc72e76c56a1befa8675b06016ecda46ef3a2d
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Aug 8 09:56:10 2018 +0200

    Move mango selector matching to the shard level
    
    This moves the Mango selector matching down to the shard level.
    This means that each document is retrieved from the index and
    matched against the selector before being sent to the coordinator node.
    This reduces the network traffic for a Mango query.
    
    Co-authored-by: Paul J. Davis <pa...@gmail.com>
    Co-authored-by: Garren Smith <ga...@gmail.com>
---
 src/fabric/src/fabric_rpc.erl           |  19 ++++--
 src/mango/src/mango_cursor_view.erl     | 112 +++++++++++++++++++++++++++-----
 src/mango/src/mango_execution_stats.erl |   7 +-
 src/mango/test/20-no-timeout-test.py    |  38 +++++++++++
 4 files changed, 156 insertions(+), 20 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 60526f4..ef4092d 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -100,8 +100,8 @@ all_docs(DbName, Options, Args0) ->
             set_io_priority(DbName, Options),
             Args = fix_skip_and_limit(Args1),
             {ok, Db} = get_or_create_db(DbName, Options),
-            VAcc0 = #vacc{db=Db},
-            couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0)
+            CB = get_view_cb(Args),
+            couch_mrview:query_all_docs(Db, Args, CB, Args)
     end.
 
 update_mrview(DbName, {DDocId, Rev}, ViewName, Args0) ->
@@ -124,8 +124,8 @@ map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
-    VAcc0 = #vacc{db=Db},
-    couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
+    CB = get_view_cb(Args),
+    couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args).
 
 %% @equiv reduce_view(DbName, DDoc, ViewName, Args0)
 reduce_view(DbName, DDocInfo, ViewName, Args0) ->
@@ -303,6 +303,17 @@ get_or_create_db(DbName, Options) ->
     couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
 
 
+get_view_cb(#mrargs{extra = Options}) ->
+    case couch_util:get_value(callback, Options) of
+        {Mod, Fun} when is_atom(Mod), is_atom(Fun) ->
+            fun Mod:Fun/2;
+        _ ->
+            fun view_cb/2
+    end;
+get_view_cb(_) ->
+    fun view_cb/2.
+
+
 view_cb({meta, Meta}, Acc) ->
     % Map function starting
     ok = rexi:stream2({meta, Meta}),
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index dbea36e..51ec68c 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -19,6 +19,7 @@
 ]).
 
 -export([
+    view_cb/2,
     handle_message/2,
     handle_all_docs_message/2,
     composite_indexes/2,
@@ -28,9 +29,13 @@
 
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("fabric/include/fabric.hrl").
+
 -include("mango_cursor.hrl").
 -include("mango_idx_view.hrl").
 
+-define(HEARTBEAT_INTERVAL_IN_USEC, 4000000).
+
 create(Db, Indexes, Selector, Opts) ->
     FieldRanges = mango_idx_view:field_ranges(Selector),
     Composited = composite_indexes(Indexes, FieldRanges),
@@ -93,13 +98,14 @@ maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) ->
 maybe_replace_max_json(EndKey) ->
     EndKey.
 
-base_args(#cursor{index = Idx} = Cursor) ->
+base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
     #mrargs{
         view_type = map,
         reduce = false,
         start_key = mango_idx:start_key(Idx, Cursor#cursor.ranges),
         end_key = mango_idx:end_key(Idx, Cursor#cursor.ranges),
-        include_docs = true
+        include_docs = true,
+        extra = [{callback, {?MODULE, view_cb}}, {selector, Selector}]
     }.
 
 
@@ -210,22 +216,84 @@ choose_best_index(_DbName, IndexRanges) ->
     {SelectedIndex, SelectedIndexRanges}.
 
 
+view_cb({meta, Meta}, Acc) ->
+    % Map function starting
+    put(mango_docs_examined, 0),
+    set_mango_msg_timestamp(),
+    ok = rexi:stream2({meta, Meta}),
+    {ok, Acc};
+view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
+    ViewRow =  #view_row{
+        id = couch_util:get_value(id, Row),
+        key = couch_util:get_value(key, Row),
+        doc = couch_util:get_value(doc, Row)
+    },
+    case ViewRow#view_row.doc of
+        undefined ->
+            ViewRow2 = ViewRow#view_row{
+                value = couch_util:get_value(value, Row)
+            },
+            ok = rexi:stream2(ViewRow2),
+            put(mango_docs_examined, 0),
+            set_mango_msg_timestamp();
+        Doc ->
+            Selector = couch_util:get_value(selector, Options),
+            case mango_selector:match(Selector, Doc) of
+                true ->
+                    ViewRow2 = ViewRow#view_row{
+                        value = get(mango_docs_examined) + 1
+                    },
+                    ok = rexi:stream2(ViewRow2),
+                    put(mango_docs_examined, 0),
+                    set_mango_msg_timestamp();
+                false ->
+                    put(mango_docs_examined, get(mango_docs_examined) + 1),
+                    maybe_send_mango_ping()
+            end
+        end,
+    {ok, Acc};
+view_cb(complete, Acc) ->
+    % Finish view output
+    ok = rexi:stream_last(complete),
+    {ok, Acc};
+view_cb(ok, ddoc_updated) ->
+    rexi:reply({ok, ddoc_updated}).
+
+
+maybe_send_mango_ping() ->
+    Current = os:timestamp(),
+    LastPing = get(mango_last_msg_timestamp),
+    % Fabric will timeout if it has not heard a response from a worker node
+    % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't happen.
+    case timer:now_diff(Current, LastPing) > ?HEARTBEAT_INTERVAL_IN_USEC of
+        false ->
+            ok;
+        true ->
+            rexi:ping(),
+            set_mango_msg_timestamp()
+    end.
+
+
+set_mango_msg_timestamp() ->
+    put(mango_last_msg_timestamp, os:timestamp()).
+
+
 handle_message({meta, _}, Cursor) ->
     {ok, Cursor};
 handle_message({row, Props}, Cursor) ->
-    case doc_member(Cursor#cursor.db, Props, Cursor#cursor.opts, Cursor#cursor.execution_stats) of
+    case doc_member(Cursor, Props) of
         {ok, Doc, {execution_stats, ExecutionStats1}} ->
             Cursor1 = Cursor#cursor {
                 execution_stats = ExecutionStats1
             },
-            case mango_selector:match(Cursor1#cursor.selector, Doc) of
-                true ->
-                    Cursor2 = update_bookmark_keys(Cursor1, Props),
-                    FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
-                    handle_doc(Cursor2, FinalDoc);
-                false ->
-                    {ok, Cursor1}
-            end;
+            Cursor2 = update_bookmark_keys(Cursor1, Props),
+            FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
+            handle_doc(Cursor2, FinalDoc);
+        {no_match, _, {execution_stats, ExecutionStats1}} ->
+            Cursor1 = Cursor#cursor {
+                execution_stats = ExecutionStats1
+            },
+            {ok, Cursor1};
         Error ->
             couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
             {ok, Cursor}
@@ -332,17 +400,31 @@ apply_opts([{_, _} | Rest], Args) ->
     apply_opts(Rest, Args).
 
 
-doc_member(Db, RowProps, Opts, ExecutionStats) ->
+doc_member(Cursor, RowProps) ->
+    Db = Cursor#cursor.db, 
+    Opts = Cursor#cursor.opts,
+    ExecutionStats = Cursor#cursor.execution_stats,
+    Selector = Cursor#cursor.selector,
+    Incr = case couch_util:get_value(value, RowProps) of
+        N when is_integer(N) -> N;
+        _ -> 1
+    end,
     case couch_util:get_value(doc, RowProps) of
         {DocProps} ->
-            ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats),
+            ExecutionStats1 = mango_execution_stats:incr_docs_examined(ExecutionStats, Incr),
             {ok, {DocProps}, {execution_stats, ExecutionStats1}};
         undefined ->
             ExecutionStats1 = mango_execution_stats:incr_quorum_docs_examined(ExecutionStats),
             Id = couch_util:get_value(id, RowProps),
             case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of
-                {ok, #doc{}=Doc} ->
-                    {ok, couch_doc:to_json_obj(Doc, []), {execution_stats, ExecutionStats1}};
+                {ok, #doc{}=DocProps} ->
+                    Doc = couch_doc:to_json_obj(DocProps, []),
+                    case mango_selector:match(Selector, Doc) of
+                        true ->
+                            {ok, Doc, {execution_stats, ExecutionStats1}};
+                        false ->
+                            {no_match, Doc, {execution_stats, ExecutionStats1}}
+                    end;
                 Else ->
                     Else
             end
diff --git a/src/mango/src/mango_execution_stats.erl b/src/mango/src/mango_execution_stats.erl
index afdb417..7e8afd7 100644
--- a/src/mango/src/mango_execution_stats.erl
+++ b/src/mango/src/mango_execution_stats.erl
@@ -17,6 +17,7 @@
     to_json/1,
     incr_keys_examined/1,
     incr_docs_examined/1,
+    incr_docs_examined/2,
     incr_quorum_docs_examined/1,
     incr_results_returned/1,
     log_start/1,
@@ -45,8 +46,12 @@ incr_keys_examined(Stats) ->
 
 
 incr_docs_examined(Stats) ->
+    incr_docs_examined(Stats, 1).
+
+
+incr_docs_examined(Stats, N) ->
     Stats#execution_stats {
-        totalDocsExamined = Stats#execution_stats.totalDocsExamined + 1
+        totalDocsExamined = Stats#execution_stats.totalDocsExamined + N
     }.
 
 
diff --git a/src/mango/test/20-no-timeout-test.py b/src/mango/test/20-no-timeout-test.py
new file mode 100644
index 0000000..93dc146
--- /dev/null
+++ b/src/mango/test/20-no-timeout-test.py
@@ -0,0 +1,38 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+import mango
+import copy
+import unittest
+
+class LongRunningMangoTest(mango.DbPerClass):
+
+    def setUp(self):
+        self.db.recreate()
+        docs = []
+        for i in range(100000):
+            docs.append({
+                "_id": str(i),
+                "another": "field"
+            })
+            if i % 20000 == 0:
+                self.db.save_docs(docs)
+                docs = []
+  
+  # This test should run to completion and not timeout
+    def test_query_does_not_time_out(self):
+        selector = {
+        "_id": {"$gt": 0},
+        "another": "wrong"
+        }
+        docs = self.db.find(selector)
+        self.assertEqual(len(docs), 0)


[couchdb] 01/02: Add rexi ping message

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

garren pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit a7f2aa5175c8fad8f946c3d2ff79558b74b8ee18
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Aug 8 09:53:47 2018 +0200

    Add rexi ping message
    
    Add a ping message to rexi to prevent long-running operations from
    timing out. Long-running operations at the node level can exceed the
    fabric timeout and be cancelled. Sending a ping message back to the
    coordinator stops that from happening.
---
 src/rexi/src/rexi.erl       | 9 +++++++++
 src/rexi/src/rexi_utils.erl | 2 ++
 2 files changed, 11 insertions(+)

diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index fea4d64..f774dc9 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -19,6 +19,7 @@
 -export([stream_start/1, stream_cancel/1]).
 -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
 -export([stream2/1, stream2/2, stream2/3, stream_last/1, stream_last/2]).
+-export([ping/0]).
 
 -include_lib("rexi/include/rexi.hrl").
 
@@ -233,6 +234,14 @@ stream_ack(Client) ->
 stream_ack(Client, N) ->
     erlang:send(Client, {rexi_ack, N}).
 
+
+%% Sends a ping message to the coordinator. This is for long-running
+%% operations on a node that could exceed the rexi timeout.
+ping() -> 
+    {Caller, _} = get(rexi_from),
+    erlang:send(Caller, {rexi, '$rexi_ping'}).
+
+
 %% internal functions %%
 
 cast_msg(Msg) -> {'$gen_cast', Msg}.
diff --git a/src/rexi/src/rexi_utils.erl b/src/rexi/src/rexi_utils.erl
index e3eaa6f..11dbb25 100644
--- a/src/rexi/src/rexi_utils.erl
+++ b/src/rexi/src/rexi_utils.erl
@@ -81,6 +81,8 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
         Worker ->
             Fun(Msg, {Worker, From}, Acc0)
         end;
+    {rexi, '$rexi_ping'} ->
+        {ok, Acc0};
     {Ref, Msg} ->
         case lists:keyfind(Ref, Keypos, RefList) of
         false ->