You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2020/12/14 22:05:09 UTC

[couchdb-erlfdb] branch main updated: Avoid sending future ready messages when flushed

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb-erlfdb.git


The following commit(s) were added to refs/heads/main by this push:
     new efef20c  Avoid sending future ready messages when flushed
efef20c is described below

commit efef20c439daa6c24227e840482f82945e5c2437
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Mon Dec 14 15:21:22 2020 -0600

    Avoid sending future ready messages when flushed
    
    There's a small race condition in `erlfdb:flush_future_message` when
    `is_ready` returns true before the future's callback has been invoked to
    send the ready message. This adds an API to silence the message from a
    future, which closes the race condition.
    
    Fixes apache/couchdb#3294
---
 c_src/main.c       | 39 +++++++++++++++++++++++++++++++++++++++
 c_src/resources.h  |  1 +
 src/erlfdb.erl     |  1 +
 src/erlfdb_nif.erl |  7 +++++++
 4 files changed, 48 insertions(+)

diff --git a/c_src/main.c b/c_src/main.c
index b1db949..4f669f5 100644
--- a/c_src/main.c
+++ b/c_src/main.c
@@ -84,11 +84,15 @@ erlfdb_future_cb(FDBFuture* fdb_future, void* data)
         caller = future->pid_env;
     }
 
+    enif_mutex_lock(future->lock);
+
     if(!future->cancelled) {
         msg = T2(future->msg_env, future->msg_ref, ATOM_ready);
         enif_send(caller, &(future->pid), future->msg_env, msg);
     }
 
+    enif_mutex_unlock(future->lock);
+
     // We're now done with this future which means we need
     // to release our handle to it. See erlfdb_create_future
     // for more on why this happens here.
@@ -114,6 +118,7 @@ erlfdb_create_future(ErlNifEnv* env, FDBFuture* future, ErlFDBFutureType ftype)
     f->pid_env = env;
     f->msg_env = enif_alloc_env();
     f->msg_ref = enif_make_copy(f->msg_env, ref);
+    f->lock = enif_mutex_create("fdb:future_lock");
     f->cancelled = false;
 
     // This resource reference counting dance is a bit
@@ -579,14 +584,47 @@ erlfdb_future_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
     }
     future = (ErlFDBFuture*) res;
 
+    enif_mutex_lock(future->lock);
+
     future->cancelled = true;
     fdb_future_cancel(future->future);
 
+    enif_mutex_unlock(future->lock);
+
     return ATOM_ok;
 }
 
 
 static ERL_NIF_TERM
+erlfdb_future_silence(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+    ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
+    ErlFDBFuture* future;
+    void* res;
+
+    if(st->lib_state != ErlFDB_CONNECTED) {
+        return enif_make_badarg(env);
+    }
+
+    if(argc != 1) {
+        return enif_make_badarg(env);
+    }
+
+    if(!enif_get_resource(env, argv[0], ErlFDBFutureRes, &res)) {
+        return enif_make_badarg(env);
+    }
+    future = (ErlFDBFuture*) res;
+
+    enif_mutex_lock(future->lock);
+
+    future->cancelled = true;
+
+    enif_mutex_unlock(future->lock);
+
+    return ATOM_ok;
+}
+
+static ERL_NIF_TERM
 erlfdb_future_is_ready(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
 {
     ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);
@@ -2158,6 +2196,7 @@ static ErlNifFunc funcs[] =
     NIF_FUNC(erlfdb_setup_network, 0),
 
     NIF_FUNC(erlfdb_future_cancel, 1),
+    NIF_FUNC(erlfdb_future_silence, 1),
     NIF_FUNC(erlfdb_future_is_ready, 1),
     NIF_FUNC(erlfdb_future_get_error, 1),
     NIF_FUNC(erlfdb_future_get, 1),
diff --git a/c_src/resources.h b/c_src/resources.h
index 6694a99..b25a43f 100644
--- a/c_src/resources.h
+++ b/c_src/resources.h
@@ -44,6 +44,7 @@ typedef struct _ErlFDBFuture
     ErlNifEnv* pid_env;
     ErlNifEnv* msg_env;
     ERL_NIF_TERM msg_ref;
+    ErlNifMutex* lock;
     bool cancelled;
 } ErlFDBFuture;
 
diff --git a/src/erlfdb.erl b/src/erlfdb.erl
index d2ddb1f..506b4b0 100644
--- a/src/erlfdb.erl
+++ b/src/erlfdb.erl
@@ -789,6 +789,7 @@ options_to_fold_st(StartKey, EndKey, Options) ->
 
 
 flush_future_message(?IS_FUTURE = Future) ->
+    erlfdb_nif:future_silence(Future),
     {erlfdb_future, MsgRef, _Res} = Future,
     receive
         {MsgRef, ready} -> ok
diff --git a/src/erlfdb_nif.erl b/src/erlfdb_nif.erl
index 7ec8e52..ddb7d0f 100644
--- a/src/erlfdb_nif.erl
+++ b/src/erlfdb_nif.erl
@@ -21,6 +21,7 @@
     get_max_api_version/0,
 
     future_cancel/1,
+    future_silence/1,
     future_is_ready/1,
     future_get_error/1,
     future_get/1,
@@ -195,6 +196,11 @@ future_cancel({erlfdb_future, _Ref, Ft}) ->
     erlfdb_future_cancel(Ft).
 
 
+-spec future_silence(future()) -> ok.
+future_silence({erlfdb_future, _Ref, Ft}) ->
+    erlfdb_future_silence(Ft).
+
+
 -spec future_is_ready(future()) -> boolean().
 future_is_ready({erlfdb_future, _Ref, Ft}) ->
     erlfdb_future_is_ready(Ft).
@@ -527,6 +533,7 @@ erlfdb_setup_network() -> ?NOT_LOADED.
 
 % Futures
 erlfdb_future_cancel(_Future) -> ?NOT_LOADED.
+erlfdb_future_silence(_Future) -> ?NOT_LOADED.
 erlfdb_future_is_ready(_Future) -> ?NOT_LOADED.
 erlfdb_future_get_error(_Future) -> ?NOT_LOADED.
 erlfdb_future_get(_Future) -> ?NOT_LOADED.