You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2018/01/24 21:16:05 UTC

[couchdb] branch elixir-suite-davisp updated: [WIP] Dirty grab bag commit for now

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch elixir-suite-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/elixir-suite-davisp by this push:
     new bd80f85  [WIP] Dirty grab bag commit for now
bd80f85 is described below

commit bd80f8545b8776999ba446363e63e21d529c038e
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Wed Jan 24 15:14:54 2018 -0600

    [WIP] Dirty grab bag commit for now
---
 test/elixir/lib/couch.ex              |  84 ++++++++++++++-
 test/elixir/test/replication_test.exs | 197 +++++++++++++++++++++++++---------
 test/elixir/test/test_helper.exs      |  22 ++--
 3 files changed, 237 insertions(+), 66 deletions(-)

diff --git a/test/elixir/lib/couch.ex b/test/elixir/lib/couch.ex
index 8ad4821..fd67e13 100644
--- a/test/elixir/lib/couch.ex
+++ b/test/elixir/lib/couch.ex
@@ -1,3 +1,35 @@
+defmodule Couch.Session do
+  @enforce_keys [:cookie]
+  defstruct [:cookie]
+
+  def new(cookie) do
+    %Couch.Session{cookie: cookie}
+  end
+
+  def get(sess, url, opts \\ []), do: go(sess, :get, url, opts)
+  def get!(sess, url, opts \\ []), do: go(sess, :get, url, opts)
+  def put(sess, url, opts \\ []), do: go(sess, :put, url, opts)
+  def put!(sess, url, opts \\ []), do: go(sess, :put, url, opts)
+  def post(sess, url, opts \\ []), do: go(sess, :post, url, opts)
+  def post!(sess, url, opts \\ []), do: go(sess, :post, url, opts)
+  def delete(sess, url, opts \\ []), do: go(sess, :delete, url, opts)
+  def delete!(sess, url, opts \\ []), do: go(sess, :delete, url, opts)
+
+  # Skipping head/patch/options for YAGNI. Feel free to add
+  # if the need arises.
+
+  def go(%Couch.Session{} = sess, method, url, opts) do
+    opts = Keyword.merge(opts, [cookie: sess.cookie])
+    Couch.request(method, url, opts)
+  end
+
+  def go!(%Couch.Session{} = sess, method, url, opts) do
+    opts = Keyword.merge(opts, [cookie: sess.cookie])
+    Couch.request!(method, url, opts)
+  end
+end
+
+
 defmodule Couch do
   use HTTPotion.Base
 
@@ -9,17 +41,28 @@ defmodule Couch do
     "http://localhost:15984" <> url
   end
 
-  def process_request_headers(headers) do
+  def process_request_headers(headers, options) do
     headers = Keyword.put(headers, :"User-Agent", "couch-potion")
-    if headers[:"Content-Type"] do
+    headers = if headers[:"Content-Type"] do
       headers
     else
       Keyword.put(headers, :"Content-Type", "application/json")
     end
+    case Keyword.get options, :cookie do
+      nil ->
+        headers
+      cookie ->
+        Keyword.put headers, :"Cookie", cookie
+    end
   end
 
+
   def process_options(options) do
-    Dict.put options, :basic_auth, {"adm", "pass"}
+    if Keyword.get(options, :cookie) == nil do
+      Keyword.put(options, :basic_auth, {"adm", "pass"})
+    else
+      options
+    end
   end
 
   def process_request_body(body) do
@@ -41,7 +84,9 @@ defmodule Couch do
   def login(user, pass) do
     resp = Couch.post("/_session", body: %{:username => user, :password => pass})
     true = resp.body["ok"]
-    resp.body
+    cookie = resp.headers[:'set-cookie']
+    [token | _] = String.split(cookie, ";")
+    %Couch.Session{cookie: token}
   end
 
   # HACK: this is here until this commit lands in a release
@@ -72,4 +117,35 @@ defmodule Couch do
         %HTTPotion.ErrorResponse{ message: error_to_string(reason)}
     end
   end
