You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/10/28 18:05:15 UTC

[2/2] kudu git commit: KUDU-1611 - [python] Enable setting scanner selection policy

KUDU-1611 - [python] Enable setting scanner selection policy

Currently the Python client cannot set the replica selection policy
used when scanning. This patch adds a set_selection() method to both
the Scanner class and the ScanTokenBuilder, with a unit test for each.

Change-Id: I09d9cd081c037e3284370943e2cae348476df26c
Reviewed-on: http://gerrit.cloudera.org:8080/4408
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d38a17d9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d38a17d9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d38a17d9

Branch: refs/heads/master
Commit: d38a17d9be48fbe2ad3fc5aa9c034157b6b0a456
Parents: b7497b3
Author: Jordan Birdsell <jo...@gmail.com>
Authored: Tue Sep 13 23:33:47 2016 -0400
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Oct 28 17:01:50 2016 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py             |  3 ++
 python/kudu/client.pyx              | 81 ++++++++++++++++++++++++++++++++
 python/kudu/libkudu_client.pxd      |  6 +--
 python/kudu/tests/test_scanner.py   | 16 +++++++
 python/kudu/tests/test_scantoken.py | 23 +++++++++
 5 files changed, 126 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d38a17d9/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 771f99a..8ff299c 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -19,6 +19,9 @@ from kudu.client import (Client, Table, Scanner, Session,  # noqa
                          Insert, Update, Delete, Predicate,
                          TimeDelta, KuduError, ScanTokenBuilder,
                          ScanToken,
+                         LEADER_ONLY,
+                         CLOSEST_REPLICA,
+                         FIRST_REPLICA,
                          FLUSH_AUTO_BACKGROUND,
                          FLUSH_AUTO_SYNC,
                          FLUSH_MANUAL,

http://git-wip-us.apache.org/repos/asf/kudu/blob/d38a17d9/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 97ae711..19ab7a8 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -34,6 +34,16 @@ from errors import KuduException
 
 import six
 
+# Replica selection enums
+LEADER_ONLY = ReplicaSelection_Leader
+CLOSEST_REPLICA = ReplicaSelection_Closest
+FIRST_REPLICA = ReplicaSelection_First
+
+cdef dict _replica_selection_policies = {
+    'leader': ReplicaSelection_Leader,
+    'closest': ReplicaSelection_Closest,
+    'first': ReplicaSelection_First
+}
 
 # Read mode enums
 READ_LATEST = ReadMode_Latest
@@ -1336,6 +1346,41 @@ cdef class Scanner:
         check_status(self.scanner.SetProjectedColumnNames(v_names))
         return self
 
+    def set_selection(self, replica_selection):
+        """
+        Set the replica selection policy while scanning.
+
+        Parameters
+        ----------
+        replica_selection : {'leader', 'closest', 'first'}
+          You can also use the constants LEADER_ONLY, CLOSEST_REPLICA,
+          and FIRST_REPLICA
+
+        Returns
+        -------
+        self : Scanner
+        """
+        cdef ReplicaSelection selection
+
+        def invalid_selection_policy():
+            raise ValueError('Invalid replica selection policy: {0}'
+                             .format(replica_selection))
+
+        if isinstance(replica_selection, int):
+            if 0 <= replica_selection < len(_replica_selection_policies):
+                check_status(self.scanner.SetSelection(
+                             <ReplicaSelection> replica_selection))
+            else:
+                invalid_selection_policy()
+        else:
+            try:
+                check_status(self.scanner.SetSelection(
+                    _replica_selection_policies[replica_selection.lower()]))
+            except KeyError:
+                invalid_selection_policy()
+
+        return self
+
     def set_projected_column_indexes(self, indexes):
         """
         Sets the columns to be scanned.
@@ -1578,6 +1623,7 @@ cdef class Scanner:
         check_status(self.scanner.NextBatch(&batch.batch))
         return batch
 
+
 cdef class ScanToken:
     """
     A ScanToken describes a partial scan of a Kudu table limited to a single
@@ -1826,6 +1872,41 @@ cdef class ScanTokenBuilder:
         check_status(self._builder.SetFaultTolerant())
         return self
 
+    def set_selection(self, replica_selection):
+        """
+        Set the replica selection policy while scanning.
+
+        Parameters
+        ----------
+        replica_selection : {'leader', 'closest', 'first'}
+          You can also use the constants LEADER_ONLY, CLOSEST_REPLICA,
+          and FIRST_REPLICA
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        cdef ReplicaSelection selection
+
+        def invalid_selection_policy():
+            raise ValueError('Invalid replica selection policy: {0}'
+                             .format(replica_selection))
+
+        if isinstance(replica_selection, int):
+            if 0 <= replica_selection < len(_replica_selection_policies):
+                check_status(self._builder.SetSelection(
+                             <ReplicaSelection> replica_selection))
+            else:
+                invalid_selection_policy()
+        else:
+            try:
+                check_status(self._builder.SetSelection(
+                    _replica_selection_policies[replica_selection.lower()]))
+            except KeyError:
+                invalid_selection_policy()
+
+        return self
+
     def add_predicates(self, preds):
         """
         Add a list of scan predicates to the ScanTokenBuilder. Select columns

http://git-wip-us.apache.org/repos/asf/kudu/blob/d38a17d9/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 83a9b03..141fdb7 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -453,9 +453,9 @@ cdef extern from "kudu/client/value.h" namespace "kudu::client" nogil:
 cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
 
     enum ReplicaSelection" kudu::client::KuduClient::ReplicaSelection":
-        LEADER_ONLY " kudu::client::KuduClient::LEADER_ONLY"
-        CLOSEST_REPLICA " kudu::client::KuduClient::CLOSEST_REPLICA"
-        FIRST_REPLICA " kudu::client::KuduClient::FIRST_REPLICA"
+        ReplicaSelection_Leader " kudu::client::KuduClient::LEADER_ONLY"
+        ReplicaSelection_Closest " kudu::client::KuduClient::CLOSEST_REPLICA"
+        ReplicaSelection_First " kudu::client::KuduClient::FIRST_REPLICA"
 
     enum ReadMode" kudu::client::KuduScanner::ReadMode":
         ReadMode_Latest " kudu::client::KuduScanner::READ_LATEST"

http://git-wip-us.apache.org/repos/asf/kudu/blob/d38a17d9/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index ebacb72..fd39f1a 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -258,3 +258,19 @@ class TestScanner(TestScanBase):
         # Test a single precision float predicate
         # Does a row check count only
         self._test_float_pred()
+
+    def test_scan_selection(self):
+        """
+        This test confirms that setting the scan selection policy on the
+        scanner does not cause any errors. There is no way to confirm
+        that the policy was actually set. This functionality is
+        tested in the C++ test:
+            ClientTest.TestReplicatedMultiTabletTableFailover.
+        """
+
+        for policy in ['leader', kudu.CLOSEST_REPLICA, 2]:
+            scanner = self.table.scanner()
+            scanner.set_selection(policy)
+            scanner.open()
+            self.assertEqual(sorted(scanner.read_all_tuples()),
+                             sorted(self.tuples))

http://git-wip-us.apache.org/repos/asf/kudu/blob/d38a17d9/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index 897d780..fbe66df 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -239,3 +239,26 @@ class TestScanToken(TestScanBase):
         # Test a single precision float predicate
         # Does a row check count only
         self._test_float_pred()
+
+    def test_scan_selection(self):
+        """
+        This test confirms that setting the scan selection policy on the
+        ScanTokenBuilder does not cause any errors. There is no way to
+        confirm that the policy was actually set. This functionality is
+        tested in the C++ test:
+            ClientTest.TestReplicatedMultiTabletTableFailover.
+        """
+
+        for policy in ['leader', kudu.CLOSEST_REPLICA, 2]:
+            builder = self.table.scan_token_builder()
+            builder.set_selection(policy)
+            tokens = builder.build()
+
+            tuples = []
+            for token in tokens:
+                scanner = token.into_kudu_scanner()
+                scanner.open()
+                tuples.extend(scanner.read_all_tuples())
+
+            self.assertEqual(sorted(tuples),
+                             sorted(self.tuples))