You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jt...@apache.org on 2018/07/06 15:30:56 UTC

kudu git commit: [python] KUDU-2441: Enable configuration of mutation buffer

Repository: kudu
Updated Branches:
  refs/heads/master 910fd0bac -> 7177fc359


[python] KUDU-2441: Enable configuration of mutation buffer

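The Python client currently doesn't support configuring the mutation
buffer for Session objects. This patch brings the Python client up to
speed with the C++ client in this respect: it adds Session setters for
the buffer space, flush watermark, flush interval and maximum number of
buffers, accepts the same settings as keyword arguments to
Client.new_session(), and includes a basic unit test.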

Change-Id: I52ac48e7dddc31e666a95ace4c7672da51d80b11
Reviewed-on: http://gerrit.cloudera.org:8080/10674
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7177fc35
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7177fc35
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7177fc35

Branch: refs/heads/master
Commit: 7177fc35992daabdb93d9920fb1cc46c1a07619a
Parents: 910fd0b
Author: Jordan Birdsell <jb...@phdata.io>
Authored: Sat Jun 9 17:06:07 2018 -0400
Committer: Jordan Birdsell <jt...@apache.org>
Committed: Fri Jul 6 15:30:45 2018 +0000

----------------------------------------------------------------------
 python/kudu/client.pyx           | 121 +++++++++++++++++++++++++++++++++-
 python/kudu/libkudu_client.pxd   |   6 +-
 python/kudu/tests/test_client.py |  29 ++++++++
 3 files changed, 154 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index afa7e8d..39093b1 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -525,7 +525,7 @@ cdef class Client:
             result.append(ts._init(tservers[i]))
         return result
 
-    def new_session(self, flush_mode='manual', timeout_ms=5000):
+    def new_session(self, flush_mode='manual', timeout_ms=5000, **kwargs):
         """
         Create a new KuduSession for applying write operations.
 
@@ -535,17 +535,43 @@ cdef class Client:
           See Session.set_flush_mode
         timeout_ms : int, default 5000
           Timeout in milliseconds
+        mutation_buffer_sz : Size in bytes of the buffer space.
+        mutation_buffer_watermark : Watermark level as percentage of the mutation buffer size,
+            this is used to trigger a flush in AUTO_FLUSH_BACKGROUND mode.
+        mutation_buffer_flush_interval : The duration of the interval for the time-based
+            flushing, in milliseconds. In some cases, while running in AUTO_FLUSH_BACKGROUND
+            mode, the size of the mutation buffer for pending operations and the flush
+            watermark for fresh operations may be too high for the rate of incoming data:
+            it would take too long to accumulate enough data in the buffer to trigger
+            flushing. I.e., it makes sense to flush the accumulated operations if the
+            prior flush happened long time ago. This parameter sets the wait interval for
+            the time-based flushing which takes place along with the flushing triggered
+            by the over-the-watermark criterion. By default, the interval is set to
+            1000 ms (i.e. 1 second).
+        mutation_buffer_max_num : The maximum number of mutation buffers per KuduSession
+            object to hold the applied operations. Use 0 to set the maximum number of
+            concurrent mutation buffers to unlimited
 
         Returns
         -------
         session : kudu.Session
         """
+
         cdef Session result = Session()
         result.s = self.cp.NewSession()
 
         result.set_flush_mode(flush_mode)
         result.set_timeout_ms(timeout_ms)
 
+        if "mutation_buffer_sz" in kwargs:
+            result.set_mutation_buffer_space(kwargs["mutation_buffer_sz"])
+        if "mutation_buffer_watermark" in kwargs:
+            result.set_mutation_buffer_flush_watermark(kwargs["mutation_buffer_watermark"])
+        if "mutation_buffer_flush_interval" in kwargs:
+            result.set_mutation_buffer_flush_interval(kwargs["mutation_buffer_flush_interval"])
+        if "mutation_buffer_max_num" in kwargs:
+            result.set_mutation_buffer_max_num(kwargs["mutation_buffer_max_num"])
+
         return result
 
     def new_table_alterer(self, Table table):
@@ -1245,6 +1271,99 @@ cdef class Session:
         """
         self.s.get().SetTimeoutMillis(ms)
 
+    def set_mutation_buffer_space(self, size_t size_bytes):
+        """
+        Set the amount of buffer space used by this session for outbound writes.
+
+        The effect of the buffer size varies based on the flush mode of the session:
+
+            AUTO_FLUSH_SYNC: since no buffering is done, this has no effect.
+            AUTO_FLUSH_BACKGROUND: if the buffer space is exhausted, then write calls
+                will block until there is space available in the buffer.
+            MANUAL_FLUSH: if the buffer space is exhausted, then write calls will return
+                an error.
+
+        By default, the buffer space is set to 7 MiB (i.e. 7 * 1024 * 1024 bytes).
+
+        Parameters
+        ----------
+        size_bytes : Size of the buffer space to set, in bytes (non-negative integer).
+        """
+        status = self.s.get().SetMutationBufferSpace(size_bytes)
+
+        check_status(status)  # presumably raises a Kudu error for a non-OK Status
+
+    def set_mutation_buffer_flush_watermark(self, double watermark_pct):
+        """
+        Set the buffer watermark to trigger flush in AUTO_FLUSH_BACKGROUND mode.
+
+        This method sets the watermark for fresh operations in the buffer when running
+        in AUTO_FLUSH_BACKGROUND mode: once the specified threshold is reached, the
+        session starts sending the accumulated write operations to the appropriate
+        tablet servers. The flush watermark determines how much of the buffer space is
+        taken by newly submitted operations. Setting this level to 1.0 (100%) results in
+        flushing the buffer only when the newly applied operation would overflow the
+        buffer. By default, the buffer flush watermark is set to 0.5 (50%).
+
+        Parameters
+        ----------
+        watermark_pct : Watermark level as a fraction of the buffer size in [0.0, 1.0]
+        """
+        status = self.s.get().SetMutationBufferFlushWatermark(watermark_pct)
+
+        check_status(status)  # an out-of-range value (e.g. 1.2) raises KuduInvalidArgument
+
+    def set_mutation_buffer_flush_interval(self, unsigned int millis):
+        """
+        Set the interval for time-based flushing of the mutation buffer.
+
+        In some cases, while running in AUTO_FLUSH_BACKGROUND mode, the size of the
+        mutation buffer for pending operations and the flush watermark for fresh
+        operations may be too high for the rate of incoming data: it would take too
+        long to accumulate enough data in the buffer to trigger flushing. I.e., it
+        makes sense to flush the accumulated operations if the prior flush happened
+        a long time ago. This method sets the wait interval for the time-based flushing
+        which takes place along with the flushing triggered by the over-the-watermark
+        criterion. By default, the interval is set to 1000 ms (i.e. 1 second).
+
+        Parameters
+        ----------
+        millis : Duration of the time-based flush interval, in milliseconds (non-negative).
+        """
+
+        status = self.s.get().SetMutationBufferFlushInterval(millis)
+
+        check_status(status)  # presumably raises a Kudu error for a non-OK Status
+
+    def set_mutation_buffer_max_num(self, unsigned int max_num):
+        """
+        Set the maximum number of mutation buffers per Session object.
+
+        A Session accumulates write operations submitted via the apply() method in
+        mutation buffers. A Session always has at least one mutation buffer. The
+        mutation buffer which accumulates new incoming operations is called the current
+        mutation buffer. The current mutation buffer is flushed using the
+        Session.flush() method or it's done by the Session automatically if
+        running in AUTO_FLUSH_BACKGROUND mode. After flushing the current mutation buffer,
+        a new buffer is created upon calling Session.apply(), provided the limit is
+        not exceeded. A call to Session.apply() blocks if it's at the maximum number
+        of buffers allowed; the call unblocks as soon as one of the pending batchers
+        has finished flushing and a new batcher can be created.
+
+        Apart from the special value 0 (unlimited), the minimum setting for this
+        parameter is 1 (one). The default setting for this parameter is 2 (two).
+
+        Parameters
+        ----------
+        max_num : The maximum number of mutation buffers per Session object to hold
+            the applied operations. Use 0 to set the maximum number of concurrent mutation
+            buffers to unlimited.
+        """
+
+        status = self.s.get().SetMutationBufferMaxNum(max_num)
+
+        check_status(status)  # presumably raises a Kudu error for a non-OK Status
+
     def apply(self, WriteOperation op):
         """
         Apply the indicated write operation

http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 5cd7731..a34d6c2 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -624,7 +624,11 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
 
         Status SetFlushMode(FlushMode m)
 
-        void SetMutationBufferSpace(size_t size)
+        Status SetMutationBufferSpace(size_t size)
+        Status SetMutationBufferFlushWatermark(double watermark_pct)
+        Status SetMutationBufferFlushInterval(unsigned int millis)
+        Status SetMutationBufferMaxNum(unsigned int max_num)
+
         void SetTimeoutMillis(int millis)
 
         void SetPriority(int priority)

http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 0a3d47e..f251033 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -255,6 +255,35 @@ class TestClient(KuduTestBase, unittest.TestCase):
         with self.assertRaises(ValueError):
             self.client.new_session(flush_mode='foo')
 
+
+    def test_session_mutation_buffer_settings(self):  # smoke test: no call below should raise
+        self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND,
+                                mutation_buffer_sz= 10*1024*1024,
+                                mutation_buffer_watermark=0.5,
+                                mutation_buffer_flush_interval=2000,
+                                mutation_buffer_max_num=3)
+        # Same values as above, applied through the individual Session setters.
+        session = self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND)
+        session.set_mutation_buffer_space(10*1024*1024)
+        session.set_mutation_buffer_flush_watermark(0.5)
+        session.set_mutation_buffer_flush_interval(2000)
+        session.set_mutation_buffer_max_num(3)
+
+    def test_session_mutation_buffer_errors(self):
+        session = self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND)
+        # -1 cannot be converted to the setters' unsigned parameters (size_t / unsigned int).
+        with self.assertRaises(OverflowError):
+            session.set_mutation_buffer_max_num(-1)
+        # 1.2 is expected to be rejected as an invalid watermark value.
+        with self.assertRaises(kudu.errors.KuduInvalidArgument):
+            session.set_mutation_buffer_flush_watermark(1.2)
+
+        with self.assertRaises(OverflowError):
+            session.set_mutation_buffer_flush_interval(-1)
+
+        with self.assertRaises(OverflowError):
+            session.set_mutation_buffer_space(-1)
+
     def test_connect_timeouts(self):
         # it works! any other way to check
         kudu.connect(self.master_hosts, self.master_ports,