+
+  # Anther HACK: Until we can get process_request_headers/2 merged
+  # upstream.
+  @spec process_arguments(atom, String.t, [{atom(), any()}]) :: %{}
+  defp process_arguments(method, url, options) do
+    options    = process_options(options)
+
+    body       = Keyword.get(options, :body, "")
+    headers    = Keyword.merge Application.get_env(:httpotion, :default_headers, []), Keyword.get(options, :headers, [])
+    timeout    = Keyword.get(options, :timeout, Application.get_env(:httpotion, :default_timeout, 5000))
+    ib_options = Keyword.merge Application.get_env(:httpotion, :default_ibrowse, []), Keyword.get(options, :ibrowse, [])
+    follow_redirects = Keyword.get(options, :follow_redirects, Application.get_env(:httpotion, :default_follow_redirects, false))
+
+    ib_options = if stream_to = Keyword.get(options, :stream_to), do: Keyword.put(ib_options, :stream_to, spawn(__MODULE__, :transformer, [stream_to, method, url, options])), else: ib_options
+    ib_options = if user_password = Keyword.get(options, :basic_auth) do
+      {user, password} = user_password
+      Keyword.put(ib_options, :basic_auth, { to_charlist(user), to_charlist(password) })
+    else
+      ib_options
+    end
+
+    %{
+      method:     method,
+      url:        url |> to_string |> process_url(options) |> to_charlist,
+      body:       body |> process_request_body,
+      headers:    headers |> process_request_headers(options) |> Enum.map(fn ({k, v}) -> { to_charlist(k), to_charlist(v) } end),
+      timeout:    timeout,
+      ib_options: ib_options,
+      follow_redirects: follow_redirects
+    }
+  end
 end
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
index 4753325..12aed04 100644
--- a/test/elixir/test/replication_test.exs
+++ b/test/elixir/test/replication_test.exs
@@ -99,12 +99,28 @@ defmodule ReplicationTest do
     # test "replication by doc ids - #{name}" do
     #   run_by_id_repl(@src_prefix, @tgt_prefix)
     # end
+    #
+    # @tag config: [
+    #   {"replicator", "startup_jitter", "0"}
+    # ]
+    # test "continuous replication - #{name}" do
+    #   run_continuous_repl(@src_prefix, @tgt_prefix)
+    # end
+    #
+    # @tag config: [
+    #   {"replicator", "startup_jitter", "0"},
+    #   {"attachments", "compression_level", "8"},
+    #   {"attachments", "compressible_types", "text/*"}
+    # ]
+    # test "compressed attachment replication - #{name}" do
+    #   run_compressed_att_repl(@src_prefix, @tgt_prefix)
+    # end
 
     @tag config: [
       {"replicator", "startup_jitter", "0"}
     ]
-    test "continuous replication - #{name}" do
-      run_continuous_repl(@src_prefix, @tgt_prefix)
+    test "non-admin user on target - #{name}" do
+      run_non_admin_target_user_repl(@src_prefix, @tgt_prefix)
     end
   end)
 
