You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jt...@apache.org on 2018/07/06 15:30:56 UTC
kudu git commit: [python] KUDU-2441: Enable configuration of mutation
buffer
Repository: kudu
Updated Branches:
refs/heads/master 910fd0bac -> 7177fc359
[python] KUDU-2441: Enable configuration of mutation buffer
The Python client currently doesn't support configuring the mutation
buffer for Session objects. This patch brings the Python client up
to speed with the C++ client in this respect. This patch includes a
basic unit test.
Change-Id: I52ac48e7dddc31e666a95ace4c7672da51d80b11
Reviewed-on: http://gerrit.cloudera.org:8080/10674
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wd...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7177fc35
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7177fc35
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7177fc35
Branch: refs/heads/master
Commit: 7177fc35992daabdb93d9920fb1cc46c1a07619a
Parents: 910fd0b
Author: Jordan Birdsell <jb...@phdata.io>
Authored: Sat Jun 9 17:06:07 2018 -0400
Committer: Jordan Birdsell <jt...@apache.org>
Committed: Fri Jul 6 15:30:45 2018 +0000
----------------------------------------------------------------------
python/kudu/client.pyx | 121 +++++++++++++++++++++++++++++++++-
python/kudu/libkudu_client.pxd | 6 +-
python/kudu/tests/test_client.py | 29 ++++++++
3 files changed, 154 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index afa7e8d..39093b1 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -525,7 +525,7 @@ cdef class Client:
result.append(ts._init(tservers[i]))
return result
- def new_session(self, flush_mode='manual', timeout_ms=5000):
+ def new_session(self, flush_mode='manual', timeout_ms=5000, **kwargs):
"""
Create a new KuduSession for applying write operations.
@@ -535,17 +535,43 @@ cdef class Client:
See Session.set_flush_mode
timeout_ms : int, default 5000
Timeout in milliseconds
+ mutation_buffer_sz : Size in bytes of the buffer space.
+ mutation_buffer_watermark : Watermark level as a fraction of the mutation buffer size
+ (e.g. 0.5 for 50%); this is used to trigger a flush in AUTO_FLUSH_BACKGROUND mode.
+ mutation_buffer_flush_interval : The duration of the interval for the time-based
+ flushing, in milliseconds. In some cases, while running in AUTO_FLUSH_BACKGROUND
+ mode, the size of the mutation buffer for pending operations and the flush
+ watermark for fresh operations may be too high for the rate of incoming data:
+ it would take too long to accumulate enough data in the buffer to trigger
+ flushing. I.e., it makes sense to flush the accumulated operations if the
+ prior flush happened a long time ago. This parameter sets the wait interval for
+ the time-based flushing which takes place along with the flushing triggered
+ by the over-the-watermark criterion. By default, the interval is set to
+ 1000 ms (i.e. 1 second).
+ mutation_buffer_max_num : The maximum number of mutation buffers per KuduSession
+ object to hold the applied operations. Use 0 to set the maximum number of
+ concurrent mutation buffers to unlimited.
Returns
-------
session : kudu.Session
"""
+
cdef Session result = Session()
result.s = self.cp.NewSession()
result.set_flush_mode(flush_mode)
result.set_timeout_ms(timeout_ms)
+ if "mutation_buffer_sz" in kwargs:
+ result.set_mutation_buffer_space(kwargs["mutation_buffer_sz"])
+ if "mutation_buffer_watermark" in kwargs:
+ result.set_mutation_buffer_flush_watermark(kwargs["mutation_buffer_watermark"])
+ if "mutation_buffer_flush_interval" in kwargs:
+ result.set_mutation_buffer_flush_interval(kwargs["mutation_buffer_flush_interval"])
+ if "mutation_buffer_max_num" in kwargs:
+ result.set_mutation_buffer_max_num(kwargs["mutation_buffer_max_num"])
+
return result
def new_table_alterer(self, Table table):
@@ -1245,6 +1271,99 @@ cdef class Session:
"""
self.s.get().SetTimeoutMillis(ms)
+ def set_mutation_buffer_space(self, size_t size_bytes):
+ """
+ Set the amount of buffer space used by this session for outbound writes.
+
+ The effect of the buffer size varies based on the flush mode of the session:
+
+ AUTO_FLUSH_SYNC: since no buffering is done, this has no effect.
+ AUTO_FLUSH_BACKGROUND: if the buffer space is exhausted, then write calls
+ will block until there is space available in the buffer.
+ MANUAL_FLUSH: if the buffer space is exhausted, then write calls will return
+ an error.
+
+ By default, the buffer space is set to 7 MiB (i.e. 7 * 1024 * 1024 bytes).
+
+ Parameters
+ ----------
+ size_bytes : Size of the buffer space to set (number of bytes)
+ """
+ status = self.s.get().SetMutationBufferSpace(size_bytes)
+
+ check_status(status)
+
+ def set_mutation_buffer_flush_watermark(self, double watermark_pct):
+ """
+ Set the buffer watermark to trigger flush in AUTO_FLUSH_BACKGROUND mode.
+
+ This method sets the watermark for fresh operations in the buffer when running
+ in AUTO_FLUSH_BACKGROUND mode: once the specified threshold is reached, the
+ session starts sending the accumulated write operations to the appropriate
+ tablet servers. The flush watermark determines how much of the buffer space is
+ taken by newly submitted operations. Setting this level to 100% results in
+ flushing the buffer only when the newly applied operation would overflow the
+ buffer. By default, the buffer flush watermark is set to 50%.
+
+ Parameters
+ ----------
+ watermark_pct : Watermark level as a fraction of the mutation buffer size (0.5 means 50%)
+ """
+ status = self.s.get().SetMutationBufferFlushWatermark(watermark_pct)
+
+ check_status(status)
+
+ def set_mutation_buffer_flush_interval(self, unsigned int millis):
+ """
+ Set the interval for time-based flushing of the mutation buffer.
+
+ In some cases, while running in AUTO_FLUSH_BACKGROUND mode, the size of the
+ mutation buffer for pending operations and the flush watermark for fresh
+ operations may be too high for the rate of incoming data: it would take too
+ long to accumulate enough data in the buffer to trigger flushing. I.e., it
+ makes sense to flush the accumulated operations if the prior flush happened
+ a long time ago. This method sets the wait interval for the time-based flushing
+ which takes place along with the flushing triggered by the over-the-watermark
+ criterion. By default, the interval is set to 1000 ms (i.e. 1 second).
+
+ Parameters
+ ----------
+ millis : The duration of the interval for the time-based flushing, in milliseconds.
+ """
+
+ status = self.s.get().SetMutationBufferFlushInterval(millis)
+
+ check_status(status)
+
+ def set_mutation_buffer_max_num(self, unsigned int max_num):
+ """
+ Set the maximum number of mutation buffers per Session object.
+
+ A Session accumulates write operations submitted via the apply() method in
+ mutation buffers. A Session always has at least one mutation buffer. The
+ mutation buffer which accumulates new incoming operations is called the current
+ mutation buffer. The current mutation buffer is flushed using the
+ Session.flush() method or it's done by the Session automatically if
+ running in AUTO_FLUSH_BACKGROUND mode. After flushing the current mutation buffer,
+ a new buffer is created upon calling Session.apply(), provided the limit is
+ not exceeded. A call to Session.apply() blocks if it's at the maximum number
+ of buffers allowed; the call unblocks as soon as one of the pending batchers
+ finishes flushing and a new batcher can be created.
+
+ The minimum setting for this parameter is 1 (one). The default setting for this
+ parameter is 2 (two).
+
+ Parameters
+ ----------
+ max_num : The maximum number of mutation buffers per Session object to hold
+ the applied operations. Use 0 to set the maximum number of concurrent mutation
+ buffers to unlimited.
+ """
+
+ status = self.s.get().SetMutationBufferMaxNum(max_num)
+
+ check_status(status)
+
def apply(self, WriteOperation op):
"""
Apply the indicated write operation
http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 5cd7731..a34d6c2 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -624,7 +624,11 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
Status SetFlushMode(FlushMode m)
- void SetMutationBufferSpace(size_t size)
+ Status SetMutationBufferSpace(size_t size)
+ Status SetMutationBufferFlushWatermark(double watermark_pct)
+ Status SetMutationBufferFlushInterval(unsigned int millis)
+ Status SetMutationBufferMaxNum(unsigned int max_num)
+
void SetTimeoutMillis(int millis)
void SetPriority(int priority)
http://git-wip-us.apache.org/repos/asf/kudu/blob/7177fc35/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 0a3d47e..f251033 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -255,6 +255,35 @@ class TestClient(KuduTestBase, unittest.TestCase):
with self.assertRaises(ValueError):
self.client.new_session(flush_mode='foo')
+
+ def test_session_mutation_buffer_settings(self):
+ self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND,
+ mutation_buffer_sz= 10*1024*1024,
+ mutation_buffer_watermark=0.5,
+ mutation_buffer_flush_interval=2000,
+ mutation_buffer_max_num=3)
+
+ session = self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND)
+ session.set_mutation_buffer_space(10*1024*1024)
+ session.set_mutation_buffer_flush_watermark(0.5)
+ session.set_mutation_buffer_flush_interval(2000)
+ session.set_mutation_buffer_max_num(3)
+
+ def test_session_mutation_buffer_errors(self):
+ session = self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND)
+
+ with self.assertRaises(OverflowError):
+ session.set_mutation_buffer_max_num(-1)
+
+ with self.assertRaises(kudu.errors.KuduInvalidArgument):
+ session.set_mutation_buffer_flush_watermark(1.2)
+
+ with self.assertRaises(OverflowError):
+ session.set_mutation_buffer_flush_interval(-1)
+
+ with self.assertRaises(OverflowError):
+ session.set_mutation_buffer_space(-1)
+
def test_connect_timeouts(self):
# it works! any other way to check
kudu.connect(self.master_hosts, self.master_ports,