You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/06/06 18:32:50 UTC

[couchdb] branch prototype/fdb-layer updated (767c83d -> 6835e18)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


    from 767c83d  Start switching chttpd HTTP endpoints to fabric2
     new e0b2dc1  Remove debug logging
     new 6835e18  Implement attachment compression

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/chttpd/src/chttpd_changes.erl     |   8 ---
 src/couch/src/couch_att.erl           | 109 +++++++++++++++++++++++-----------
 src/fabric/src/fabric2_events.erl     |   6 +-
 test/elixir/test/replication_test.exs |   7 ++-
 4 files changed, 78 insertions(+), 52 deletions(-)


[couchdb] 01/02: Remove debug logging

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit e0b2dc16e2463969eaf7e715b88e130216fba6bb
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Jun 6 11:56:58 2019 -0500

    Remove debug logging
---
 src/chttpd/src/chttpd_changes.erl | 8 --------
 src/fabric/src/fabric2_events.erl | 6 +-----
 2 files changed, 1 insertion(+), 13 deletions(-)

diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index 30caab2..620f68d 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -61,7 +61,6 @@ handle_db_changes(Args, Req, Db) ->
     handle_changes(Args, Req, Db, db).
 
 handle_changes(Args1, Req, Db, Type) ->
-    ReqPid = chttpd:header_value(Req, "XKCD", "<unknown>"),
     #changes_args{
         style = Style,
         filter = FilterName,
@@ -69,7 +68,6 @@ handle_changes(Args1, Req, Db, Type) ->
         dir = Dir,
         since = Since
     } = Args1,
-    couch_log:error("XKCD: STARTING CHANGES FEED ~p for ~s : ~p", [self(), ReqPid, Since]),
     Filter = configure_filter(FilterName, Style, Req, Db),
     Args = Args1#changes_args{filter_fun = Filter},
     % The type of changes feed depends on the supplied filter. If the query is
@@ -820,7 +818,6 @@ changes_enumerator(Change0, Acc) ->
             stop -> stop
         end,
         reset_heartbeat(),