@@ -1034,20 +1050,19 @@ defmodule ReplicationTest do
 
     ddoc = %{
       "_id" => "_design/mydesign",
-      :language => "javascript",
-      :filters => %{
-        :myfilter => "function(doc, req) { return true; }"
+      "language" => "javascript",
+      "filters" => %{
+        "myfilter" => "function(doc, req) { return true; }"
       }
     }
     docs = make_docs(1..25)
     docs = save_docs(src_db_name, docs ++ [ddoc])
 
     att1_data = get_att1_data()
-    att2_data = get_att2_data()
 
     docs = for doc <- docs do
       if doc["integer"] >= 10 and doc["integer"] < 15 do
-        add_attachment(src_db_name, doc, body: att1_data)
+        add_attachment(src_db_name, doc)
       else
         doc
       end
@@ -1060,9 +1075,10 @@ defmodule ReplicationTest do
     assert is_binary(result["_local_id"])
 
     repl_id = result["_local_id"]
+    task = get_task(repl_id, 30000)
+    assert is_map(task), "Error waiting for replication to start"
 
-    Logger.debug "#{src_db_name} - #{repl_id}"
-    wait_for_seq(src_db_name, repl_id)
+    wait_for_repl(src_db_name, repl_id, 26)
 
     Enum.each(docs, fn doc ->
       resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
@@ -1079,8 +1095,8 @@ defmodule ReplicationTest do
         assert att["stub"]
 
         resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt")
-        assert String.length(resp.body) == String.length(att1_data)
-        assert resp.body == att1_data
+        assert String.length(resp.body) == String.length("some text")
+        assert resp.body == "some text"
       end
     end)
 
@@ -1095,7 +1111,7 @@ defmodule ReplicationTest do
       case doc["integer"] do
         n when n >= 10 and n < 15 ->
           ctype = "application/binary"
-          opts = [name: "data.dat", body: att2_data, content_type: ctype]
+          opts = [name: "data.dat", body: att1_data, content_type: ctype]
           add_attachment(src_db_name, doc, opts)
         _ when is_ddoc ->
           add_attachment(src_db_name, doc)
@@ -1104,7 +1120,7 @@ defmodule ReplicationTest do
       end
     end
 
-    wait_for_seq(src_db_name, repl_id)
+    wait_for_repl(src_db_name, repl_id, 32)
 
     Enum.each(docs, fn doc ->
       is_ddoc = String.starts_with?(doc["_id"], "_design/")
@@ -1120,8 +1136,8 @@ defmodule ReplicationTest do
           assert att["stub"]
 
           resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt")
-          assert String.length(resp.body) == String.length(att1_data)
-          assert resp.body == att1_data
+          assert String.length(resp.body) == String.length("some text")
+          assert resp.body == "some text"
 
           if not is_ddoc do
             att = atts["data.dat"]
@@ -1131,8 +1147,8 @@ defmodule ReplicationTest do
             assert att["stub"]
 
             resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/data.dat")
-            assert String.length(resp.body) == String.length(att2_data)
-            assert resp.body == att2_data
+            assert String.length(resp.body) == String.length(att1_data)
+            assert resp.body == att1_data
           end
         _ ->
           :ok
@@ -1144,15 +1160,15 @@ defmodule ReplicationTest do
 
     assert tgt_info["doc_count"] == src_info["doc_count"]
 
-    ddoc = Enum.last(docs)
+    ddoc = List.last(docs)
     ctype = "application/binary"
-    opts = [name: "data.dat", body: att2_data, content_type: ctype]
+    opts = [name: "data.dat", body: att1_data, content_type: ctype]
     add_attachment(src_db_name, ddoc, opts)
 
-    wait_for_seq(src_db_name, repl_id)
+    wait_for_repl(src_db_name, repl_id, 33)
 
-    copy = Couch.get("/#{tgt_db_name}/#{ddoc["_id"]}")
-    atts = copy["_attachments"]
+    resp = Couch.get("/#{tgt_db_name}/#{ddoc["_id"]}")
+    atts = resp.body["_attachments"]
     assert is_map(atts)
     att = atts["readme.txt"]
     assert is_map(att)
@@ -1160,9 +1176,9 @@ defmodule ReplicationTest do
     assert String.match?(att["content_type"], ~r/text\/plain/)
     assert att["stub"]
 
-    resp = Couch.get!("/#{tgt_db_name}/#{copy["_id"]}/readme.txt")
-    assert String.length(resp.body) == String.length(att1_data)
-    assert resp.body == att1_data
+    resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/readme.txt")
+    assert String.length(resp.body) == String.length("some text")
+    assert resp.body == "some text"
 
     att = atts["data.dat"]
     assert is_map(att)
@@ -1170,9 +1186,9 @@ defmodule ReplicationTest do
     assert String.match?(att["content_type"], ~r/application\/binary/)
     assert att["stub"]
 
-    resp = Couch.get!("/#{tgt_db_name}/#{copy["_id"]}/data.dat")
-    assert String.length(resp.body) == String.length(att2_data)
-    assert resp.body == att2_data
+    resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/data.dat")
+    assert String.length(resp.body) == String.length(att1_data)
+    assert resp.body == att1_data
 
     src_info = get_db_info(src_db_name)
     tgt_info = get_db_info(tgt_db_name)
@@ -1180,10 +1196,10 @@ defmodule ReplicationTest do
     assert tgt_info["doc_count"] == src_info["doc_count"]
 
     # Check creating new normal documents
-    new_docs = make_docs(25..35)
+    new_docs = make_docs(26..35)
     new_docs = save_docs(src_db_name, new_docs)
 
-    wait_for_seq(src_db_name, repl_id)
+    wait_for_repl(src_db_name, repl_id, 43)
 
     Enum.each(new_docs, fn doc ->
       resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
@@ -1198,42 +1214,112 @@ defmodule ReplicationTest do
 
     # Delete docs from the source
 
-    doc1 = Enum.at(docs, 0)
-    doc2 = Enum.at(docs, 6)
+    doc1 = Enum.at(new_docs, 0)
+    query = %{:rev => doc1["_rev"]}
+    Couch.delete!("/#{src_db_name}/#{doc1["_id"]}", query: query)
 
-    Couch.delete!(src_db_name, doc1["_id"])
-    Couch.delete!(src_db_name, doc2["_id"])
+    doc2 = Enum.at(new_docs, 6)
+    query = %{:rev => doc2["_rev"]}
+    Couch.delete!("/#{src_db_name}/#{doc2["_id"]}", query: query)
 
-    wait_for_seq(src_db_name, repl_id)
+    wait_for_repl(src_db_name, repl_id, 45)
 
-    resp = Couch.get(tgt_db_name, doc1["_id"])
+    resp = Couch.get("/#{tgt_db_name}/#{doc1["_id"]}")
     assert resp.status_code == 404
-    resp = Couch.get(tgt_db_name, doc2["_id"])
+    resp = Couch.get("/#{tgt_db_name}/#{doc2["_id"]}")
     assert resp.status_code == 404
 
     changes = get_db_changes(tgt_db_name, %{:since => tgt_info["update_seq"]})
     # quite unfortunately, there is no way on relying on ordering in a cluster
     # but we can assume a length of 2
     changes = for change <- changes["results"] do
-      {change["_id"], change["deleted"]}
+      {change["id"], change["deleted"]}
     end
     assert Enum.sort(changes) == [{doc1["_id"], true}, {doc2["_id"], true}]
 
     # Cancel the replication
     repl_body = %{:continuous => true, :cancel => true}
-    resp = replicate(repl_src, repl_tgt, repl_body)
+    resp = replicate(repl_src, repl_tgt, body: repl_body)
     assert resp["ok"]
     assert resp["_local_id"] == repl_id
 
     doc = %{"_id" => "foobar", "value": 666}
     [doc] = save_docs(src_db_name, [doc])
 
-    wait_for_replication_stop(repl_id, 30000)
+    wait_for_repl_stop(repl_id, 30000)
 
     resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
     assert resp.status_code == 404
   end
 
+  def run_compressed_att_repl(src_prefix, tgt_prefix) do
+    base_db_name = random_db_name()
+    src_db_name = base_db_name <> "_src"
+    tgt_db_name = base_db_name <> "_tgt"
+    repl_src = src_prefix <> src_db_name
+    repl_tgt = tgt_prefix <> tgt_db_name
+
+    create_db(src_db_name)
+    create_db(tgt_db_name)
+
+    doc = %{"_id" => "foobar"}
+    [doc] = save_docs(src_db_name, [doc])
+
+    att1_data = get_att1_data()
+    num_copies = 1 + round(128 * 1024 / String.length(att1_data))
+    big_att = List.foldl(Enum.to_list(1..num_copies), "", fn _, acc ->
+      acc <> att1_data
+    end)
+
+    doc = add_attachment(src_db_name, doc, [body: big_att])
+
+    # Disable attachment compression
+    set_config_raw("attachments", "compression_level", "0")
+
+    result = replicate(repl_src, repl_tgt)
+    assert result["ok"]
+    assert is_list(result["history"])
+    assert length(result["history"]) == 1
+    history = Enum.at(result["history"], 0)
+    assert history["missing_checked"] == 1
+    assert history["missing_found"] == 1
+    assert history["docs_read"] == 1
+    assert history["docs_written"] == 1
+    assert history["doc_write_failures"] == 0
+
+    token = Enum.random(1..1_000_000)
+    query = %{"att_encoding_info": "true", "bypass_cache": token}
+    resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", query: query)
+    assert resp.status_code < 300
+    assert is_map(resp.body["_attachments"])
+    att = resp.body["_attachments"]["readme.txt"]
+    assert att["encoding"] == "gzip"
+    assert is_integer(att["length"])
+    assert is_integer(att["encoded_length"])
+    assert att["encoded_length"] < att["length"]
+  end
+
+  def run_non_admin_target_user_repl(src_prefix, tgt_prefix) do
+    joe_doc = %{
+      :_id => "org.couchdb.user:joe",
+      :type => "user",
+      :name => "joe",
+      :roles => ["erlanger"],
+      :password => "erly"
+    }
+    resp = Couch.post("/_users", body: joe_doc)
+    cond do
+      resp.body["ok"] -> :ok
+      resp.status_code == 409 -> :ok
+      true -> assert false, "Error creating user joe: #{inspect resp}"
+    end
+
+    sess = Couch.login("joe", "erly")
+    resp = Couch.Session.get(sess, "/_session")
+    assert resp.body["ok"]
+    assert resp.body["userCtx"]["name"] == "joe"
+  end
+
   def get_db_info(db_name) do
     resp = Couch.get("/#{db_name}")
     assert HTTPotion.Response.success?(resp)
@@ -1274,6 +1360,7 @@ defmodule ReplicationTest do
     resp = Couch.post("/#{db_name}/_bulk_docs", query: query, body: body)
     assert HTTPotion.Response.success?(resp)
     for {doc, resp} <- Enum.zip(docs, resp.body) do
+      assert resp["ok"], "Error saving doc: #{doc["_id"]}"
       Map.put(doc, "_rev", resp["rev"])
     end
   end
@@ -1297,35 +1384,39 @@ defmodule ReplicationTest do
     Map.put(doc, "_rev", resp.body["rev"])
   end
 
-  def wait_for_seq(src_db_name, repl_id) do
-    src_info = get_db_info(src_db_name)
-    src_seq = src_info["update_seq"]
-    wait_for_seq(src_seq, repl_id, 30000)
+  def wait_for_repl(src_db_name, repl_id, expect_revs_checked) do
+    wait_for_repl(src_db_name, repl_id, expect_revs_checked, 30000)
   end
 
-  def wait_for_seq(src_seq, _, wait_left) when wait_left <= 0 do
-    assert false, "Timeout waiting for replication sequence: #{src_seq}"
+  def wait_for_repl(_, _, _, wait_left) when wait_left <= 0 do
+    assert false, "Timeout waiting for replication"
   end
 
-  def wait_for_seq(src_seq, repl_id, wait_left) do
+  def wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left) do
     task = get_task(repl_id, 0)
-    Logger.debug "task: #{src_seq} #{inspect task}"
-    if not is_map(task) or task["through_seq"] != src_seq do
+    through_seq = task["through_seq"]
+    revs_checked = task["revisions_checked"]
+    changes = get_db_changes(src_db_name, %{:since => through_seq})
+    if length(changes["results"]) > 0 or revs_checked < expect_revs_checked do
       :timer.sleep(500)
-      wait_for_seq(src_seq, repl_id, wait_left - 500)
+      wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left - 500)
     end
     task
   end
 
-  def wait_for_replication_stop(repl_id, wait_left) when wait_left <= 0 do
+  def wait_for_repl_stop(repl_id) do
+    wait_for_repl_stop(repl_id, 30000)
+  end
+
+  def wait_for_repl_stop(repl_id, wait_left) when wait_left <= 0 do
     assert false, "Timeout waiting for replication task to stop: #{repl_id}"
   end
 
-  def wait_for_replication_stop(repl_id, wait_left) do
+  def wait_for_repl_stop(repl_id, wait_left) do
     task = get_task(repl_id, 0)
     if is_map(task) do
       :timer.sleep(500)
-      wait_for_replication_stop(repl_id, wait_left - 500)
+      wait_for_repl_stop(repl_id, wait_left - 500)
     end
   end
 
diff --git a/test/elixir/test/test_helper.exs b/test/elixir/test/test_helper.exs
index cb01fc2..9baf204 100644
--- a/test/elixir/test/test_helper.exs
+++ b/test/elixir/test/test_helper.exs
@@ -66,15 +66,7 @@ defmodule CouchTestCase do
       end
 
       def set_config({section, key, value}) do
-        resp = Couch.get("/_membership")
-        existing = Enum.map(resp.body["all_nodes"], fn node ->
-          url = "/_node/#{node}/_config/#{section}/#{key}"
-          headers = ["X-Couch-Persist": "false"]
-          body = :jiffy.encode(value)
-          resp = Couch.put(url, headers: headers, body: body)
-          assert resp.status_code == 200
-          {node, resp.body}
-        end)
+        existing = set_config_raw(section, key, value)
         on_exit(fn ->
           Enum.each(existing, fn {node, prev_value} ->
             if prev_value != "" do
@@ -93,6 +85,18 @@ defmodule CouchTestCase do
         end)
       end
 
+      def set_config_raw(section, key, value) do
+        resp = Couch.get("/_membership")
+        Enum.map(resp.body["all_nodes"], fn node ->
+          url = "/_node/#{node}/_config/#{section}/#{key}"
+          headers = ["X-Couch-Persist": "false"]
+          body = :jiffy.encode(value)
+          resp = Couch.put(url, headers: headers, body: body)
+          assert resp.status_code == 200
+          {node, resp.body}
+        end)
+      end
+
       def create_db(db_name) do
         resp = Couch.put("/#{db_name}")
         assert resp.status_code == 201

-- 
To stop receiving notification emails like this one, please contact
davisp@apache.org.