-        couch_log:error("XKCD: CHANGE SEQ: ~p", [Seq]),
         {RealGo, Acc#changes_acc{
             seq = Seq,
             user_acc = UserAcc2,
@@ -919,22 +916,17 @@ deleted_item(_) -> [].
 
 % waits for a updated msg, if there are multiple msgs, collects them.
 wait_updated(Timeout, TimeoutFun, UserAcc) ->
-    couch_log:error("XKCD: WAITING FOR UPDATE", []),
     receive
     updated ->
-        couch_log:error("XKCD: GOT UPDATED", []),
         get_rest_updated(UserAcc);
     deleted ->
-        couch_log:error("XKCD: DB DELETED", []),
         {stop, UserAcc}
     after Timeout ->
         {Go, UserAcc2} = TimeoutFun(UserAcc),
         case Go of
         ok ->
-            couch_log:error("XKCD: WAIT UPDATED TIMEOUT, RETRY", []),
             ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
         stop ->
-            couch_log:error("XKCD: WAIT UPDATED TIMEOUT STOP", []),
             {stop, UserAcc2}
         end
     end.
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl
index a571714..094ca2f 100644
--- a/src/fabric/src/fabric2_events.erl
+++ b/src/fabric/src/fabric2_events.erl
@@ -43,11 +43,9 @@ stop_listener(Pid) ->
 init(Parent, DbName, Mod, Fun, St) ->
     {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
     Since = fabric2_db:get_update_seq(Db),
-    couch_log:error("XKCD: START LISTENER: ~s : ~p for ~p", [DbName, Since, Parent]),
     erlang:monitor(process, Parent),
     Parent ! {self(), initialized},
-    poll(DbName, Since, Mod, Fun, St),
-    couch_log:error("XKCD: STOP LISTENER for ~p", [Parent]).
+    poll(DbName, Since, Mod, Fun, St).
 
 
 poll(DbName, Since, Mod, Fun, St) ->
@@ -56,10 +54,8 @@ poll(DbName, Since, Mod, Fun, St) ->
             {ok, Db} ->
                 case fabric2_db:get_update_seq(Db) of
                     Since ->
-                        couch_log:error("XKCD: NO UPDATE: ~s :: ~p", [DbName, Since]),
                         {{ok, St}, Since};
                     Other ->
-                        couch_log:error("XKCD: UPDATED: ~s :: ~p -> ~p", [DbName, Since, Other]),
                         {Mod:Fun(DbName, updated, St), Other}
                 end;
             Error ->


[couchdb] 02/02: Implement attachment compression

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 6835e18e9db23833ae178488efdb42edc9aecc0d
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Thu Jun 6 13:30:01 2019 -0500

    Implement attachment compression
    
    This still holds all attachment data in RAM, which we'll have to revisit
    at some point.
---
 src/couch/src/couch_att.erl           | 109 +++++++++++++++++++++++-----------
 test/elixir/test/replication_test.exs |   7 ++-
 2 files changed, 77 insertions(+), 39 deletions(-)

diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index 0dc5fa5..90d3644 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -383,8 +383,8 @@ flush(Db, DocId, Att1) ->
 
     % If we were sent a gzip'ed attachment with no
     % length data, we have to set it here.
-    Att3 = case AttLen of
-        undefined -> store(att_len, DiskLen, Att2);
+    Att3 = case DiskLen of
+        undefined -> store(disk_len, AttLen, Att2);
         _ -> Att2
     end,
 
@@ -400,12 +400,13 @@ flush(Db, DocId, Att1) ->
             % Already flushed
             Att1;
         _ when is_binary(Data) ->
-            IdentMd5 = get_identity_md5(Data, fetch(encoding, Att4)),
+            DataMd5 = couch_hash:md5_hash(Data),
             if ReqMd5 == undefined -> ok; true ->
-                couch_util:check_md5(IdentMd5, ReqMd5)
+                couch_util:check_md5(DataMd5, ReqMd5)
             end,
-            Att5 = store(md5, IdentMd5, Att4),
-            fabric2_db:write_attachment(Db, DocId, Att5)
+            Att5 = store(md5, DataMd5, Att4),
+            Att6 = maybe_compress(Att5),
+            fabric2_db:write_attachment(Db, DocId, Att6)
     end.
 
 
@@ -451,7 +452,7 @@ read_data(Fun, Att) when is_function(Fun) ->
                     end,
                     Props0 = [
                         {data, iolist_to_binary(lists:reverse(Acc))},
-                        {disk_len, Len}
+                        {att_len, Len}
                     ],
                     Props1 = if InMd5 /= md5_in_footer -> Props0; true ->
                         [{md5, Md5} | Props0]
@@ -473,7 +474,7 @@ read_streamed_attachment(Att, _F, 0, Acc) ->
     Bin = iolist_to_binary(lists:reverse(Acc)),
     store([
         {data, Bin},
-        {disk_len, size(Bin)}
+        {att_len, size(Bin)}
     ], Att);
 
 read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 ->
@@ -550,8 +551,23 @@ range_foldl(Att, From, To, Fun, Acc) ->
     range_foldl(Bin, From, To, Fun, Acc).
 
 
-foldl_decode(_Att, _Fun, _Acc) ->
-    erlang:error(not_supported).
+foldl_decode(Att, Fun, Acc) ->
+    [Encoding, Data] = fetch([encoding, data], Att),
+    case {Encoding, Data} of
+        {gzip, {loc, Db, DocId, AttId}} ->
+            NoTxDb = Db#{tx := undefined},
+            Bin = fabric2_db:read_attachment(NoTxDb, DocId, AttId),
+            foldl_decode(store(data, Bin, Att), Fun, Acc);
+        {gzip, _} when is_binary(Data) ->
+            Z = zlib:open(),
+            ok = zlib:inflateInit(Z, 16 + 15),
+            Inflated = iolist_to_binary(zlib:inflate(Z, Data)),
+            ok = zlib:inflateEnd(Z),
+            ok = zlib:close(Z),
+            foldl(Inflated, Att, Fun, Acc);
+        _ ->
+            foldl(Att, Fun, Acc)
+    end.
 
 
 to_binary(Att) ->
@@ -563,7 +579,8 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
 to_binary(Iolist, _Att) when is_list(Iolist) ->
     iolist_to_binary(Iolist);
 to_binary({loc, Db, DocId, AttId}, _Att) ->
-    fabric2_db:read_attachmet(Db, DocId, AttId);
+    NoTxDb = Db#{tx := undefined},
+    fabric2_db:read_attachment(NoTxDb, DocId, AttId);
 to_binary(DataFun, Att) when is_function(DataFun)->
     Len = fetch(att_len, Att),
     iolist_to_binary(
@@ -585,15 +602,53 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
     fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
 
 
-get_identity_md5(Bin, gzip) ->
+maybe_compress(Att) ->
+    [Encoding, Type] = fetch([encoding, type], Att),
+    IsCompressible = is_compressible(Type),
+    CompLevel = config:get_integer("attachments", "compression_level", 0),
+    case Encoding of
+        identity when IsCompressible, CompLevel >= 1, CompLevel =< 9 ->
+            compress(Att, CompLevel);
+        _ ->
+            Att
+    end.
+
+
+compress(Att, Level) ->
+    Data = fetch(data, Att),
+
     Z = zlib:open(),
-    ok = zlib:inflateInit(Z, 16 + 15),
-    Inflated = zlib:inflate(Z, Bin),
-    ok = zlib:inflateEnd(Z),
+    % 15 = ?MAX_WBITS (defined in the zlib module)
+    % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+    ok = zlib:deflateInit(Z, Level, deflated, 16 + 15, 8, default),
+    CompData = iolist_to_binary(zlib:deflate(Z, Data, finish)),
+    ok = zlib:deflateEnd(Z),
     ok = zlib:close(Z),
-    couch_hash:md5_hash(Inflated);
-get_identity_md5(Bin, _) ->
-    couch_hash:md5_hash(Bin).
+
+    store([
+        {att_len, size(CompData)},
+        {md5, couch_hash:md5_hash(CompData)},
+        {data, CompData},
+        {encoding, gzip}
+    ], Att).
+
+
+is_compressible(Type) when is_binary(Type) ->
+    is_compressible(binary_to_list(Type));
+is_compressible(Type) ->
+    TypeExpList = re:split(
+        config:get("attachments", "compressible_types", ""),
+        "\\s*,\\s*",
+        [{return, list}]
+    ),
+    lists:any(
+        fun(TypeExp) ->
+            Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
+                "(?:\\s*;.*?)?\\s*", $$],
+            re:run(Type, Regexp, [caseless]) =/= nomatch
+        end,
+        [T || T <- TypeExpList, T /= []]
+    ).
 
 
 max_attachment_size() ->
@@ -612,24 +667,6 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) ->
     ok.
 
 
-%% is_compressible(Type) when is_binary(Type) ->
-%%     is_compressible(binary_to_list(Type));
-%% is_compressible(Type) ->
-%%     TypeExpList = re:split(
-%%         config:get("attachments", "compressible_types", ""),
-%%         "\\s*,\\s*",
-%%         [{return, list}]
-%%     ),
-%%     lists:any(
-%%         fun(TypeExp) ->
-%%             Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
-%%                 "(?:\\s*;.*?)?\\s*", $$],
-%%             re:run(Type, Regexp, [caseless]) =/= nomatch
-%%         end,
-%%         [T || T <- TypeExpList, T /= []]
-%%     ).
-
-
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index e98775f..3f0045b 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -717,9 +717,10 @@ defmodule ReplicationTest do
 
       assert tgt_info["doc_count"] == src_info["doc_count"]
 
-      src_shards = seq_to_shards(src_info["update_seq"])
-      tgt_shards = seq_to_shards(tgt_info["update_seq"])
-      assert tgt_shards == src_shards
+      # This assertion is no longer valid
+      # src_shards = seq_to_shards(src_info["update_seq"])
+      # tgt_shards = seq_to_shards(tgt_info["update_seq"])
+      # assert tgt_shards == src_shards
     end)
   end