Posted to commits@kudu.apache.org by to...@apache.org on 2016/01/16 22:31:29 UTC
[1/3] incubator-kudu git commit: KUDU-1285: use ccache if available
in thirdparty builds with clang
Repository: incubator-kudu
Updated Branches:
refs/heads/master 56f97d557 -> 53f976f03
KUDU-1285: use ccache if available in thirdparty builds with clang
Piggy-backing on the work done in commit c8049c1.
Change-Id: I64a78d145ae60b5c99e33734486fbe242c90fb1b
Reviewed-on: http://gerrit.cloudera.org:8080/1810
Tested-by: Internal Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/2cd52ba6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/2cd52ba6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/2cd52ba6
Branch: refs/heads/master
Commit: 2cd52ba63562d04c76470a43f08ade0afcd625ca
Parents: 56f97d5
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Jan 15 17:49:56 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Jan 16 05:55:15 2016 +0000
----------------------------------------------------------------------
thirdparty/build-thirdparty.sh | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2cd52ba6/thirdparty/build-thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index ebff539..a6684a2 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -259,9 +259,15 @@ if [ -n "$F_TSAN" ]; then
# * -Wl,-rpath,... - Add instrumented libstdc++ location to the rpath so that
# it can be found at runtime.
- # TODO(KUDU-1285): Fix thirdparty TSAN builds to allow ccache
- export CC="$TP_DIR/clang-toolchain/bin/clang"
- export CXX="$TP_DIR/clang-toolchain/bin/clang++"
+ if which ccache >/dev/null ; then
+ CLANG="$TP_DIR/../build-support/ccache-clang/clang"
+ CLANGXX="$TP_DIR/../build-support/ccache-clang/clang++"
+ else
+ CLANG="$TP_DIR/clang-toolchain/bin/clang"
+ CLANGXX="$TP_DIR/clang-toolchain/bin/clang++"
+ fi
+ export CC=$CLANG
+ export CXX=$CLANGXX
PREFIX=$PREFIX_DEPS_TSAN
@@ -298,7 +304,7 @@ if [ -n "$F_TSAN" ]; then
fi
restore_env
- # Build dependencies that do no require TSAN instrumentation
+ # Build dependencies that do not require TSAN instrumentation
EXTRA_CXXFLAGS="-nostdinc++ $EXTRA_CXXFLAGS"
EXTRA_CXXFLAGS="-isystem $PREFIX_LIBSTDCXX/include/c++/$GCC_VERSION/backward $EXTRA_CXXFLAGS"
[3/3] incubator-kudu git commit: [python client] - Expand C++ API
coverage and improve usability and documentation
Posted by to...@apache.org.
[python client] - Expand C++ API coverage and improve usability and
documentation
This patch contains a lot of different things:
- Python 3 support
- Update all ASF license headers
- Use the KuduSchemaBuilder to create schemas. Old APIs in the original Python
client draft from late 2014 have been removed. Columns can be specified fully
through SchemaBuilder.add_column or through fluent/chaining ColumnSpec methods
patterned after the C++ API.
- More natural API to create column comparison predicates, with Column and
Predicate wrapper classes to enable it. For example, table[col_name] <=
val. Remove the add_comparison_predicate operation in favor of add_predicates.
- Rework table and column schema objects and add user-friendly __repr__ output,
__getitem__ by position or name, and other conveniences. Show names, types,
and primary keys in Schema __repr__
- Add kudu.connect method to hide kudu.Client constructor details and
(eventually) provide for HA masters
- Add Client.list_tables using KuduClient::ListTables
- Use new KuduClient::TableExists API
- Add a KuduType usability layer, exposing Kudu types in the kudu namespace as
int8, int16, ...
- Enable column compression and encoding to be specified by name or enum
- Add some exception subclasses for common Status failures, such as
KuduNotFound
- Add kudu.timedelta method for constructing kudu.TimeDelta, a wrapper for
kudu::MonoDelta. Add timeout options to kudu.connect
- Clean up kudu.* user namespace
- Organize code into multiple Cython modules and multiple test modules
- Library can be packaged using python setup.py sdist for posting to PyPI
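
To illustrate the table[col_name] <= val idea from the list above, here is a
minimal pure-Python sketch of how operator overloading can turn a comparison
into a predicate object. The Table, Column and Predicate classes below are
hypothetical stand-ins written for this illustration, not the Cython classes
added by this patch, and the real implementation may differ. The three
operators shown match the comparison ops the C++ client exposed at the time
(LESS_EQUAL, GREATER_EQUAL, EQUAL).

```python
# Hypothetical stand-ins for illustration only; these are NOT the classes
# defined in python/kudu/client.pyx.


class Predicate:
    """Records a single column comparison, e.g. ('key', '<=', 20)."""

    def __init__(self, column_name, op, value):
        self.column_name = column_name
        self.op = op
        self.value = value

    def __repr__(self):
        return "Predicate({0!r} {1} {2!r})".format(
            self.column_name, self.op, self.value)


class Column:
    """Wrapper returned by table[col_name]; comparisons build Predicates."""

    def __init__(self, name):
        self.name = name

    def __le__(self, value):
        return Predicate(self.name, "<=", value)

    def __ge__(self, value):
        return Predicate(self.name, ">=", value)

    def __eq__(self, value):
        return Predicate(self.name, "==", value)

    # Defining __eq__ removes the default __hash__ in Python 3; restore it.
    __hash__ = object.__hash__


class Table:
    """Looks up Column wrappers by name, mimicking table[col_name]."""

    def __init__(self, column_names):
        self._columns = {name: Column(name) for name in column_names}

    def __getitem__(self, name):
        return self._columns[name]


table = Table(["key", "value"])
# Each comparison yields a Predicate rather than a bool.
preds = [table["key"] >= 10, table["key"] <= 20]
print(preds)
```

A scanner could then accept such a list in a single add_predicates call, which
is presumably why the one-at-a-time add_comparison_predicate operation became
unnecessary.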
Change-Id: I6bb98c780f2584f9ad9d301f910d0a5921d5f387
Reviewed-on: http://gerrit.cloudera.org:8080/1593
Tested-by: Internal Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/53f976f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/53f976f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/53f976f0
Branch: refs/heads/master
Commit: 53f976f03eb88ea8487a7309cf08b7c9f9fb5fd7
Parents: 2cd52ba
Author: Wes McKinney <we...@cloudera.com>
Authored: Fri Jan 15 17:36:51 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Jan 16 21:25:21 2016 +0000
----------------------------------------------------------------------
build-support/jenkins/build-and-test.sh | 6 +-
python/.gitignore | 43 ++
python/MANIFEST.in | 13 +
python/Makefile | 21 +
python/README.md | 12 +
python/kudu/__init__.pxd | 17 +
python/kudu/__init__.py | 98 ++-
python/kudu/client.pxd | 537 ---------------
python/kudu/client.pyx | 938 ++++++++++++++++++---------
python/kudu/compat.py | 86 +++
python/kudu/errors.pxd | 20 +
python/kudu/errors.pyx | 62 ++
python/kudu/libkudu_client.pxd | 606 +++++++++++++++++
python/kudu/schema.pxd | 59 ++
python/kudu/schema.pyx | 545 ++++++++++++++++
python/kudu/tests/common.py | 147 +++++
python/kudu/tests/test_client.py | 189 ++++++
python/kudu/tests/test_kudu.py | 325 ----------
python/kudu/tests/test_scanner.py | 102 +++
python/kudu/tests/test_schema.py | 182 ++++++
python/kudu/util.py | 21 +
python/requirements.txt | 5 +-
python/setup.cfg | 2 +
python/setup.py | 112 +++-
24 files changed, 2936 insertions(+), 1212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/build-support/jenkins/build-and-test.sh
----------------------------------------------------------------------
diff --git a/build-support/jenkins/build-and-test.sh b/build-support/jenkins/build-and-test.sh
index 4f9c640..1ab4679 100755
--- a/build-support/jenkins/build-and-test.sh
+++ b/build-support/jenkins/build-and-test.sh
@@ -328,9 +328,9 @@ if [ "$BUILD_PYTHON" == "1" ]; then
# Assuming we run this script from base dir
python setup.py build_ext
set +e
- python setup.py nosetests --with-xunit \
- --xunit-file=$KUDU_HOME/build/test-logs/python_client.xml 2> \
- $KUDU_HOME/build/test-logs/python_client.log || EXIT_STATUS=$?
+ python setup.py test \
+ --addopts="kudu --junit-xml=$KUDU_HOME/build/test-logs/python_client.xml" \
+ 2> $KUDU_HOME/build/test-logs/python_client.log || EXIT_STATUS=$?
fi
set -e
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
new file mode 100644
index 0000000..921484a
--- /dev/null
+++ b/python/.gitignore
@@ -0,0 +1,43 @@
+# Copyright 2016 Cloudera, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Editor temporary/working/backup files
+*flymake*
+
+# Compiled source and in-place build files
+*.py[ocd]
+*.so
+.build_cache_dir
+.cache
+.eggs
+MANIFEST
+
+# Generated sources
+*.c
+*.cpp
+*.cmake
+# Python files
+
+# setup.py working directory
+build
+# setup.py dist directory
+dist
+# Egg metadata
+*.egg-info
+# coverage
+.coverage
+coverage.xml
+
+# automatically generated during local development
+kudu/version.py
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/MANIFEST.in
----------------------------------------------------------------------
diff --git a/python/MANIFEST.in b/python/MANIFEST.in
new file mode 100644
index 0000000..a6e88d6
--- /dev/null
+++ b/python/MANIFEST.in
@@ -0,0 +1,13 @@
+include MANIFEST.in
+include ../LICENSE.txt
+include README.md
+include setup.py
+
+graft kudu
+
+global-exclude *.so
+global-exclude *.pyc
+global-exclude *~
+global-exclude \#*
+global-exclude .git*
+global-exclude .DS_Store
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/Makefile
----------------------------------------------------------------------
diff --git a/python/Makefile b/python/Makefile
new file mode 100644
index 0000000..3fdd0b6
--- /dev/null
+++ b/python/Makefile
@@ -0,0 +1,21 @@
+# Copyright 2016 Cloudera, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+develop:
+ python setup.py build_ext --inplace
+
+all: develop
+
+clean-pyc:
+ find . -name "*.pyc" -exec rm -rf {} \;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/README.md
----------------------------------------------------------------------
diff --git a/python/README.md b/python/README.md
new file mode 100644
index 0000000..54791c2
--- /dev/null
+++ b/python/README.md
@@ -0,0 +1,12 @@
+## kudu-python: Python interface to the Apache Kudu (incubating) C++ Client API
+
+Using this package requires that you install the Kudu C++ client libraries and
+headers. See http://getkudu.io for more.
+
+To install from PyPI, run
+
+```
+pip install kudu-python
+```
+
+Installation from source requires Cython.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/__init__.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.pxd b/python/kudu/__init__.pxd
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/python/kudu/__init__.pxd
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index ca120a5..e51f90f 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -15,7 +15,99 @@
# specific language governing permissions and limitations
# under the License.
-from kudu.client import *
+from kudu.client import (Client, Table, Scanner, Session, # noqa
+ Insert, Update, Delete, Predicate,
+ TimeDelta, KuduError,
+ FLUSH_AUTO_BACKGROUND,
+ FLUSH_AUTO_SYNC,
+ FLUSH_MANUAL)
-def schema_from_list(columns, num_key_columns):
- return Schema.create(columns, num_key_columns)
+from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound, # noqa
+ KuduNotSupported,
+ KuduInvalidArgument)
+
+from kudu.schema import (int8, int16, int32, int64, string_ as string, # noqa
+ double_ as double, float_, binary,
+ timestamp,
+ KuduType,
+ SchemaBuilder, ColumnSpec, Schema, ColumnSchema,
+ COMPRESSION_DEFAULT,
+ COMPRESSION_NONE,
+ COMPRESSION_SNAPPY,
+ COMPRESSION_LZ4,
+ COMPRESSION_ZLIB,
+ ENCODING_AUTO,
+ ENCODING_PLAIN,
+ ENCODING_PREFIX,
+ ENCODING_GROUP_VARINT,
+ ENCODING_RLE)
+
+
+def connect(host, port, admin_timeout_ms=None, rpc_timeout_ms=None):
+ """
+ Connect to a Kudu master server
+
+ Parameters
+ ----------
+ host : string
+ Server address of master
+ port : int
+ Server port
+ admin_timeout_ms : int, optional
+ Admin timeout in milliseconds
+ rpc_timeout_ms : int, optional
+ RPC timeout in milliseconds
+
+ Returns
+ -------
+ client : kudu.Client
+ """
+ addr = '{0}:{1}'.format(host, port)
+ return Client(addr, admin_timeout_ms=admin_timeout_ms,
+ rpc_timeout_ms=rpc_timeout_ms)
+
+
+def timedelta(seconds=0, millis=0, micros=0, nanos=0):
+ """
+ Construct a Kudu TimeDelta to set timeouts, etc. Use this function instead
+ of interacting with the TimeDelta class yourself.
+
+ Returns
+ -------
+ delta : kudu.client.TimeDelta
+ """
+ from kudu.compat import long
+ # TimeDelta is a wrapper for kudu::MonoDelta
+ total_ns = (long(0) + seconds * long(1000000000) +
+ millis * long(1000000) + micros * long(1000) + nanos)
+ return TimeDelta.from_nanos(total_ns)
+
+
+def schema_builder():
+ """
+ Create a kudu.SchemaBuilder instance
+
+ Examples
+ --------
+ builder = kudu.schema_builder()
+ builder.add_column('key1', kudu.int64, nullable=False)
+ builder.add_column('key2', kudu.int32, nullable=False)
+
+ (builder.add_column('name', kudu.string)
+ .nullable()
+ .compression('lz4'))
+
+ builder.add_column('value1', kudu.double)
+ builder.add_column('value2', kudu.int8, encoding='rle')
+ builder.set_primary_keys(['key1', 'key2'])
+
+ schema = builder.build()
+
+ Returns
+ -------
+ builder : SchemaBuilder
+ """
+ return SchemaBuilder()
+
+
+from .version import version as __version__ # noqa
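
The unit arithmetic inside the timedelta helper in the diff above can be
checked in isolation. The following is a minimal sketch that reproduces only
the nanosecond calculation in plain Python; total_nanos and the constant names
are hypothetical and exist only for this illustration. It does not call
TimeDelta.from_nanos or touch the Kudu client, and it relies on Python 3
integers being arbitrary precision, so no long() wrapper is used.

```python
# Hypothetical helper mirroring the arithmetic in kudu.timedelta; it does not
# construct a kudu.client.TimeDelta.
NANOS_PER_SECOND = 1000000000
NANOS_PER_MILLI = 1000000
NANOS_PER_MICRO = 1000


def total_nanos(seconds=0, millis=0, micros=0, nanos=0):
    """Collapse mixed time units into a single nanosecond count."""
    return (seconds * NANOS_PER_SECOND +
            millis * NANOS_PER_MILLI +
            micros * NANOS_PER_MICRO +
            nanos)


print(total_nanos(seconds=1, millis=500))  # 1500000000
print(total_nanos(micros=2, nanos=5))      # 2005
```

Note that passing a fractional value such as seconds=0.5 makes the result a
float in this sketch; how the real helper and TimeDelta.from_nanos treat
non-integer input is not shown in this part of the diff.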
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/client.pxd b/python/kudu/client.pxd
deleted file mode 100644
index 73a1724..0000000
--- a/python/kudu/client.pxd
+++ /dev/null
@@ -1,537 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# distutils: language = c++
-
-from libc.stdint cimport *
-from libcpp cimport bool
-from libcpp.string cimport string
-from libcpp.vector cimport vector
-
-# This must be included for cerr and other things to work
-cdef extern from "<iostream>":
- pass
-
-#----------------------------------------------------------------------
-# Smart pointers and such
-
-cdef extern from "<tr1/memory>" namespace "std::tr1" nogil:
-
- cdef cppclass shared_ptr[T]:
- T* get()
- void reset()
- void reset(T* p)
-
-cdef extern from "kudu/util/status.h" namespace "kudu" nogil:
-
- # We can later add more of the common status factory methods as needed
- cdef Status Status_OK "Status::OK"()
-
- cdef cppclass Status:
- Status()
-
- string ToString()
-
- bool ok()
- bool IsNotFound()
- bool IsCorruption()
- bool IsNotSupported()
- bool IsIOError()
- bool IsInvalidArgument()
- bool IsAlreadyPresent()
- bool IsRuntimeError()
- bool IsNetworkError()
- bool IsIllegalState()
- bool IsNotAuthorized()
- bool IsAborted()
-
-
-cdef extern from "kudu/util/monotime.h" namespace "kudu" nogil:
-
- # These classes are not yet needed directly but will need to be completed
- # from the C++ API
- cdef cppclass MonoDelta:
- pass
-
- cdef cppclass MonoTime:
- pass
-
-
-cdef extern from "kudu/client/schema.h" namespace "kudu::client" nogil:
-
- enum DataType" kudu::client::KuduColumnSchema::DataType":
- KUDU_UINT8 " kudu::client::KuduColumnSchema::UINT8"
- KUDU_INT8 " kudu::client::KuduColumnSchema::INT8"
- KUDU_UINT16 " kudu::client::KuduColumnSchema::UINT16"
- KUDU_INT16 " kudu::client::KuduColumnSchema::INT16"
- KUDU_UINT32 " kudu::client::KuduColumnSchema::UINT32"
- KUDU_INT32 " kudu::client::KuduColumnSchema::INT32"
- KUDU_UINT64 " kudu::client::KuduColumnSchema::UINT64"
- KUDU_INT64 " kudu::client::KuduColumnSchema::INT64"
- KUDU_STRING " kudu::client::KuduColumnSchema::STRING"
- KUDU_BOOL " kudu::client::KuduColumnSchema::BOOL"
- KUDU_FLOAT " kudu::client::KuduColumnSchema::FLOAT"
- KUDU_DOUBLE " kudu::client::KuduColumnSchema::DOUBLE"
-
- enum EncodingType" kudu::client::KuduColumnStorageAttributes::EncodingType":
- ENCODING_AUTO " kudu::client::KuduColumnStorageAttributes::AUTO_ENCODING"
- ENCODING_PLAIN " kudu::client::KuduColumnStorageAttributes::PLAIN_ENCODING"
- ENCODING_PREFIX " kudu::client::KuduColumnStorageAttributes::PREFIX_ENCODING"
- ENCODING_GROUP_VARINT " kudu::client::KuduColumnStorageAttributes::GROUP_VARINT"
- ENCODING_RLE " kudu::client::KuduColumnStorageAttributes::RLE"
-
- enum CompressionType" kudu::client::KuduColumnStorageAttributes::CompressionType":
- COMPRESSION_DEFAULT " kudu::client::KuduColumnStorageAttributes::DEFAULT_COMPRESSION"
- COMPRESSION_NONE " kudu::client::KuduColumnStorageAttributes::NO_COMPRESSION"
- COMPRESSION_SNAPPY " kudu::client::KuduColumnStorageAttributes::SNAPPY"
- COMPRESSION_LZ4 " kudu::client::KuduColumnStorageAttributes::LZ4"
- COMPRESSION_ZLIB " kudu::client::KuduColumnStorageAttributes::ZLIB"
-
- cdef struct KuduColumnStorageAttributes:
- KuduColumnStorageAttributes()
-
- EncodingType encoding
- CompressionType compression
- string ToString()
-
- cdef cppclass KuduColumnSchema:
- KuduColumnSchema(KuduColumnSchema& other)
- KuduColumnSchema(string& name, DataType type)
- KuduColumnSchema(string& name, DataType type, bool is_nullable)
- KuduColumnSchema(string& name, DataType type, bool is_nullable,
- const void* default_value)
-
- string& name()
- bool is_nullable()
- DataType type()
-
- bool Equals(KuduColumnSchema& other)
- void CopyFrom(KuduColumnSchema& other)
-
- cdef cppclass KuduSchema:
- KuduSchema()
- KuduSchema(vector[KuduColumnSchema]& columns, int key_columns)
-
- Status Reset(vector[KuduColumnSchema]& columns, int key_columns)
-
- bool Equals(KuduSchema& other)
- KuduColumnSchema Column(size_t idx)
- KuduSchema CreateKeyProjection()
- size_t num_columns()
-
-
-cdef extern from "kudu/client/row_result.h" namespace "kudu::client" nogil:
-
- cdef cppclass KuduRowResult:
- bool IsNull(Slice& col_name)
- bool IsNull(int col_idx)
-
- # These getters return a bad Status if the type does not match,
- # the value is unset, or the value is NULL. Otherwise they return
- # the current set value in *val.
- Status GetBool(Slice& col_name, bool* val)
-
- Status GetInt8(Slice& col_name, int8_t* val)
- Status GetInt16(Slice& col_name, int16_t* val)
- Status GetInt32(Slice& col_name, int32_t* val)
- Status GetInt64(Slice& col_name, int64_t* val)
-
- Status GetUInt8(Slice& col_name, uint8_t* val)
- Status GetUInt16(Slice& col_name, uint16_t* val)
- Status GetUInt32(Slice& col_name, uint32_t* val)
- Status GetUInt64(Slice& col_name, uint64_t* val)
-
- Status GetBool(int col_idx, bool* val)
-
- Status GetInt8(int col_idx, int8_t* val)
- Status GetInt16(int col_idx, int16_t* val)
- Status GetInt32(int col_idx, int32_t* val)
- Status GetInt64(int col_idx, int64_t* val)
-
- Status GetUInt8(int col_idx, uint8_t* val)
- Status GetUInt16(int col_idx, uint16_t* val)
- Status GetUInt32(int col_idx, uint32_t* val)
- Status GetUInt64(int col_idx, uint64_t* val)
-
- Status GetString(Slice& col_name, Slice* val)
- Status GetString(int col_idx, Slice* val)
-
- Status GetFloat(Slice& col_name, float* val)
- Status GetFloat(int col_idx, float* val)
-
- Status GetDouble(Slice& col_name, double* val)
- Status GetDouble(int col_idx, double* val)
-
- const void* cell(int col_idx)
- string ToString()
-
-
-cdef extern from "kudu/util/slice.h" namespace "kudu" nogil:
-
- cdef cppclass Slice:
- Slice()
- Slice(const uint8_t* data, size_t n)
- Slice(const char* data, size_t n)
-
- Slice(string& s)
- Slice(const char* s)
-
- # Many other constructors have been omitted; we can return and add them
- # as needed for the code generation.
-
- const uint8_t* data()
- uint8_t* mutable_data()
- size_t size()
- bool empty()
-
- uint8_t operator[](size_t n)
-
- void clear()
- void remove_prefix(size_t n)
- void truncate(size_t n)
-
- Status check_size(size_t expected_size)
-
- string ToString()
-
- string ToDebugString()
- string ToDebugString(size_t max_len)
-
- int compare(Slice& b)
-
- bool starts_with(Slice& x)
-
- void relocate(uint8_t* d)
-
- # Many other API methods omitted
-
-
-cdef extern from "kudu/common/partial_row.h" namespace "kudu" nogil:
-
- cdef cppclass KuduPartialRow:
- # Schema must not be garbage-collected
- # KuduPartialRow(const Schema* schema)
-
- #----------------------------------------------------------------------
- # Setters
-
- # Slice setters
- Status SetBool(Slice& col_name, bool val)
-
- Status SetInt8(Slice& col_name, int8_t val)
- Status SetInt16(Slice& col_name, int16_t val)
- Status SetInt32(Slice& col_name, int32_t val)
- Status SetInt64(Slice& col_name, int64_t val)
-
- Status SetUInt8(Slice& col_name, uint8_t val)
- Status SetUInt16(Slice& col_name, uint16_t val)
- Status SetUInt32(Slice& col_name, uint32_t val)
- Status SetUInt64(Slice& col_name, uint64_t val)
-
- Status SetDouble(Slice& col_name, double val)
- Status SetFloat(Slice& col_name, float val)
-
- # Integer setters
- Status SetBool(int col_idx, bool val)
-
- Status SetInt8(int col_idx, int8_t val)
- Status SetInt16(int col_idx, int16_t val)
- Status SetInt32(int col_idx, int32_t val)
- Status SetInt64(int col_idx, int64_t val)
-
- Status SetUInt8(int col_idx, uint8_t val)
- Status SetUInt16(int col_idx, uint16_t val)
- Status SetUInt32(int col_idx, uint32_t val)
- Status SetUInt64(int col_idx, uint64_t val)
-
- Status SetDouble(int col_idx, double val)
- Status SetFloat(int col_idx, float val)
-
- # Set, but does not copy string
- Status SetString(Slice& col_name, Slice& val)
- Status SetString(int col_idx, Slice& val)
-
- Status SetStringCopy(Slice& col_name, Slice& val)
- Status SetStringCopy(int col_idx, Slice& val)
-
- Status SetNull(Slice& col_name)
- Status SetNull(int col_idx)
-
- Status Unset(Slice& col_name)
- Status Unset(int col_idx)
-
- #----------------------------------------------------------------------
- # Getters
-
- bool IsColumnSet(Slice& col_name)
- bool IsColumnSet(int col_idx)
-
- bool IsNull(Slice& col_name)
- bool IsNull(int col_idx)
-
- Status GetBool(Slice& col_name, bool* val)
-
- Status GetInt8(Slice& col_name, int8_t* val)
- Status GetInt16(Slice& col_name, int16_t* val)
- Status GetInt32(Slice& col_name, int32_t* val)
- Status GetInt64(Slice& col_name, int64_t* val)
-
- Status GetUInt8(Slice& col_name, uint8_t* val)
- Status GetUInt16(Slice& col_name, uint16_t* val)
- Status GetUInt32(Slice& col_name, uint32_t* val)
- Status GetUInt64(Slice& col_name, uint64_t* val)
-
- Status GetDouble(Slice& col_name, double* val)
- Status GetFloat(Slice& col_name, float* val)
-
- Status GetBool(int col_idx, bool* val)
-
- Status GetInt8(int col_idx, int8_t* val)
- Status GetInt16(int col_idx, int16_t* val)
- Status GetInt32(int col_idx, int32_t* val)
- Status GetInt64(int col_idx, int64_t* val)
-
- Status GetUInt8(int col_idx, uint8_t* val)
- Status GetUInt16(int col_idx, uint16_t* val)
- Status GetUInt32(int col_idx, uint32_t* val)
- Status GetUInt64(int col_idx, uint64_t* val)
-
- Status GetDouble(int col_idx, double* val)
- Status GetFloat(int col_idx, float* val)
-
- # Gets the string but does not copy the value. Callers should
- # copy the resulting Slice if necessary.
- Status GetString(Slice& col_name, Slice* val)
- Status GetString(int col_idx, Slice* val)
-
- # Return true if all of the key columns have been specified
- # for this mutation.
- bool IsKeySet()
-
- # Return true if all columns have been specified.
- bool AllColumnsSet()
- string ToString()
-
- # This relied on a forward declaration of Schema, but we don't want to
- # include the header file here at the moment.
-
- # Schema* schema()
-
-
-cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil:
-
- enum WriteType" kudu::client::KuduWriteOperation::Type":
- INSERT " kudu::client::KuduWriteOperation::INSERT"
- UPDATE " kudu::client::KuduWriteOperation::UPDATE"
- DELETE " kudu::client::KuduWriteOperation::DELETE"
-
- cdef cppclass KuduWriteOperation:
- KuduPartialRow& row()
- KuduPartialRow* mutable_row()
-
- # This is a pure virtual function implemented on each of the cppclass
- # subclasses
- string ToString()
-
- # Also a pure virtual
- WriteType type()
-
- cdef cppclass KuduInsert(KuduWriteOperation):
- pass
-
- cdef cppclass KuduDelete(KuduWriteOperation):
- pass
-
- cdef cppclass KuduUpdate(KuduWriteOperation):
- pass
-
-
-cdef extern from "kudu/client/scan_predicate.h" namespace "kudu::client" nogil:
- enum ComparisonOp" kudu::client::KuduPredicate::ComparisonOp":
- KUDU_LESS_EQUAL " kudu::client::KuduPredicate::LESS_EQUAL"
- KUDU_GREATER_EQUAL " kudu::client::KuduPredicate::GREATER_EQUAL"
- KUDU_EQUAL " kudu::client::KuduPredicate::EQUAL"
-
- cdef cppclass KuduPredicate:
- pass
-
-
-cdef extern from "kudu/client/value.h" namespace "kudu::client" nogil:
-
- cdef cppclass KuduValue:
- @staticmethod
- KuduValue* FromInt(int64_t val);
-
- @staticmethod
- KuduValue* CopyString(const Slice& s);
-
-
-cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
-
- # Omitted KuduClient::ReplicaSelection enum
-
- cdef cppclass KuduClient:
-
- Status DeleteTable(string& table_name)
- Status OpenTable(string& table_name, shared_ptr[KuduTable]* table)
- Status GetTableSchema(string& table_name, KuduSchema* schema)
-
- KuduTableCreator* NewTableCreator()
- Status IsCreateTableInProgress(string& table_name,
- bool* create_in_progress)
-
- KuduTableAlterer* NewTableAlterer()
- Status IsAlterTableInProgress(string& table_name,
- bool* alter_in_progress)
-
- shared_ptr[KuduSession] NewSession()
-
- vector[string]& master_server_addrs()
-
- cdef cppclass KuduClientBuilder:
- KuduClientBuilder()
- KuduClientBuilder& master_server_addrs(vector[string]& addrs)
- KuduClientBuilder& master_server_addr(string& addr)
-
- Status Build(shared_ptr[KuduClient]* client)
-
- cdef cppclass KuduTableCreator:
- KuduTableCreator& table_name(string& name)
- KuduTableCreator& schema(KuduSchema* name)
- KuduTableCreator& split_keys(vector[string]& keys)
- KuduTableCreator& num_replicas(int n_replicas)
- KuduTableCreator& wait(bool wait)
-
- Status Create()
-
- cdef cppclass KuduTableAlterer:
- # The name of the existing table to alter
- KuduTableAlterer& table_name(string& name)
-
- KuduTableAlterer& rename_table(string& name)
-
- KuduTableAlterer& add_column(string& name, DataType type,
- const void *default_value)
- KuduTableAlterer& add_column(string& name, DataType type,
- const void *default_value,
- KuduColumnStorageAttributes attr)
-
- KuduTableAlterer& add_nullable_column(string& name, DataType type)
-
- KuduTableAlterer& drop_column(string& name)
-
- KuduTableAlterer& rename_column(string& old_name, string& new_name)
-
- KuduTableAlterer& wait(bool wait)
-
- Status Alter()
-
- # Instances of KuduTable are not directly instantiated by users of the
- # client.
- cdef cppclass KuduTable:
-
- string& name()
- KuduSchema& schema()
-
- KuduInsert* NewInsert()
- KuduUpdate* NewUpdate()
- KuduDelete* NewDelete()
-
- KuduPredicate* NewComparisonPredicate(const Slice& col_name,
- ComparisonOp op,
- KuduValue* value);
-
- KuduClient* client()
-
- enum FlushMode" kudu::client::KuduSession::FlushMode":
- AUTO_FLUSH_SYNC " kudu::client::KuduSession::AUTO_FLUSH_SYNC"
- AUTO_FLUSH_BACKGROUND " kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND"
- MANUAL_FLUSH " kudu::client::KuduSession::MANUAL_FLUSH"
-
- cdef cppclass KuduSession:
-
- Status SetFlushMode(FlushMode m)
-
- void SetMutationBufferSpace(size_t size)
- void SetTimeoutMillis(int millis)
-
- void SetPriority(int priority)
-
- Status Apply(KuduWriteOperation* write_op)
- Status Apply(KuduInsert* write_op)
- Status Apply(KuduUpdate* write_op)
- Status Apply(KuduDelete* write_op)
-
- # This is thread-safe
- Status Flush()
-
- # TODO: Will need to decide on a strategy for exposing the session's
- # async API to Python
-
- # Status ApplyAsync(KuduWriteOperation* write_op,
- # KuduStatusCallback cb)
- # Status ApplyAsync(KuduInsert* write_op,
- # KuduStatusCallback cb)
- # Status ApplyAsync(KuduUpdate* write_op,
- # KuduStatusCallback cb)
- # Status ApplyAsync(KuduDelete* write_op,
- # KuduStatusCallback cb)
- # void FlushAsync(KuduStatusCallback& cb)
-
-
- Status Close()
- bool HasPendingOperations()
- int CountBufferedOperations()
-
- int CountPendingErrors()
- void GetPendingErrors(vector[C_KuduError*]* errors, bool* overflowed)
-
- KuduClient* client()
-
- enum ReadMode" kudu::client::KuduScanner::ReadMode":
- READ_LATEST " kudu::client::KuduScanner::READ_LATEST"
- READ_AT_SNAPSHOT " kudu::client::KuduScanner::READ_AT_SNAPSHOT"
-
- cdef cppclass KuduScanner:
- KuduScanner(KuduTable* table)
-
- Status AddConjunctPredicate(KuduPredicate* pred)
-
- Status Open()
- void Close()
-
- bool HasMoreRows()
- Status NextBatch(vector[KuduRowResult]* rows)
- Status SetBatchSizeBytes(uint32_t batch_size)
-
- # Pending definition of ReplicaSelection enum
- # Status SetSelection(ReplicaSelection selection)
-
- Status SetReadMode(ReadMode read_mode)
- Status SetSnapshot(uint64_t snapshot_timestamp_micros)
- Status SetTimeoutMillis(int millis)
-
- string ToString()
-
- cdef cppclass C_KuduError " kudu::client::KuduError":
-
- Status& status()
-
- KuduWriteOperation& failed_op()
- KuduWriteOperation* release_failed_op()
-
- bool was_possibly_successful()
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index ec2c1cf..a6e2023 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -18,75 +18,198 @@
# distutils: language = c++
# cython: embedsignature = True
-from client cimport *
-
from libcpp.string cimport string
-from libcpp.map cimport map
+from libcpp cimport bool as c_bool
cimport cpython
from cython.operator cimport dereference as deref
-import __builtin__
+from libkudu_client cimport *
-BOOL = KUDU_BOOL
-STRING = KUDU_STRING
+from kudu.compat import tobytes, frombytes
+from kudu.schema cimport Schema, ColumnSchema
+from kudu.errors cimport check_status
+from errors import KuduException
-INT8 = KUDU_INT8
-INT16 = KUDU_INT16
-INT32 = KUDU_INT32
-INT64 = KUDU_INT64
+import six
-FLOAT = KUDU_FLOAT
-DOUBLE = KUDU_DOUBLE
-LESS_EQUAL = KUDU_LESS_EQUAL
-GREATER_EQUAL = KUDU_GREATER_EQUAL
-EQUAL = KUDU_EQUAL
+cdef class TimeDelta:
+ """
+ Wrapper interface for the Kudu MonoDelta class, which is used to specify
+ time deltas for timeouts and other purposes.
+ """
-cdef dict _type_names = {
- INT8: 'int8',
- INT16: 'int16',
- INT32: 'int32',
- INT64: 'int64',
- STRING: 'string',
- BOOL: 'bool',
- FLOAT: 'float',
- DOUBLE: 'double'
-}
+ cdef:
+ MonoDelta delta
-cdef get_type_name(int dtype):
- return _type_names[dtype]
+ def __cinit__(self):
+ pass
+ @staticmethod
+ def from_seconds(seconds):
+ """
+ Construct a new TimeDelta from fractional seconds.
-class KuduException(Exception):
- pass
+ Parameters
+ ----------
+ seconds : double
+ Returns
+ -------
+ delta : TimeDelta
+ """
+ cdef TimeDelta result = TimeDelta()
+ result.init(MonoDelta.FromSeconds(seconds))
+ return result
-cdef raise_on_failure(Status* status):
- if not status.ok():
- # TODO: Implement an exception class hierarchy and raise the
- # appropriate type of exception from the common types offered by Status
- raise KuduException(status.ToString())
+ @staticmethod
+ def from_millis(int64_t ms):
+ """
+ Construct a new TimeDelta from integer milliseconds.
+ Parameters
+ ----------
+ ms : int
+
+ Returns
+ -------
+ delta : TimeDelta
+ """
+ cdef TimeDelta result = TimeDelta()
+ result.init(MonoDelta.FromMilliseconds(ms))
+ return result
+
+ @staticmethod
+ def from_micros(int64_t us):
+ """
+ Construct a new TimeDelta from integer microseconds.
+
+ Parameters
+ ----------
+ us : int
+
+ Returns
+ -------
+ delta : TimeDelta
+ """
+ cdef TimeDelta result = TimeDelta()
+ result.init(MonoDelta.FromMicroseconds(us))
+ return result
+
+ @staticmethod
+ def from_nanos(int64_t ns):
+ """
+ Construct a new TimeDelta from integer nanoseconds.
+
+ Parameters
+ ----------
+ ns : int
+
+ Returns
+ -------
+ delta : TimeDelta
+ """
+ cdef TimeDelta result = TimeDelta()
+ result.init(MonoDelta.FromNanoseconds(ns))
+ return result
+
+ cpdef double to_seconds(self):
+ """
+ Return timedelta as fractional seconds.
+ """
+ return self.delta.ToSeconds()
+
+ cpdef int64_t to_millis(self):
+ """
+ Return timedelta as exact milliseconds.
+ """
+ return self.delta.ToMilliseconds()
+
+ cpdef int64_t to_micros(self):
+ """
+ Return timedelta as exact microseconds.
+ """
+ return self.delta.ToMicroseconds()
+
+ cpdef int64_t to_nanos(self):
+ """
+ Return timedelta as exact nanoseconds.
+ """
+ return self.delta.ToNanoseconds()
+
+ cdef init(self, const MonoDelta& val):
+ self.delta = val
+
+ def __repr__(self):
+ cdef object as_string
+
+ if self.delta.Initialized():
+ as_string = self.delta.ToString()
+ return 'kudu.TimeDelta({0})'.format(as_string)
+ else:
+ return 'kudu.TimeDelta()'
+
+ def __richcmp__(TimeDelta self, TimeDelta other, int op):
+ if op == cpython.Py_EQ:
+ return self.delta.Equals(other.delta)
+ elif op == cpython.Py_NE:
+ return not self.delta.Equals(other.delta)
+ elif op == cpython.Py_LT:
+ return self.delta.LessThan(other.delta)
+ elif op == cpython.Py_LE:
+ return not self.delta.MoreThan(other.delta)
+ elif op == cpython.Py_GT:
+ return self.delta.MoreThan(other.delta)
+ elif op == cpython.Py_GE:
+ return not self.delta.LessThan(other.delta)
+ else:
+ raise ValueError('invalid operation: {0}'.format(op))
cdef class Client:
+
+ """
+ The primary class for interacting with a Kudu cluster. Can connect to one
+ or more Kudu master servers. Do not instantiate this class directly; use
+ kudu.connect instead.
+ """
+
cdef:
shared_ptr[KuduClient] client
KuduClient* cp
- def __cinit__(self, master_addr):
+ cdef readonly:
+ list master_addrs
+
+ def __cinit__(self, addr_or_addrs, admin_timeout_ms=None,
+ rpc_timeout_ms=None):
cdef:
- Status s
+ string c_addr
vector[string] c_addrs
+ KuduClientBuilder builder
+ TimeDelta timeout
+
+ if isinstance(addr_or_addrs, six.string_types):
+ addr_or_addrs = [addr_or_addrs]
+ elif not isinstance(addr_or_addrs, list):
+ addr_or_addrs = list(addr_or_addrs)
+
+ self.master_addrs = addr_or_addrs
+ for addr in addr_or_addrs:
+ c_addrs.push_back(tobytes(addr))
+
+ builder.master_server_addrs(c_addrs)
- c_addrs.push_back(master_addr)
- s = KuduClientBuilder()\
- .master_server_addrs(c_addrs)\
- .Build(&self.client)
+ if admin_timeout_ms is not None:
+ timeout = TimeDelta.from_millis(admin_timeout_ms)
+ builder.default_admin_operation_timeout(timeout.delta)
- if not s.ok():
- raise KuduException('Could not connect to master')
+ if rpc_timeout_ms is not None:
+ timeout = TimeDelta.from_millis(rpc_timeout_ms)
+ builder.default_rpc_timeout(timeout.delta)
+
+ check_status(builder.Build(&self.client))
# A convenience
self.cp = self.client.get()
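
The address handling in Client.__cinit__ above accepts either a single string or any iterable of strings. A minimal pure-Python sketch of that normalization follows. The helper name normalize_master_addrs is hypothetical (it is not part of the commit), and plain str stands in for six.string_types, so the sketch assumes Python 3.

```python
def normalize_master_addrs(addr_or_addrs):
    """Return a list of UTF-8 encoded master addresses.

    Hypothetical stand-alone helper mirroring the logic in Client.__cinit__;
    it is an illustration, not code from the commit.
    """
    if isinstance(addr_or_addrs, str):
        # A single address becomes a one-element list.
        addr_or_addrs = [addr_or_addrs]
    elif not isinstance(addr_or_addrs, list):
        # Tuples, sets, generators and so on are materialized into a list.
        addr_or_addrs = list(addr_or_addrs)

    # Encode text to bytes and pass bytes through untouched, which is what
    # compat.tobytes does on Python 3.
    return [a.encode('utf8') if isinstance(a, str) else a
            for a in addr_or_addrs]
```

Normalizing up front means the rest of the constructor only ever deals with a list, which is also what gets stored in the readonly master_addrs attribute.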
@@ -94,63 +217,138 @@ cdef class Client:
def __dealloc__(self):
self.close()
+ property is_multimaster:
+
+ def __get__(self):
+ return self.cp.IsMultiMaster()
+
cpdef close(self):
# Nothing yet to clean up here
pass
def create_table(self, table_name, Schema schema):
+ """
+ Creates a new Kudu table from the passed Schema and options.
+
+ Parameters
+ ----------
+ table_name : string
+ schema : kudu.Schema
+ Create using kudu.schema_builder
+ """
cdef:
KuduTableCreator* c
Status s
c = self.cp.NewTableCreator()
try:
- s = (c.table_name(table_name)
- .schema(&schema.schema)
+ s = (c.table_name(tobytes(table_name))
+ .schema(schema.schema)
.Create())
- return s.ok()
+ check_status(s)
finally:
del c
def delete_table(self, table_name):
- cdef Status s = self.cp.DeleteTable(table_name)
- return s.ok()
+ """
+ Delete a Kudu table. Raises KuduNotFound if the table does not exist.
+
+ Parameters
+ ----------
+ table_name : string
+ """
+ check_status(self.cp.DeleteTable(tobytes(table_name)))
def table_exists(self, table_name):
- cdef:
- shared_ptr[KuduTable] table
- string c_name = table_name
-
- cdef Status s = self.cp.OpenTable(c_name, &table)
- if s.ok():
- return True
- elif s.IsNotFound():
- return False
- else:
- raise_on_failure(&s)
+ """Return True if the indicated table exists in the Kudu cluster.
- def open_table(self, table_name):
- """
+ Parameters
+ ----------
+ table_name : string
+
+ Returns
+ -------
+ exists : bool
"""
cdef:
- Table table = Table()
+ string c_name = tobytes(table_name)
+ c_bool exists
+
+ check_status(self.cp.TableExists(c_name, &exists))
+ return exists
- cdef Status s = self.cp.OpenTable(table_name, &table.table)
- raise_on_failure(&s)
+ def table(self, table_name):
+ """
+ Construct a kudu.Table and retrieve its schema from the cluster.
+
+ Raises KuduNotFound if the table does not exist.
- table.table_set()
+ Parameters
+ ----------
+ table_name : string
+ Returns
+ -------
+ table : kudu.Table
+ """
+ table_name = tobytes(table_name)
+ cdef Table table = Table(table_name, self)
+
+ check_status(self.cp.OpenTable(table_name, &table.table))
+ table.init()
return table
- def new_session(self, flush_mode='manual', timeout=5000):
+ def list_tables(self, match_substring=None):
"""
+ Retrieve a list of table names in the Kudu cluster with an optional
+ substring filter.
+
+ Parameters
+ ----------
+ match_substring : string, optional
+ If passed, the string must be exactly contained in the table names
+ Returns
+ -------
+ tables : list[string]
+ Table names returned from Kudu
+ """
+ cdef:
+ vector[string] tables
+ string c_match
+ size_t i
+
+ if match_substring is not None:
+ c_match = tobytes(match_substring)
+ check_status(self.cp.ListTables(&tables, c_match))
+ else:
+ check_status(self.cp.ListTables(&tables))
+
+ result = []
+ for i in range(tables.size()):
+ result.append(frombytes(tables[i]))
+ return result
+
+ def new_session(self, flush_mode='manual', timeout_ms=5000):
+ """
+ Create a new KuduSession for applying write operations.
+
+ Parameters
+ ----------
+ flush_mode : {'manual', 'sync', 'background'}, default 'manual'
+ See Session.set_flush_mode
+ timeout_ms : int, default 5000
+ Timeout in milliseconds
+
+ Returns
+ -------
+ session : kudu.Session
"""
cdef Session result = Session()
result.s = self.cp.NewSession()
result.set_flush_mode(flush_mode)
- result.set_timeout(timeout)
+ result.set_timeout_ms(timeout_ms)
return result
@@ -228,12 +426,13 @@ cdef class FloatVal(RawValue):
cdef class BoolVal(RawValue):
cdef:
- bool val
+ c_bool val
def __cinit__(self, obj):
- self.val = <bool> obj
+ self.val = <c_bool> obj
self.data = &self.val
+
cdef class StringVal(RawValue):
cdef:
# Python "str" object that was passed into the constructor.
@@ -257,211 +456,234 @@ cdef class StringVal(RawValue):
#----------------------------------------------------------------------
-cdef class ColumnSchema:
+cdef class Table:
+
"""
- Wraps a Kudu client ColumnSchema object
+ Represents a Kudu table, containing the schema and other tools. Create by
+ using the kudu.Client.table method after connecting to a cluster.
"""
+
cdef:
- KuduColumnSchema* schema
+ shared_ptr[KuduTable] table
- def __cinit__(self):
- self.schema = NULL
+ cdef readonly:
+ object _name
+ Schema schema
+ Client parent
- def __dealloc__(self):
- if self.schema != NULL:
- del self.schema
+ def __cinit__(self, name, Client client):
+ self._name = name
+ self.parent = client
- @classmethod
- def create(cls, name, typenum, is_nullable=False):
- cdef string c_name = name
+ # Users should not instantiate directly
+ self.schema = Schema()
- cdef ColumnSchema result = ColumnSchema()
+ cdef init(self):
+ # Called after the refptr has been populated
+ self.schema.schema = &self.ptr().schema()
+ self.schema.own_schema = 0
+ self.schema.parent = self
- # TODO: This can fail in numerous ways due to bad user input. The input
- # values should be validated / sanitized in some way.
- result.schema = new KuduColumnSchema(c_name, typenum, is_nullable)
+ def __len__(self):
+ # TODO: is this cheaply knowable?
+ raise NotImplementedError
- return result
+ def __getitem__(self, key):
+ spec = self.schema[key]
+ return Column(self, key, spec)
- cdef inline cast_pyvalue(self, object o):
- cdef DataType t = self.schema.type()
- if t == KUDU_BOOL:
- return BoolVal(o)
- elif t == KUDU_INT8:
- return Int8Val(o)
- elif t == KUDU_INT16:
- return Int16Val(o)
- elif t == KUDU_INT32:
- return Int32Val(o)
- elif t == KUDU_INT64:
- return Int64Val(o)
- elif t == KUDU_DOUBLE:
- return DoubleVal(o)
- elif t == KUDU_FLOAT:
- return FloatVal(o)
- elif t == KUDU_STRING:
- return StringVal(o)
- else:
- raise TypeError(t)
+ property name:
- property dtype:
def __get__(self):
- return self.schema.type()
+ return frombytes(self.ptr().name())
- property is_nullable:
- def __get__(self):
- return self.schema.is_nullable()
+ # XXX: don't love this name
+ property num_columns:
- property name:
def __get__(self):
- return self.schema.name()
+ return len(self.schema)
- def equals(self, other):
- if not isinstance(other, ColumnSchema):
- return False
- return self.schema.Equals(deref((<ColumnSchema> other).schema))
-
- def __repr__(self):
- return ('ColumnSchema(name=%s, type=%s, nullable=%s)'
- % (self.name, get_type_name(self.dtype),
- self.is_nullable))
+ def rename(self, new_name):
+ raise NotImplementedError
+ def drop(self):
+ raise NotImplementedError
+ def new_insert(self):
+ """
+ Create a new Insert operation. Pass the completed Insert to a Session.
-#----------------------------------------------------------------------
+ Returns
+ -------
+ insert : Insert
+ """
+ return Insert(self)
-cdef class Schema:
- cdef:
- KuduSchema schema
- int num_key_columns
+ def new_update(self):
+ """
+ Create a new Update operation. Pass the completed Update to a Session.
- def __cinit__(self):
- # Users should not call this directly
- pass
+ Returns
+ -------
+ update : Update
+ """
+ return Update(self)
- def repr(self):
- # Got to be careful with huge schemas, maybe some kind of summary repr
- # when more than 20-30 columns?
- pass
+ def new_delete(self):
+ """
+ Create a new Delete operation. Pass the completed Delete to a Session.
- @classmethod
- def create(cls, columns, num_key_columns):
- cdef:
- ColumnSchema col
- Schema result = Schema()
- vector[KuduColumnSchema] vcols
+ Returns
+ -------
+ delete : Delete
+ """
+ return Delete(self)
- for col in columns:
- # This copies the KuduColumnSchema, but that may be OK.
- vcols.push_back(deref(col.schema))
+ def scanner(self):
+ """
+ Create a new scanner for this table for retrieving a selection of table
+ rows.
- result.num_key_columns = num_key_columns
- # TODO: don't use KuduSchema::Reset (deprecated)
- cdef Status s = result.schema.Reset(vcols, num_key_columns)
- raise_on_failure(&s)
+ Examples
+ --------
+ scanner = table.scanner()
+ scanner.add_predicate(table['key'] >= 10)
+ scanner.open()
+ tuples = scanner.read_all_tuples()
+ Returns
+ -------
+ scanner : kudu.Scanner
+ """
+ cdef Scanner result = Scanner(self)
+ result.scanner = new KuduScanner(self.ptr())
return result
- def __len__(self):
- return self.schema.num_columns()
+ cdef inline KuduTable* ptr(self):
+ return self.table.get()
- def at(self, i):
- cdef:
- ColumnSchema result = ColumnSchema()
- # TODO: boundschecking
- result.schema = new KuduColumnSchema(self.schema.Column(i))
- return result
+cdef class Column:
+ """
+ A reference to a Kudu table column intended to simplify creating predicates
+ and other column-specific operations.
-cdef class Table:
- cdef:
- shared_ptr[KuduTable] table
- const KuduSchema* schema
- map[string, int] _col_mapping
- bint _mapping_initialized
+ Write arithmetic comparisons to create new Predicate objects that can be
+ passed to a Scanner.
- def __cinit__(self):
- # Users should not instantiate directly
- self.schema = NULL
+ Examples
+ --------
+ scanner.add_predicate(table[col_name] <= 10)
+ """
+ cdef readonly:
+ object name
+ Table parent
+ ColumnSchema spec
- cdef table_set(self):
- # Called after the refptr has been populated
- self.schema = &self.table.get().schema()
- self._col_mapping.clear()
- self._mapping_initialized = 0
-
- cdef inline int get_loc(self, name) except -1:
- if not self._mapping_initialized:
- for i in range(self.schema.num_columns()):
- self._col_mapping[self.schema.Column(i).name()] = i
-
- # TODO: std::map is slightly verbose and inefficient here (O(lg n)
- # lookups), may consider replacing with a better / different hash table
- # should it become a performance bottleneck
- cdef map[string, int].iterator it = self._col_mapping.find(name)
- if it == self._col_mapping.end():
- raise KeyError(name)
- return self._col_mapping[name]
-
- cdef inline DataType loc_type(self, int i):
- return self.schema.Column(i).type()
-
- cdef ColumnSchema col_schema(self, int i):
- cdef ColumnSchema result = ColumnSchema()
- result.schema = new KuduColumnSchema(self.table.get()
- .schema().Column(i))
+ def __cinit__(self, Table parent, object name, ColumnSchema spec):
+ self.name = tobytes(name)
+ self.parent = parent
+ self.spec = spec
+
+ def __repr__(self):
+ result = ('Column({0}, parent={1}, type={2})'
+ .format(frombytes(self.name),
+ self.parent.name,
+ self.spec.type.name))
return result
- def __len__(self):
- # TODO: is this cheaply knowable?
- raise NotImplementedError
+ cdef KuduValue* box_value(self, object obj) except NULL:
+ cdef:
+ KuduValue* val
+ Slice* slc
- property name:
+ if isinstance(obj, unicode):
+ obj = obj.encode('utf8')
- def __get__(self):
- return self.table.get().name()
+ if isinstance(obj, bytes):
+ slc = new Slice(<char*> obj, len(obj))
+ val = KuduValue.CopyString(deref(slc))
+ del slc
+ elif isinstance(obj, int):
+ val = KuduValue.FromInt(obj)
+ elif isinstance(obj, float):
+ val = KuduValue.FromDouble(obj)
+ else:
+ raise TypeError(obj)
- # XXX: don't love this name
- property num_columns:
+ return val
- def __get__(self):
- return self.table.get().schema().num_columns()
+ def __richcmp__(Column self, value, int op):
+ cdef:
+ KuduPredicate* pred
+ KuduValue* val
+ Slice* col_name_slice
+ ComparisonOp cmp_op
+ Predicate result
- def column(self, i):
- return self.col_schema(i)
+ col_name_slice = new Slice(<char*> self.name,
+ len(self.name))
- def column_by_name(self, name):
- return self.col_schema(self.get_loc(name))
+ try:
+ if op == 1: # <=
+ cmp_op = KUDU_LESS_EQUAL
+ elif op == 2: # ==
+ cmp_op = KUDU_EQUAL
+ elif op == 5: # >=
+ cmp_op = KUDU_GREATER_EQUAL
+ else:
+ raise NotImplementedError
+
+ val = self.box_value(value)
+ pred = (self.parent.ptr()
+ .NewComparisonPredicate(deref(col_name_slice),
+ cmp_op, val))
+ finally:
+ del col_name_slice
- def rename(self, new_name):
- pass
+ result = Predicate()
+ result.init(pred)
- def rename_column(self):
- pass
+ return result
- def insert(self):
- return Insert(self)
- def update(self):
- return Update(self)
+cdef class Predicate:
- def delete(self):
- return Delete(self)
+ """
+ Wrapper for a KuduPredicate. Pass to Scanner.add_predicate, or pass a list
+ of them to Scanner.add_predicates.
+ """
+
+ cdef:
+ KuduPredicate* pred
+
+ def __cinit__(self):
+ self.pred = NULL
+
+ def __dealloc__(self):
+ if self.pred != NULL:
+ del self.pred
+
+ cdef init(self, KuduPredicate* pred):
+ self.pred = pred
- def scanner(self):
- """
- Create a new scanner for this table
- """
- cdef Scanner result = Scanner(self)
- result.scanner = new KuduScanner(self.table.get())
- return result
+
+FLUSH_AUTO_SYNC = FlushMode_AutoSync
+FLUSH_AUTO_BACKGROUND = FlushMode_AutoBackground
+FLUSH_MANUAL = FlushMode_Manual
+
+cdef dict _flush_modes = {
+ 'manual': FlushMode_Manual,
+ 'sync': FlushMode_AutoSync,
+ 'background': FlushMode_AutoBackground
+}
cdef class Session:
"""
- Create a (fully local) session to build up operations to interact with the
- cluster.
+ Wrapper for a client KuduSession to build up write operations to interact
+ with the cluster.
"""
cdef:
shared_ptr[KuduSession] s
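
Column.__richcmp__ earlier in this hunk turns a comparison such as table['key'] <= 10 into a predicate object and raises NotImplementedError for anything other than <=, == and >=. The pure-Python sketch below illustrates the same operator-overloading pattern. Both classes are hypothetical stand-ins that hold plain data; they do not call the Kudu C++ client.

```python
class Predicate:
    """Hypothetical stand-in for the Predicate wrapper (plain data only)."""

    def __init__(self, column, op, value):
        self.column = column
        self.op = op
        self.value = value

    def __repr__(self):
        return 'Predicate({0} {1} {2!r})'.format(self.column, self.op,
                                                 self.value)


class Column:
    """Hypothetical pure-Python analogue of the Cython Column class."""

    def __init__(self, name):
        self.name = name

    # Only the three operators handled in the diff produce predicates.
    def __le__(self, value):
        return Predicate(self.name, '<=', value)

    def __eq__(self, value):
        return Predicate(self.name, '==', value)

    def __ge__(self, value):
        return Predicate(self.name, '>=', value)

    # Defining __eq__ removes the inherited __hash__ on Python 3, so
    # restore identity-based hashing explicitly.
    __hash__ = object.__hash__

    def _unsupported(self, value):
        # Mirrors the "else: raise NotImplementedError" branch.
        raise NotImplementedError('only <=, == and >= are supported')

    __lt__ = __gt__ = __ne__ = _unsupported
```

With this shape, a list such as [c >= 0, c <= 10] is simply a list of predicate objects that can be handed to a scanner one at a time.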
@@ -470,30 +692,60 @@ cdef class Session:
pass
def set_flush_mode(self, flush_mode='manual'):
+ """
+ Set the session operation flush mode
+
+ Parameters
+ ----------
+ flush_mode : {'manual', 'sync', 'background'}, default 'manual'
+ You can also use the constants FLUSH_MANUAL, FLUSH_AUTO_SYNC,
+ and FLUSH_AUTO_BACKGROUND
+ """
cdef Status status
+ cdef FlushMode fmode
- if flush_mode == 'manual':
- status = self.s.get().SetFlushMode(MANUAL_FLUSH)
- elif flush_mode == 'sync':
- status = self.s.get().SetFlushMode(AUTO_FLUSH_SYNC)
- elif flush_mode == 'background':
- status = self.s.get().SetFlushMode(AUTO_FLUSH_BACKGROUND)
+ if isinstance(flush_mode, int):
+ # todo: validation
+ fmode = <FlushMode> flush_mode
+ else:
+ try:
+ fmode = _flush_modes[flush_mode.lower()]
+ except KeyError:
+ raise ValueError('Invalid flush mode: {0}'
+ .format(flush_mode))
- raise_on_failure(&status)
+ status = self.s.get().SetFlushMode(fmode)
- def set_timeout(self, milliseconds):
- self.s.get().SetTimeoutMillis(5000)
+ check_status(status)
+
+ def set_timeout_ms(self, int64_t ms):
+ """
+ Set the session timeout in milliseconds
+ """
+ self.s.get().SetTimeoutMillis(ms)
def apply(self, WriteOperation op):
+ """
+ Apply the indicated write operation
+
+ Examples
+ --------
+ # Executes a single Insert operation
+ session = client.new_session()
+ op = table.new_insert()
+ op['key'] = 0
+ op['value1'] = 5
+ op['value2'] = 3.5
+ session.apply(op)
+ session.flush()
+ """
return op.add_to_session(self)
def flush(self):
- cdef Status status = self.s.get().Flush()
- raise_on_failure(&status)
- return True
-
- def flush_async(self):
- raise NotImplementedError
+ """
+ Flush pending operations
+ """
+ check_status(self.s.get().Flush())
def get_pending_errors(self):
"""
@@ -508,8 +760,8 @@ cdef class Session:
cdef:
KuduError error
vector[C_KuduError*] v_errors
- bool overflowed
- int i
+ c_bool overflowed
+ size_t i
self.s.get().GetPendingErrors(&v_errors, &overflowed)
@@ -523,6 +775,11 @@ cdef class Session:
cdef class Row:
+
+ """
+ A single row from a row batch
+ """
+
cdef:
# So we can access the schema information
Table table
@@ -540,7 +797,10 @@ cdef class Row:
def __dealloc__(self):
pass
- def as_tuple(self):
+ cdef tuple as_tuple(self):
+ """
+ Return the row as a Python tuple
+ """
cdef:
int i, k
tuple tup
@@ -555,60 +815,51 @@ cdef class Row:
return tup
cdef inline get_bool(self, int i):
- cdef bool val
- cdef Status s = self.row.GetBool(i, &val)
- raise_on_failure(&s)
+ cdef c_bool val
+ check_status(self.row.GetBool(i, &val))
# The built-in bool is masked by the libcpp typedef
- return __builtin__.bool(val)
+ return bool(val)
cdef inline get_int8(self, int i):
cdef int8_t val
- cdef Status s = self.row.GetInt8(i, &val)
- raise_on_failure(&s)
+ check_status(self.row.GetInt8(i, &val))
return val
cdef inline get_int16(self, int i):
cdef int16_t val
- cdef Status s = self.row.GetInt16(i, &val)
- raise_on_failure(&s)
+ check_status(self.row.GetInt16(i, &val))
return val
cdef inline get_int32(self, int i):
cdef int32_t val
- cdef Status s = self.row.GetInt32(i, &val)
- raise_on_failure(&s)
+ check_status(self.row.GetInt32(i, &val))
return val
cdef inline get_int64(self, int i):
cdef int64_t val
- cdef Status s = self.row.GetInt64(i, &val)
- raise_on_failure(&s)
+ check_status(self.row.GetInt64(i, &val))
return val
cdef inline get_double(self, int i):
cdef double val
- cdef Status s = self.row.GetDouble(i, &val)
- raise_on_failure(&s)
+ check_status(self.row.GetDouble(i, &val))
return val
cdef inline get_float(self, int i):
cdef float val
- cdef Status s = self.row.GetFloat(i, &val)
- raise_on_failure(&s)
+ check_status(self.row.GetFloat(i, &val))
return val
cdef inline get_string(self, int i):
cdef Slice val
- cdef Status s = self.row.GetString(i, &val)
- raise_on_failure(&s)
-
+ check_status(self.row.GetString(i, &val))
return cpython.PyBytes_FromStringAndSize(<char*> val.mutable_data(),
val.size())
cdef inline get_slot(self, int i):
cdef:
Status s
- DataType t = self.table.loc_type(i)
+ DataType t = self.table.schema.loc_type(i)
if t == KUDU_BOOL:
return self.get_bool(i)
@@ -625,14 +876,14 @@ cdef class Row:
elif t == KUDU_FLOAT:
return self.get_float(i)
elif t == KUDU_STRING:
- return self.get_string(i)
+ return frombytes(self.get_string(i))
else:
raise TypeError(t)
cdef class RowBatch:
"""
- Class holding a batch of rows
+ Class holding a batch of rows from a Scanner
"""
# This class owns the KuduRowResult data
cdef:
@@ -645,19 +896,26 @@ cdef class RowBatch:
def __len__(self):
return self.rows.size()
+ def __getitem__(self, i):
+ return self.get_row(i).as_tuple()
+
def __iter__(self):
- pass
+ cdef int i = 0
+ for i in range(len(self)):
+ yield self.get_row(i).as_tuple()
def as_tuples(self):
"""
- To simplify testing for the moment
+ Return RowBatch as a list of Python tuples
+
+ To simplify testing for the moment.
"""
- tuples = []
+ cdef list tuples = []
for i in range(self.rows.size()):
tuples.append(self.get_row(i).as_tuple())
return tuples
- cpdef get_row(self, i):
+ cdef Row get_row(self, i):
# TODO: boundscheck
# For safety, we need to increment the parent reference count and hold
@@ -672,42 +930,70 @@ cdef class RowBatch:
return row
+
cdef class Scanner:
+ """
+ A class for defining a selection of data we wish to scan out of a Kudu
+ table. Create a scanner using Table.scanner.
+ """
+
cdef:
Table table
KuduScanner* scanner
+ bint is_open
def __cinit__(self, Table table):
self.table = table
self.scanner = NULL
+ self.is_open = 0
def __dealloc__(self):
# We own this one
if self.scanner != NULL:
del self.scanner
- def add_comparison_predicate(self, col_name, op, value):
- cdef:
- KuduValue* val
- Slice* slc
- if type(col_name) != str:
- raise TypeError("column name must be a string")
+ cdef inline ensure_open(self):
+ if not self.is_open:
+ self.open()
- if type(value) == str:
- slc = new Slice(<char*> value, len(value))
- val = KuduValue.CopyString(deref(slc))
- del slc
- elif type(value) == int:
- val = KuduValue.FromInt(<int64_t> value)
- else:
- raise TypeError("unable to convert python type %s" % str(type(value)))
+ def add_predicates(self, preds):
+ """
+ Add a list of scan predicates to the scanner. Select columns from the
+ parent table and make comparisons to create predicates.
+
+ Examples
+ --------
+ c = table[col_name]
+ preds = [c >= 0, c <= 10]
+ scanner.add_predicates(preds)
+
+ Parameters
+ ----------
+ preds : list of Predicate
+ """
+ for pred in preds:
+ self.add_predicate(pred)
+
+ cpdef add_predicate(self, Predicate pred):
+ """
+ Add a scan predicate to the scanner. Select columns from the
+ parent table and make comparisons to create predicates.
+
+ Examples
+ --------
+ pred = table[col_name] <= 10
+ scanner.add_predicate(pred)
+
+ Parameters
+ ----------
+ pred : kudu.Predicate
+ """
+ cdef KuduPredicate* clone
- cdef Slice* col_name_slice = new Slice(<char*> col_name, len(col_name))
- cdef KuduPredicate* pred = self.table.table.get().NewComparisonPredicate(
- deref(col_name_slice), op, val)
- del col_name_slice
- cdef Status s = self.scanner.AddConjunctPredicate(pred)
- raise_on_failure(&s)
+ # We clone the KuduPredicate so that the Predicate wrapper class can be
+ # reused
+ clone = pred.pred.Clone()
+ check_status(self.scanner.AddConjunctPredicate(clone))
def open(self):
"""
@@ -717,28 +1003,40 @@ cdef class Scanner:
-------
self : Scanner
"""
- cdef Status s = self.scanner.Open()
- raise_on_failure(&s)
+ if not self.is_open:
+ check_status(self.scanner.Open())
+ self.is_open = 1
return self
def has_more_rows(self):
+ """
+ Returns True if there are more rows to be read.
+ """
return self.scanner.HasMoreRows()
- def read_all(self):
+ def read_all_tuples(self):
"""
Return a list of tuples containing all rows from the scan operation (which
hopefully fit into memory; this is probably not handled gracefully at the
moment).
"""
- cdef RowBatch batch = RowBatch(self.table)
+ cdef list tuples = []
+ cdef RowBatch batch
+
+ self.ensure_open()
while self.has_more_rows():
- self.next_batch(batch)
+ batch = self.next_batch()
+ tuples.extend(batch.as_tuples())
- return batch
+ return tuples
+
+ def read_next_batch_tuples(self):
+ return self.next_batch().as_tuples()
- cpdef next_batch(self, RowBatch batch=None):
+ cdef RowBatch next_batch(self):
"""
+ Retrieve the next batch of rows from the scanner.
Returns
-------
@@ -747,15 +1045,18 @@ cdef class Scanner:
if not self.has_more_rows():
raise StopIteration
- if batch is None:
- batch = RowBatch(self.table)
- cdef Status status = self.scanner.NextBatch(&batch.rows)
- raise_on_failure(&status)
-
+ cdef RowBatch batch = RowBatch(self.table)
+ check_status(self.scanner.NextBatch(&batch.rows))
return batch
cdef class KuduError:
+
+ """
+ Wrapper for a C++ KuduError indicating a client error resulting from
+ applying operations in a session.
+ """
+
cdef:
C_KuduError* error
@@ -793,15 +1094,15 @@ cdef class WriteOperation:
cpdef set_field(self, key, value):
cdef:
- int i = self.table.get_loc(key)
- DataType t = self.table.loc_type(i)
+ int i = self.table.schema.get_loc(key)
+ DataType t = self.table.schema.loc_type(i)
cdef Slice* slc
# Leave it to Cython to do the coercion and complain if it doesn't
# work. Cython will catch many casting problems but we should verify
# with unit tests.
if t == KUDU_BOOL:
- self.row.SetBool(i, <bool> value)
+ self.row.SetBool(i, <c_bool> value)
elif t == KUDU_INT8:
self.row.SetInt8(i, <int8_t> value)
elif t == KUDU_INT16:
@@ -815,9 +1116,8 @@ cdef class WriteOperation:
elif t == KUDU_DOUBLE:
self.row.SetDouble(i, <double> value)
elif t == KUDU_STRING:
- # TODO: automatic conversion of unicode to UTF8
if not cpython.PyBytes_Check(value):
- raise TypeError('Only support byte strings for now')
+ value = value.encode('utf8')
# TODO: It would be much better not to heap-allocate a Slice object
slc = new Slice(cpython.PyBytes_AsString(value))
@@ -844,7 +1144,7 @@ cdef class Insert(WriteOperation):
KuduInsert* op
def __cinit__(self, Table table):
- self.op = self.table.table.get().NewInsert()
+ self.op = self.table.ptr().NewInsert()
self.row = self.op.mutable_row()
def __dealloc__(self):
@@ -854,7 +1154,7 @@ cdef class Insert(WriteOperation):
if self.applied:
raise Exception
- s.s.get().Apply(self.op)
+ check_status(s.s.get().Apply(self.op))
self.op = NULL
self.applied = 1
@@ -865,7 +1165,7 @@ cdef class Update(WriteOperation):
def __cinit__(self, Table table):
self.table = table
- self.op = table.table.get().NewUpdate()
+ self.op = table.ptr().NewUpdate()
self.row = self.op.mutable_row()
def __dealloc__(self):
@@ -881,7 +1181,7 @@ cdef class Delete(WriteOperation):
def __cinit__(self, Table table):
self.table = table
- self.op = table.table.get().NewDelete()
+ self.op = table.ptr().NewDelete()
self.row = self.op.mutable_row()
def __dealloc__(self):
@@ -891,14 +1191,28 @@ cdef class Delete(WriteOperation):
if self.applied:
raise Exception
- s.s.get().Apply(self.op)
+ check_status(s.s.get().Apply(self.op))
self.applied = 1
self.op = NULL
-cdef class Column:
- cdef:
- Table parent
- def __cinit__(self, parent):
- self.parent = parent
+cdef inline cast_pyvalue(DataType t, object o):
+ if t == KUDU_BOOL:
+ return BoolVal(o)
+ elif t == KUDU_INT8:
+ return Int8Val(o)
+ elif t == KUDU_INT16:
+ return Int16Val(o)
+ elif t == KUDU_INT32:
+ return Int32Val(o)
+ elif t == KUDU_INT64:
+ return Int64Val(o)
+ elif t == KUDU_DOUBLE:
+ return DoubleVal(o)
+ elif t == KUDU_FLOAT:
+ return FloatVal(o)
+ elif t == KUDU_STRING:
+ return StringVal(o)
+ else:
+ raise TypeError(t)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/compat.py
----------------------------------------------------------------------
diff --git a/python/kudu/compat.py b/python/kudu/compat.py
new file mode 100644
index 0000000..2ac41ac
--- /dev/null
+++ b/python/kudu/compat.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# flake8: noqa
+
+import itertools
+
+import numpy as np
+
+import sys
+import six
+from six import BytesIO, StringIO, string_types as py_string
+
+
+PY26 = sys.version_info[:2] == (2, 6)
+PY2 = sys.version_info[0] == 2
+
+
+if PY26:
+ import unittest2 as unittest
+else:
+ import unittest
+
+
+if PY2:
+ import cPickle
+
+ try:
+ from cdecimal import Decimal
+ except ImportError:
+ from decimal import Decimal
+
+ unicode_type = unicode
+ lzip = zip
+ zip = itertools.izip
+
+ def dict_values(x):
+ return x.values()
+
+ range = xrange
+ long = long
+
+ def tobytes(o):
+ if isinstance(o, unicode):
+ return o.encode('utf8')
+ else:
+ return o
+
+ def frombytes(o):
+ return o
+else:
+ unicode_type = str
+ def lzip(*x):
+ return list(zip(*x))
+ long = int
+ zip = zip
+ def dict_values(x):
+ return list(x.values())
+ from decimal import Decimal
+ range = range
+
+ def tobytes(o):
+ if isinstance(o, str):
+ return o.encode('utf8')
+ else:
+ return o
+
+ def frombytes(o):
+ return o.decode('utf8')
+
+
+integer_types = six.integer_types + (np.integer,)
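
The tobytes and frombytes helpers above are what client.pyx uses at the boundary between Python strings and C++ std::string values. As a small illustration, the Python 3 branch is reproduced below on its own so the round trip can be checked in isolation. It assumes Python 3 and copies only that branch.

```python
def tobytes(o):
    # Text is encoded as UTF-8; anything else (for example bytes) is
    # returned unchanged.
    if isinstance(o, str):
        return o.encode('utf8')
    else:
        return o


def frombytes(o):
    # Bytes coming back from the C++ layer are decoded to text.
    return o.decode('utf8')


# Round trip for a non-ASCII table name.
name = 'tabl\u00e9'
encoded = tobytes(name)
decoded = frombytes(encoded)
```

On Python 2 the same names exist, but frombytes is the identity function, so callers there keep receiving byte strings.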
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/errors.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/errors.pxd b/python/kudu/errors.pxd
new file mode 100644
index 0000000..12cf13b
--- /dev/null
+++ b/python/kudu/errors.pxd
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from libkudu_client cimport *
+
+cdef check_status(const Status& status)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/errors.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/errors.pyx b/python/kudu/errors.pyx
new file mode 100644
index 0000000..747d620
--- /dev/null
+++ b/python/kudu/errors.pyx
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class KuduException(Exception):
+ pass
+
+
+class KuduBadStatus(KuduException):
+ """
+ A Kudu C++ client operation returned an error Status
+ """
+ pass
+
+
+class KuduNotFound(KuduBadStatus):
+ pass
+
+
+class KuduNotSupported(KuduBadStatus):
+ pass
+
+
+class KuduInvalidArgument(KuduBadStatus):
+ pass
+
+
+class KuduNotAuthorized(KuduBadStatus):
+ pass
+
+
+class KuduAborted(KuduBadStatus):
+ pass
+
+
+cdef check_status(const Status& status):
+ if status.ok():
+ return
+
+ cdef string c_message = status.message().ToString()
+
+ if status.IsNotFound():
+ raise KuduNotFound(c_message)
+ elif status.IsNotSupported():
+ raise KuduNotSupported(c_message)
+ elif status.IsInvalidArgument():
+ raise KuduInvalidArgument(c_message)
+ else:
+ raise KuduBadStatus(status.ToString())
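
Editor's note: the dispatch in check_status can be illustrated in pure Python. In the sketch below the exception classes and branch order follow the diff above, but FakeStatus is a hypothetical stand-in for the C++ kudu::Status object, and plain str messages are used where the Cython code passes a C++ string. As in the diff, statuses without a dedicated branch (for example NotAuthorized or Aborted) fall through to KuduBadStatus.

```python
class KuduException(Exception):
    pass


class KuduBadStatus(KuduException):
    pass


class KuduNotFound(KuduBadStatus):
    pass


class KuduNotSupported(KuduBadStatus):
    pass


class KuduInvalidArgument(KuduBadStatus):
    pass


class FakeStatus(object):
    """Hypothetical stand-in for kudu::Status, for illustration only."""

    def __init__(self, kind=None, message=''):
        self.kind = kind        # None means OK
        self.msg = message

    def ok(self):
        return self.kind is None

    def message(self):
        return self.msg

    def ToString(self):
        return '{0}: {1}'.format(self.kind, self.msg)

    def IsNotFound(self):
        return self.kind == 'Not found'

    def IsNotSupported(self):
        return self.kind == 'Not supported'

    def IsInvalidArgument(self):
        return self.kind == 'Invalid argument'


def check_status(status):
    # Same branch order as the Cython version in the diff.
    if status.ok():
        return
    if status.IsNotFound():
        raise KuduNotFound(status.message())
    elif status.IsNotSupported():
        raise KuduNotSupported(status.message())
    elif status.IsInvalidArgument():
        raise KuduInvalidArgument(status.message())
    else:
        raise KuduBadStatus(status.ToString())


# Because every specific error derives from KuduBadStatus, callers can catch
# either the specific class or the common base.
try:
    check_status(FakeStatus('Not found', 'no such table'))
except KuduBadStatus as exc:
    caught = exc
```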
[2/3] incubator-kudu git commit: [python client] - Expand C++ API
coverage and improve usability and documentation
Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
new file mode 100644
index 0000000..a44027e
--- /dev/null
+++ b/python/kudu/libkudu_client.pxd
@@ -0,0 +1,606 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from libc.stdint cimport *
+from libcpp cimport bool as c_bool
+from libcpp.string cimport string
+from libcpp.vector cimport vector
+
+# This must be included for cerr and other things to work
+cdef extern from "<iostream>":
+ pass
+
+#----------------------------------------------------------------------
+# Smart pointers and such
+
+cdef extern from "<tr1/memory>" namespace "std::tr1" nogil:
+
+ cdef cppclass shared_ptr[T]:
+ T* get()
+ void reset()
+ void reset(T* p)
+
+cdef extern from "kudu/util/status.h" namespace "kudu" nogil:
+
+ # We can later add more of the common status factory methods as needed
+ cdef Status Status_OK "Status::OK"()
+
+ cdef cppclass Status:
+ Status()
+
+ string ToString()
+
+ Slice message()
+
+ c_bool ok()
+ c_bool IsNotFound()
+ c_bool IsCorruption()
+ c_bool IsNotSupported()
+ c_bool IsIOError()
+ c_bool IsInvalidArgument()
+ c_bool IsAlreadyPresent()
+ c_bool IsRuntimeError()
+ c_bool IsNetworkError()
+ c_bool IsIllegalState()
+ c_bool IsNotAuthorized()
+ c_bool IsAborted()
+
+
+cdef extern from "kudu/util/monotime.h" namespace "kudu" nogil:
+
+ # These classes are not yet needed directly but will need to be completed
+ # from the C++ API
+ cdef cppclass MonoDelta:
+ MonoDelta()
+
+ @staticmethod
+ MonoDelta FromSeconds(double seconds)
+
+ @staticmethod
+ MonoDelta FromMilliseconds(int64_t ms)
+
+ @staticmethod
+ MonoDelta FromMicroseconds(int64_t us)
+
+ @staticmethod
+ MonoDelta FromNanoseconds(int64_t ns)
+
+ c_bool Initialized()
+ c_bool LessThan(const MonoDelta& other)
+ c_bool MoreThan(const MonoDelta& other)
+ c_bool Equals(const MonoDelta& other)
+
+ string ToString()
+
+ double ToSeconds()
+ int64_t ToMilliseconds()
+ int64_t ToMicroseconds()
+ int64_t ToNanoseconds()
+
+ # TODO, when needed
+ # void ToTimeVal(struct timeval *tv)
+ # void ToTimeSpec(struct timespec *ts)
+
+ # @staticmethod
+ # void NanosToTimeSpec(int64_t nanos, struct timespec* ts);
+
+
+ cdef cppclass MonoTime:
+ pass
+
+
+cdef extern from "kudu/client/schema.h" namespace "kudu::client" nogil:
+
+ enum DataType" kudu::client::KuduColumnSchema::DataType":
+ KUDU_INT8 " kudu::client::KuduColumnSchema::INT8"
+ KUDU_INT16 " kudu::client::KuduColumnSchema::INT16"
+ KUDU_INT32 " kudu::client::KuduColumnSchema::INT32"
+ KUDU_INT64 " kudu::client::KuduColumnSchema::INT64"
+ KUDU_STRING " kudu::client::KuduColumnSchema::STRING"
+ KUDU_BOOL " kudu::client::KuduColumnSchema::BOOL"
+ KUDU_FLOAT " kudu::client::KuduColumnSchema::FLOAT"
+ KUDU_DOUBLE " kudu::client::KuduColumnSchema::DOUBLE"
+ KUDU_BINARY " kudu::client::KuduColumnSchema::BINARY"
+ KUDU_TIMESTAMP " kudu::client::KuduColumnSchema::TIMESTAMP"
+
+ enum EncodingType" kudu::client::KuduColumnStorageAttributes::EncodingType":
+ EncodingType_AUTO " kudu::client::KuduColumnStorageAttributes::AUTO_ENCODING"
+ EncodingType_PLAIN " kudu::client::KuduColumnStorageAttributes::PLAIN_ENCODING"
+ EncodingType_PREFIX " kudu::client::KuduColumnStorageAttributes::PREFIX_ENCODING"
+ EncodingType_GROUP_VARINT " kudu::client::KuduColumnStorageAttributes::GROUP_VARINT"
+ EncodingType_RLE " kudu::client::KuduColumnStorageAttributes::RLE"
+
+ enum CompressionType" kudu::client::KuduColumnStorageAttributes::CompressionType":
+ CompressionType_DEFAULT " kudu::client::KuduColumnStorageAttributes::DEFAULT_COMPRESSION"
+ CompressionType_NONE " kudu::client::KuduColumnStorageAttributes::NO_COMPRESSION"
+ CompressionType_SNAPPY " kudu::client::KuduColumnStorageAttributes::SNAPPY"
+ CompressionType_LZ4 " kudu::client::KuduColumnStorageAttributes::LZ4"
+ CompressionType_ZLIB " kudu::client::KuduColumnStorageAttributes::ZLIB"
+
+ cdef struct KuduColumnStorageAttributes:
+ KuduColumnStorageAttributes()
+
+ EncodingType encoding
+ CompressionType compression
+ string ToString()
+
+ cdef cppclass KuduColumnSchema:
+ KuduColumnSchema(const KuduColumnSchema& other)
+ KuduColumnSchema(const string& name, DataType type)
+ KuduColumnSchema(const string& name, DataType type, c_bool is_nullable)
+ KuduColumnSchema(const string& name, DataType type, c_bool is_nullable,
+ const void* default_value)
+
+ string& name()
+ c_bool is_nullable()
+ DataType type()
+
+ c_bool Equals(KuduColumnSchema& other)
+ void CopyFrom(KuduColumnSchema& other)
+
+ cdef cppclass KuduSchema:
+ KuduSchema()
+ KuduSchema(vector[KuduColumnSchema]& columns, int key_columns)
+
+ c_bool Equals(const KuduSchema& other)
+ KuduColumnSchema Column(size_t idx)
+ size_t num_columns()
+
+ void GetPrimaryKeyColumnIndexes(vector[int]* indexes)
+
+ KuduPartialRow* NewRow()
+
+ cdef cppclass KuduColumnSpec:
+
+ KuduColumnSpec* Default(KuduValue* value)
+ KuduColumnSpec* RemoveDefault()
+
+ KuduColumnSpec* Compression(CompressionType compression)
+ KuduColumnSpec* Encoding(EncodingType encoding)
+ KuduColumnSpec* BlockSize(int32_t block_size)
+
+ KuduColumnSpec* PrimaryKey()
+ KuduColumnSpec* NotNull()
+ KuduColumnSpec* Nullable()
+ KuduColumnSpec* Type(DataType type_)
+
+ KuduColumnSpec* RenameTo(string& new_name)
+
+
+ cdef cppclass KuduSchemaBuilder:
+
+ KuduColumnSpec* AddColumn(string& name)
+ KuduSchemaBuilder* SetPrimaryKey(vector[string]& key_col_names);
+
+ Status Build(KuduSchema* schema)
+
+
+cdef extern from "kudu/client/row_result.h" namespace "kudu::client" nogil:
+
+ cdef cppclass KuduRowResult:
+ c_bool IsNull(Slice& col_name)
+ c_bool IsNull(int col_idx)
+
+ # These getters return a bad Status if the type does not match,
+ # the value is unset, or the value is NULL. Otherwise they return
+ # the current set value in *val.
+ Status GetBool(Slice& col_name, c_bool* val)
+
+ Status GetInt8(Slice& col_name, int8_t* val)
+ Status GetInt16(Slice& col_name, int16_t* val)
+ Status GetInt32(Slice& col_name, int32_t* val)
+ Status GetInt64(Slice& col_name, int64_t* val)
+
+ Status GetTimestamp(const Slice& col_name,
+ int64_t* micros_since_utc_epoch)
+
+ Status GetBool(int col_idx, c_bool* val)
+
+ Status GetInt8(int col_idx, int8_t* val)
+ Status GetInt16(int col_idx, int16_t* val)
+ Status GetInt32(int col_idx, int32_t* val)
+ Status GetInt64(int col_idx, int64_t* val)
+
+ Status GetString(Slice& col_name, Slice* val)
+ Status GetString(int col_idx, Slice* val)
+
+ Status GetFloat(Slice& col_name, float* val)
+ Status GetFloat(int col_idx, float* val)
+
+ Status GetDouble(Slice& col_name, double* val)
+ Status GetDouble(int col_idx, double* val)
+
+ Status GetBinary(const Slice& col_name, Slice* val)
+ Status GetBinary(int col_idx, Slice* val)
+
+ const void* cell(int col_idx)
+ string ToString()
+
+
+cdef extern from "kudu/util/slice.h" namespace "kudu" nogil:
+
+ cdef cppclass Slice:
+ Slice()
+ Slice(const uint8_t* data, size_t n)
+ Slice(const char* data, size_t n)
+
+ Slice(string& s)
+ Slice(const char* s)
+
+ # Many other constructors have been omitted; we can return and add them
+ # as needed for the code generation.
+
+ const uint8_t* data()
+ uint8_t* mutable_data()
+ size_t size()
+ c_bool empty()
+
+ uint8_t operator[](size_t n)
+
+ void clear()
+ void remove_prefix(size_t n)
+ void truncate(size_t n)
+
+ Status check_size(size_t expected_size)
+
+ string ToString()
+
+ string ToDebugString()
+ string ToDebugString(size_t max_len)
+
+ int compare(Slice& b)
+
+ c_bool starts_with(Slice& x)
+
+ void relocate(uint8_t* d)
+
+ # Many other API methods omitted
+
+
+cdef extern from "kudu/common/partial_row.h" namespace "kudu" nogil:
+
+ cdef cppclass KuduPartialRow:
+ # Schema must not be garbage-collected
+ # KuduPartialRow(const Schema* schema)
+
+ #----------------------------------------------------------------------
+ # Setters
+
+ # Slice setters
+ Status SetBool(Slice& col_name, c_bool val)
+
+ Status SetInt8(Slice& col_name, int8_t val)
+ Status SetInt16(Slice& col_name, int16_t val)
+ Status SetInt32(Slice& col_name, int32_t val)
+ Status SetInt64(Slice& col_name, int64_t val)
+
+ Status SetTimestamp(const Slice& col_name,
+ int64_t micros_since_utc_epoch)
+ Status SetTimestamp(int col_idx, int64_t micros_since_utc_epoch)
+
+ Status SetDouble(Slice& col_name, double val)
+ Status SetFloat(Slice& col_name, float val)
+
+ # Integer setters
+ Status SetBool(int col_idx, c_bool val)
+
+ Status SetInt8(int col_idx, int8_t val)
+ Status SetInt16(int col_idx, int16_t val)
+ Status SetInt32(int col_idx, int32_t val)
+ Status SetInt64(int col_idx, int64_t val)
+
+ Status SetDouble(int col_idx, double val)
+ Status SetFloat(int col_idx, float val)
+
+ # Set, but does not copy string
+ Status SetString(Slice& col_name, Slice& val)
+ Status SetString(int col_idx, Slice& val)
+
+ Status SetStringCopy(Slice& col_name, Slice& val)
+ Status SetStringCopy(int col_idx, Slice& val)
+
+ Status SetBinaryCopy(const Slice& col_name, const Slice& val)
+ Status SetBinaryCopy(int col_idx, const Slice& val)
+
+ Status SetNull(Slice& col_name)
+ Status SetNull(int col_idx)
+
+ Status Unset(Slice& col_name)
+ Status Unset(int col_idx)
+
+ #----------------------------------------------------------------------
+ # Getters
+
+ c_bool IsColumnSet(Slice& col_name)
+ c_bool IsColumnSet(int col_idx)
+
+ c_bool IsNull(Slice& col_name)
+ c_bool IsNull(int col_idx)
+
+ Status GetBool(Slice& col_name, c_bool* val)
+ Status GetBool(int col_idx, c_bool* val)
+
+ Status GetInt8(Slice& col_name, int8_t* val)
+ Status GetInt8(int col_idx, int8_t* val)
+
+ Status GetInt16(Slice& col_name, int16_t* val)
+ Status GetInt16(int col_idx, int16_t* val)
+
+ Status GetInt32(Slice& col_name, int32_t* val)
+ Status GetInt32(int col_idx, int32_t* val)
+
+ Status GetInt64(Slice& col_name, int64_t* val)
+ Status GetInt64(int col_idx, int64_t* val)
+
+ Status GetTimestamp(const Slice& col_name,
+ int64_t* micros_since_utc_epoch)
+ Status GetTimestamp(int col_idx, int64_t* micros_since_utc_epoch)
+
+ Status GetDouble(Slice& col_name, double* val)
+ Status GetDouble(int col_idx, double* val)
+
+ Status GetFloat(Slice& col_name, float* val)
+ Status GetFloat(int col_idx, float* val)
+
+ # Gets the string but does not copy the value. Callers should
+ # copy the resulting Slice if necessary.
+ Status GetString(Slice& col_name, Slice* val)
+ Status GetString(int col_idx, Slice* val)
+
+ Status GetBinary(const Slice& col_name, Slice* val)
+ Status GetBinary(int col_idx, Slice* val)
+
+ Status EncodeRowKey(string* encoded_key)
+ string ToEncodedRowKeyOrDie()
+
+ # Return true if all of the key columns have been specified
+ # for this mutation.
+ c_bool IsKeySet()
+
+ # Return true if all columns have been specified.
+ c_bool AllColumnsSet()
+ string ToString()
+
+ # const Schema* schema()
+
+
+cdef extern from "kudu/client/write_op.h" namespace "kudu::client" nogil:
+
+ enum WriteType" kudu::client::KuduWriteOperation::Type":
+ INSERT " kudu::client::KuduWriteOperation::INSERT"
+ UPDATE " kudu::client::KuduWriteOperation::UPDATE"
+ DELETE " kudu::client::KuduWriteOperation::DELETE"
+
+ cdef cppclass KuduWriteOperation:
+ KuduPartialRow& row()
+ KuduPartialRow* mutable_row()
+
+ # This is a pure virtual function implemented on each of the cppclass
+ # subclasses
+ string ToString()
+
+ # Also a pure virtual
+ WriteType type()
+
+ cdef cppclass KuduInsert(KuduWriteOperation):
+ pass
+
+ cdef cppclass KuduDelete(KuduWriteOperation):
+ pass
+
+ cdef cppclass KuduUpdate(KuduWriteOperation):
+ pass
+
+
+cdef extern from "kudu/client/scan_predicate.h" namespace "kudu::client" nogil:
+ enum ComparisonOp" kudu::client::KuduPredicate::ComparisonOp":
+ KUDU_LESS_EQUAL " kudu::client::KuduPredicate::LESS_EQUAL"
+ KUDU_GREATER_EQUAL " kudu::client::KuduPredicate::GREATER_EQUAL"
+ KUDU_EQUAL " kudu::client::KuduPredicate::EQUAL"
+
+ cdef cppclass KuduPredicate:
+ KuduPredicate* Clone()
+
+
+cdef extern from "kudu/client/value.h" namespace "kudu::client" nogil:
+
+ cdef cppclass KuduValue:
+ @staticmethod
+ KuduValue* FromInt(int64_t val);
+
+ @staticmethod
+ KuduValue* FromFloat(float val);
+
+ @staticmethod
+ KuduValue* FromDouble(double val);
+
+ @staticmethod
+ KuduValue* FromBool(c_bool val);
+
+ @staticmethod
+ KuduValue* CopyString(const Slice& s);
+
+
+cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
+
+ # Omitted KuduClient::ReplicaSelection enum
+
+ cdef cppclass KuduClient:
+
+ Status DeleteTable(const string& table_name)
+ Status OpenTable(const string& table_name,
+ shared_ptr[KuduTable]* table)
+ Status GetTableSchema(const string& table_name, KuduSchema* schema)
+
+ KuduTableCreator* NewTableCreator()
+ Status IsCreateTableInProgress(const string& table_name,
+ c_bool* create_in_progress)
+
+ c_bool IsMultiMaster()
+
+ Status ListTables(vector[string]* tables)
+ Status ListTables(vector[string]* tables, const string& filter)
+
+ Status TableExists(const string& table_name, c_bool* exists)
+
+ KuduTableAlterer* NewTableAlterer()
+ Status IsAlterTableInProgress(const string& table_name,
+ c_bool* alter_in_progress)
+
+ shared_ptr[KuduSession] NewSession()
+
+ cdef cppclass KuduClientBuilder:
+ KuduClientBuilder()
+ KuduClientBuilder& master_server_addrs(const vector[string]& addrs)
+ KuduClientBuilder& add_master_server_addr(const string& addr)
+
+ KuduClientBuilder& default_admin_operation_timeout(
+ const MonoDelta& timeout)
+
+ KuduClientBuilder& default_rpc_timeout(const MonoDelta& timeout)
+
+ Status Build(shared_ptr[KuduClient]* client)
+
+ cdef cppclass KuduTableCreator:
+ KuduTableCreator& table_name(string& name)
+ KuduTableCreator& schema(KuduSchema* schema)
+ KuduTableCreator& split_keys(vector[string]& keys)
+ KuduTableCreator& num_replicas(int n_replicas)
+ KuduTableCreator& wait(c_bool wait)
+
+ Status Create()
+
+ cdef cppclass KuduTableAlterer:
+ # The name of the existing table to alter
+ KuduTableAlterer& table_name(string& name)
+
+ KuduTableAlterer& rename_table(string& name)
+
+ KuduTableAlterer& add_column(string& name, DataType type,
+ const void *default_value)
+ KuduTableAlterer& add_column(string& name, DataType type,
+ const void *default_value,
+ KuduColumnStorageAttributes attr)
+
+ KuduTableAlterer& add_nullable_column(string& name, DataType type)
+
+ KuduTableAlterer& drop_column(string& name)
+
+ KuduTableAlterer& rename_column(string& old_name, string& new_name)
+
+ KuduTableAlterer& wait(c_bool wait)
+
+ Status Alter()
+
+ # Instances of KuduTable are not directly instantiated by users of the
+ # client.
+ cdef cppclass KuduTable:
+
+ string& name()
+ KuduSchema& schema()
+
+ KuduInsert* NewInsert()
+ KuduUpdate* NewUpdate()
+ KuduDelete* NewDelete()
+
+ KuduPredicate* NewComparisonPredicate(const Slice& col_name,
+ ComparisonOp op,
+ KuduValue* value);
+
+ KuduClient* client()
+ # const PartitionSchema& partition_schema()
+
+ enum FlushMode" kudu::client::KuduSession::FlushMode":
+ FlushMode_AutoSync " kudu::client::KuduSession::AUTO_FLUSH_SYNC"
+ FlushMode_AutoBackground " kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND"
+ FlushMode_Manual " kudu::client::KuduSession::MANUAL_FLUSH"
+
+ cdef cppclass KuduSession:
+
+ Status SetFlushMode(FlushMode m)
+
+ void SetMutationBufferSpace(size_t size)
+ void SetTimeoutMillis(int millis)
+
+ void SetPriority(int priority)
+
+ Status Apply(KuduWriteOperation* write_op)
+ Status Apply(KuduInsert* write_op)
+ Status Apply(KuduUpdate* write_op)
+ Status Apply(KuduDelete* write_op)
+
+ # This is thread-safe
+ Status Flush()
+
+ # TODO: Will need to decide on a strategy for exposing the session's
+ # async API to Python
+
+ # Status ApplyAsync(KuduWriteOperation* write_op,
+ # KuduStatusCallback cb)
+ # Status ApplyAsync(KuduInsert* write_op,
+ # KuduStatusCallback cb)
+ # Status ApplyAsync(KuduUpdate* write_op,
+ # KuduStatusCallback cb)
+ # Status ApplyAsync(KuduDelete* write_op,
+ # KuduStatusCallback cb)
+ # void FlushAsync(KuduStatusCallback& cb)
+
+
+ Status Close()
+ c_bool HasPendingOperations()
+ int CountBufferedOperations()
+
+ int CountPendingErrors()
+ void GetPendingErrors(vector[C_KuduError*]* errors, c_bool* overflowed)
+
+ KuduClient* client()
+
+ enum ReadMode" kudu::client::KuduScanner::ReadMode":
+ READ_LATEST " kudu::client::KuduScanner::READ_LATEST"
+ READ_AT_SNAPSHOT " kudu::client::KuduScanner::READ_AT_SNAPSHOT"
+
+ cdef cppclass KuduScanner:
+ KuduScanner(KuduTable* table)
+
+ Status AddConjunctPredicate(KuduPredicate* pred)
+
+ Status Open()
+ void Close()
+
+ c_bool HasMoreRows()
+ Status NextBatch(vector[KuduRowResult]* rows)
+ Status SetBatchSizeBytes(uint32_t batch_size)
+
+ # Pending definition of ReplicaSelection enum
+ # Status SetSelection(ReplicaSelection selection)
+
+ Status SetReadMode(ReadMode read_mode)
+ Status SetSnapshot(uint64_t snapshot_timestamp_micros)
+ Status SetTimeoutMillis(int millis)
+
+ string ToString()
+
+ cdef cppclass C_KuduError " kudu::client::KuduError":
+
+ Status& status()
+
+ KuduWriteOperation& failed_op()
+ KuduWriteOperation* release_failed_op()
+
+ c_bool was_possibly_successful()
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/schema.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/schema.pxd b/python/kudu/schema.pxd
new file mode 100644
index 0000000..b70f8ad
--- /dev/null
+++ b/python/kudu/schema.pxd
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from libcpp.map cimport map
+
+from libkudu_client cimport *
+
+
+cdef class KuduType(object):
+ cdef readonly:
+ DataType type
+
+
+cdef class ColumnSchema:
+ """
+ Wraps a Kudu client ColumnSchema object
+ """
+ cdef:
+ KuduColumnSchema* schema
+ KuduType _type
+
+
+cdef class ColumnSpec:
+ cdef:
+ KuduColumnSpec* spec
+
+
+cdef class SchemaBuilder:
+ cdef:
+ KuduSchemaBuilder builder
+
+
+cdef class Schema:
+ cdef:
+ const KuduSchema* schema
+ object parent
+ bint own_schema
+ map[string, int] _col_mapping
+ bint _mapping_initialized
+
+ cdef int get_loc(self, name) except -1
+
+ cdef inline DataType loc_type(self, int i):
+ return self.schema.Column(i).type()
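
Editor's note: the _col_mapping and _mapping_initialized fields declared above point to a name-to-index cache that is built lazily on first lookup. The sketch below is a hypothetical pure-Python analogue, assuming Python 3: LazySchemaIndex and its column names are invented for illustration, and a dict stands in for the C++ std::map used by the Cython class.

```python
class LazySchemaIndex(object):
    """Hypothetical pure-Python analogue of Schema's column lookup."""

    def __init__(self, column_names):
        self._names = list(column_names)
        self._col_mapping = {}
        self._mapping_initialized = False

    def __len__(self):
        return len(self._names)

    def get_loc(self, name):
        # Build the name -> position mapping once, on first use.
        if not self._mapping_initialized:
            for i, col in enumerate(self._names):
                self._col_mapping[col] = i
            self._mapping_initialized = True
        # Unknown names raise KeyError, matching the Cython version.
        return self._col_mapping[name]

    def __getitem__(self, key):
        # Accept either a column name or a (possibly negative) position.
        if isinstance(key, str):
            key = self.get_loc(key)
        if key < 0:
            key += len(self)
        return self._names[key]


index = LazySchemaIndex(['key', 'int_val', 'string_val'])
position = index.get_loc('int_val')
last_column = index[-1]
```

Deferring the mapping keeps schema construction cheap when callers only ever address columns by position.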
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/schema.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/schema.pyx b/python/kudu/schema.pyx
new file mode 100644
index 0000000..f02d0e4
--- /dev/null
+++ b/python/kudu/schema.pyx
@@ -0,0 +1,545 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+
+from kudu.compat import tobytes, frombytes
+from kudu.schema cimport *
+from kudu.errors cimport check_status
+
+import six
+
+from . import util
+
+BOOL = KUDU_BOOL
+STRING = KUDU_STRING
+
+INT8 = KUDU_INT8
+INT16 = KUDU_INT16
+INT32 = KUDU_INT32
+INT64 = KUDU_INT64
+
+FLOAT = KUDU_FLOAT
+DOUBLE = KUDU_DOUBLE
+
+TIMESTAMP = KUDU_TIMESTAMP
+BINARY = KUDU_BINARY
+
+
+cdef dict _reverse_dict(d):
+ return dict((v, k) for k, v in d.items())
+
+
+# CompressionType enums
+COMPRESSION_DEFAULT = CompressionType_DEFAULT
+COMPRESSION_NONE = CompressionType_NONE
+COMPRESSION_SNAPPY = CompressionType_SNAPPY
+COMPRESSION_LZ4 = CompressionType_LZ4
+COMPRESSION_ZLIB = CompressionType_ZLIB
+
+cdef dict _compression_types = {
+ 'default': COMPRESSION_DEFAULT,
+ 'none': COMPRESSION_NONE,
+ 'snappy': COMPRESSION_SNAPPY,
+ 'lz4': COMPRESSION_LZ4,
+ 'zlib': COMPRESSION_ZLIB,
+}
+
+cdef dict _compression_type_to_name = _reverse_dict(_compression_types)
+
+
+# EncodingType enums
+ENCODING_AUTO = EncodingType_AUTO
+ENCODING_PLAIN = EncodingType_PLAIN
+ENCODING_PREFIX = EncodingType_PREFIX
+ENCODING_GROUP_VARINT = EncodingType_GROUP_VARINT
+ENCODING_RLE = EncodingType_RLE
+
+cdef dict _encoding_types = {
+ 'auto': ENCODING_AUTO,
+ 'plain': ENCODING_PLAIN,
+ 'prefix': ENCODING_PREFIX,
+ 'group_varint': ENCODING_GROUP_VARINT,
+ 'rle': ENCODING_RLE,
+}
+
+cdef dict _encoding_type_to_name = _reverse_dict(_encoding_types)
+
+
+cdef class KuduType(object):
+
+ """
+ Usability wrapper for Kudu data type enum
+ """
+
+ def __cinit__(self, DataType type):
+ self.type = type
+
+ property name:
+
+ def __get__(self):
+ return _type_names[self.type]
+
+ def __repr__(self):
+ return 'KuduType({0})'.format(self.name)
+
+
+int8 = KuduType(KUDU_INT8)
+int16 = KuduType(KUDU_INT16)
+int32 = KuduType(KUDU_INT32)
+int64 = KuduType(KUDU_INT64)
+string_ = KuduType(KUDU_STRING)
+bool_ = KuduType(KUDU_BOOL)
+float_ = KuduType(KUDU_FLOAT)
+double_ = KuduType(KUDU_DOUBLE)
+binary = KuduType(KUDU_BINARY)
+timestamp = KuduType(KUDU_TIMESTAMP)
+
+
+cdef dict _type_names = {
+ INT8: 'int8',
+ INT16: 'int16',
+ INT32: 'int32',
+ INT64: 'int64',
+ STRING: 'string',
+ BOOL: 'bool',
+ FLOAT: 'float',
+ DOUBLE: 'double',
+ BINARY: 'binary',
+ TIMESTAMP: 'timestamp'
+}
+
+
+cdef dict _type_name_to_number = _reverse_dict(_type_names)
+
+cdef dict _type_to_obj = {
+ INT8: int8,
+ INT16: int16,
+ INT32: int32,
+ INT64: int64,
+ STRING: string_,
+ BOOL: bool_,
+ FLOAT: float_,
+ DOUBLE: double_,
+ BINARY: binary,
+ TIMESTAMP: timestamp
+}
+
+
+cdef KuduType to_data_type(object obj):
+ if isinstance(obj, KuduType):
+ return obj
+ elif isinstance(obj, six.string_types):
+ return _type_to_obj[_type_name_to_number[obj]]
+ elif obj in _type_to_obj:
+ return _type_to_obj[obj]
+ else:
+ raise ValueError('Invalid type: {0}'.format(obj))
+
+
+cdef class ColumnSchema:
+ """
+ Wraps a Kudu client ColumnSchema object. Use schema.at(i) or schema[i] to
+ construct one.
+ """
+
+ def __cinit__(self):
+ self.schema = NULL
+ self._type = None
+
+ def __dealloc__(self):
+ if self.schema is not NULL:
+ del self.schema
+
+ property name:
+ def __get__(self):
+ return frombytes(self.schema.name())
+
+ property type:
+ def __get__(self):
+ if self._type is None:
+ self._type = _type_to_obj[self.schema.type()]
+ return self._type
+
+ property nullable:
+ def __get__(self):
+ return self.schema.is_nullable()
+
+ def equals(self, other):
+ if not isinstance(other, ColumnSchema):
+ return False
+ return self.schema.Equals(deref((<ColumnSchema> other).schema))
+
+ def __repr__(self):
+ return ('ColumnSchema(name=%s, type=%s, nullable=%s)'
+ % (self.name, self.type.name,
+ self.nullable))
+
+
+#----------------------------------------------------------------------
+
+cdef class ColumnSpec:
+
+ """
+ Helper class for configuring a column's settings while using the
+ SchemaBuilder.
+ """
+
+ def type(self, type_):
+ self.spec.Type(to_data_type(type_).type)
+ return self
+
+ def default(self, value):
+ """
+ Set a default value for the column
+ """
+ raise NotImplementedError
+
+ def clear_default(self):
+ """
+ Remove the default value, if one has been set.
+ """
+ raise NotImplementedError
+
+ def compression(self, compression):
+ """
+ Set the compression type
+
+ Parameters
+ ----------
+ compression : string or int
+ One of {'default', 'none', 'snappy', 'lz4', 'zlib'}
+ Or see kudu.COMPRESSION_* constants
+
+ Returns
+ -------
+ self
+ """
+ cdef CompressionType type
+ if isinstance(compression, int):
+ # todo: validation
+ type = <CompressionType> compression
+ else:
+ if compression is None:
+ type = CompressionType_NONE
+ else:
+ try:
+ type = _compression_types[compression.lower()]
+ except KeyError:
+ raise ValueError('Invalid compression type: {0}'
+ .format(compression))
+
+ self.spec.Compression(type)
+ return self
+
+ def encoding(self, encoding):
+ """
+ Set the encoding type
+
+ Parameters
+ ----------
+ encoding : string or int
+ One of {'auto', 'plain', 'prefix', 'group_varint', 'rle'}
+ Or see kudu.ENCODING_* constants
+
+ Returns
+ -------
+ self
+ """
+ cdef EncodingType type
+ if isinstance(encoding, six.string_types):
+ try:
+ type = _encoding_types[encoding.lower()]
+ except KeyError:
+ raise ValueError('Invalid encoding type: {0}'
+ .format(encoding))
+ else:
+ # todo: validation
+ type = <EncodingType> encoding
+
+ self.spec.Encoding(type)
+ return self
+
+ def primary_key(self):
+ """
+ Make this column the primary key. A column marked this way is the
+ table's only primary key column; for a compound key, use the
+ set_primary_keys method on SchemaBuilder instead.
+
+ Returns
+ -------
+ self
+ """
+ self.spec.PrimaryKey()
+ return self
+
+ def nullable(self, bint is_nullable=True):
+ """
+ Set nullable (True) or not nullable (False)
+
+ Parameters
+ ----------
+ is_nullable : boolean, default True
+
+ Returns
+ -------
+ self
+ """
+ if is_nullable:
+ self.spec.Nullable()
+ else:
+ self.spec.NotNull()
+ return self
+
+ def rename(self, new_name):
+ """
+ Change the column name.
+
+ TODO: Not implemented for table creation
+ """
+ self.spec.RenameTo(new_name)
+ return self
+
+
+cdef class SchemaBuilder:
+
+ def add_column(self, name, type_=None, nullable=None, compression=None,
+ encoding=None, primary_key=False):
+ """
+ Add a new column to the schema. Returns a ColumnSpec object for further
+ configuration and use in a fluent programming style.
+
+ Parameters
+ ----------
+ name : string
+ type_ : string or KuduType
+ Data type e.g. 'int32' or kudu.int32
+ nullable : boolean, default None
+ New columns are nullable by default. Set boolean value for explicit
+ nullable / not-nullable
+ compression : string or int
+ One of {'default', 'none', 'snappy', 'lz4', 'zlib'}
+ Or see kudu.COMPRESSION_* constants
+ encoding : string or int
+ One of {'auto', 'plain', 'prefix', 'group_varint', 'rle'}
+ Or see kudu.ENCODING_* constants
+ primary_key : boolean, default False
+ Use this column as the table primary key
+
+ Examples
+ --------
+ (builder.add_column('foo')
+ .nullable(True)
+ .compression('lz4'))
+
+ Returns
+ -------
+ spec : ColumnSpec
+ """
+ cdef:
+ ColumnSpec result = ColumnSpec()
+ string c_name = tobytes(name)
+
+ result.spec = self.builder.AddColumn(c_name)
+
+ if type_ is not None:
+ result.type(type_)
+
+ if nullable is not None:
+ result.nullable(nullable)
+
+ if compression is not None:
+ result.compression(compression)
+
+ if encoding is not None:
+ result.encoding(encoding)
+
+ if primary_key:
+ result.primary_key()
+
+ return result
+
+ def set_primary_keys(self, key_names):
+ """
+ Set indicated columns (by name) to be the primary keys of the table
+ schema
+
+ Parameters
+ ----------
+ key_names : list of Python strings
+
+ Returns
+ -------
+ None
+ """
+ cdef:
+ vector[string] key_col_names
+
+ for name in key_names:
+ key_col_names.push_back(tobytes(name))
+
+ self.builder.SetPrimaryKey(key_col_names)
+
+ def build(self):
+ """
+ Creates an immutable Schema object after the user has finished adding
+ and configuring columns
+
+ Returns
+ -------
+ schema : Schema
+ """
+ cdef Schema result = Schema()
+ cdef KuduSchema* schema = new KuduSchema()
+ check_status(self.builder.Build(schema))
+
+ result.schema = schema
+ return result
+
+
+cdef class Schema:
+
+ """
+ Container for a Kudu table schema. Obtain from Table instances or create
+ new ones using kudu.SchemaBuilder
+ """
+
+ def __cinit__(self):
+ # Users should not call this directly
+ self.schema = NULL
+ self.own_schema = 1
+ self._col_mapping.clear()
+ self._mapping_initialized = 0
+
+ def __dealloc__(self):
+ if self.schema is not NULL and self.own_schema:
+ del self.schema
+
+ property names:
+
+ def __get__(self):
+ result = []
+ for i in range(self.schema.num_columns()):
+ name = frombytes(self.schema.Column(i).name())
+ result.append(name)
+
+ return result
+
+ def __repr__(self):
+ # Got to be careful with huge schemas, maybe some kind of summary repr
+ # when more than 20-30 columns?
+ buf = six.StringIO()
+
+ col_names = self.names
+ space = 2 + max(len(x) for x in col_names)
+
+ for i in range(len(self)):
+ col = self.at(i)
+ not_null = '' if col.nullable else ' NOT NULL'
+
+ buf.write('\n{0}{1}{2}'
+ .format(col.name.ljust(space),
+ col.type.name, not_null))
+
+ pk_string = ', '.join(col_names[i] for i in self.primary_key_indices())
+ buf.write('\nPRIMARY KEY ({0})'.format(pk_string))
+
+ return "kudu.Schema {{{0}\n}}".format(util.indent(buf.getvalue(), 2))
+
+ def __len__(self):
+ return self.schema.num_columns()
+
+ def __getitem__(self, key):
+ if isinstance(key, six.string_types):
+ key = self.get_loc(key)
+
+ if key < 0:
+ key += len(self)
+ return self.at(key)
+
+ def equals(self, Schema other):
+ """
+ Returns True if the table schemas are equal
+ """
+ return self.schema.Equals(deref(other.schema))
+
+ cdef int get_loc(self, name) except -1:
+ if not self._mapping_initialized:
+ for i in range(self.schema.num_columns()):
+ self._col_mapping[self.schema.Column(i).name()] = i
+ self._mapping_initialized = 1
+
+ name = tobytes(name)
+
+ # TODO: std::map is slightly verbose and inefficient here (O(lg n)
+ # lookups), may consider replacing with a better / different hash table
+ # should it become a performance bottleneck
+ cdef map[string, int].iterator it = self._col_mapping.find(name)
+ if it == self._col_mapping.end():
+ raise KeyError(name)
+ return self._col_mapping[name]
+
+ def at(self, size_t i):
+ """
+ Return the ColumnSchema for a column index. Analogous to schema[i].
+
+ Returns
+ -------
+ col_schema : ColumnSchema
+ """
+ cdef ColumnSchema result = ColumnSchema()
+
+ if i < 0 or i >= self.schema.num_columns():
+ raise IndexError('Column index {0} is not in range'
+ .format(i))
+
+ result.schema = new KuduColumnSchema(self.schema.Column(i))
+
+ return result
+
+ def primary_key_indices(self):
+ """
+ Return the indices of the columns used as primary keys
+
+ Returns
+ -------
+ key_indices : list[int]
+ """
+ cdef:
+ vector[int] indices
+ size_t i
+
+ self.schema.GetPrimaryKeyColumnIndexes(&indices)
+
+ result = []
+ for i in range(indices.size()):
+ result.append(indices[i])
+ return result
+
+ def primary_keys(self):
+ """
+ Return the names of the columns used as primary keys
+
+ Returns
+ -------
+ key_names : list[str]
+ """
+ indices = self.primary_key_indices()
+ return [self.at(i).name for i in indices]
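The lookup path in Schema above (__getitem__ delegating to get_loc for names, wrapping negative indices, then bounds-checking in at) can be sketched in pure Python. This is only an illustration under stated assumptions: ToySchema is a hypothetical stand-in that stores column names in a list, uses a plain dict where the Cython code uses a C++ map, and checks for str where the original checks six.string_types. It is not part of the kudu package.

```python
class ToySchema(object):
    """Hypothetical pure-Python stand-in for the lookup logic in Schema."""

    def __init__(self, names):
        self._names = list(names)
        # Built lazily on first name lookup, mirroring _mapping_initialized.
        self._col_mapping = None

    def __len__(self):
        return len(self._names)

    def get_loc(self, name):
        # Build the name -> index mapping once, on first use.
        if self._col_mapping is None:
            self._col_mapping = dict(
                (col, i) for i, col in enumerate(self._names))
        if name not in self._col_mapping:
            raise KeyError(name)
        return self._col_mapping[name]

    def at(self, i):
        # Strict bounds check: negative indices are rejected here.
        if i < 0 or i >= len(self):
            raise IndexError('Column index {0} is not in range'.format(i))
        return self._names[i]

    def __getitem__(self, key):
        # Names are translated to positions first.
        if isinstance(key, str):
            key = self.get_loc(key)
        # Negative positions wrap around once, as with Python lists.
        if key < 0:
            key += len(self)
        return self.at(key)


schema = ToySchema(['one', 'two', 'three', 'four'])
print(schema['three'])
print(schema[-1])
```

In this sketch, wraparound lives only in __getitem__, so at() stays a strict accessor. An index such as -5 on a four-column schema becomes -1 after the single adjustment and is still rejected with IndexError.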
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/tests/common.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py
new file mode 100644
index 0000000..a944d7d
--- /dev/null
+++ b/python/kudu/tests/common.py
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import division
+
+import json
+import fnmatch
+import os
+import shutil
+import subprocess
+import tempfile
+import time
+
+import kudu
+
+
+class KuduTestBase(object):
+
+ """
+ Base test class that will start a configurable number of master and
+ tablet servers.
+ """
+
+ BASE_PORT = 37000
+ NUM_TABLET_SERVERS = 3
+
+ @classmethod
+ def start_cluster(cls):
+ local_path = tempfile.mkdtemp(dir=os.getenv("TEST_TMPDIR", None))
+ bin_path = "{0}/build/latest".format(os.getenv("KUDU_HOME"))
+
+ os.makedirs("{0}/master/".format(local_path))
+ os.makedirs("{0}/master/data".format(local_path))
+ os.makedirs("{0}/master/logs".format(local_path))
+
+ path = [
+ "{0}/kudu-master".format(bin_path),
+ "-rpc_server_allow_ephemeral_ports",
+ "-rpc_bind_addresses=0.0.0.0:0",
+ "-fs_wal_dir={0}/master/data".format(local_path),
+ "-fs_data_dirs={0}/master/data".format(local_path),
+ "-log_dir={0}/master/logs".format(local_path),
+ "-logtostderr",
+ "-webserver_port=0",
+ "-server_dump_info_path={0}/master/config.json".format(local_path)
+ ]
+
+ p = subprocess.Popen(path, shell=False)
+ fid = open("{0}/master/kudu-master.pid".format(local_path), "w+")
+ fid.write("{0}".format(p.pid))
+ fid.close()
+
+ # We have to wait for the master to settle before the config file
+ # appears
+ config_file = "{0}/master/config.json".format(local_path)
+ for i in range(30):
+ if os.path.exists(config_file):
+ break
+ time.sleep(0.1 * (i + 1))
+ else:
+ raise Exception("Could not find kudu-master config file")
+
+ # If the server was started, get the bind port from the config dump
+ master_config = json.load(open("{0}/master/config.json"
+ .format(local_path), "r"))
+ # One master bound on local host
+ master_port = master_config["bound_rpc_addresses"][0]["port"]
+
+ for m in range(cls.NUM_TABLET_SERVERS):
+ os.makedirs("{0}/ts/{1}".format(local_path, m))
+ os.makedirs("{0}/ts/{1}/logs".format(local_path, m))
+
+ path = [
+ "{0}/kudu-tserver".format(bin_path),
+ "-rpc_server_allow_ephemeral_ports",
+ "-rpc_bind_addresses=0.0.0.0:0",
+ "-tserver_master_addrs=127.0.0.1:{0}".format(master_port),
+ "-webserver_port=0",
+ "-log_dir={0}/master/logs".format(local_path),
+ "-logtostderr",
+ "-fs_data_dirs={0}/ts/{1}/data".format(local_path, m),
+ "-fs_wal_dir={0}/ts/{1}/data".format(local_path, m),
+ ]
+ p = subprocess.Popen(path, shell=False)
+ tserver_pid = "{0}/ts/{1}/kudu-tserver.pid".format(local_path, m)
+ fid = open(tserver_pid, "w+")
+ fid.write("{0}".format(p.pid))
+ fid.close()
+
+ return local_path, master_port
+
+ @classmethod
+ def stop_cluster(cls, path):
+ for root, dirnames, filenames in os.walk('{0}/..'.format(path)):
+ for filename in fnmatch.filter(filenames, '*.pid'):
+ with open(os.path.join(root, filename)) as fid:
+ a = fid.read()
+ r = subprocess.Popen(["kill", "{0}".format(a)])
+ r.wait()
+ os.remove(os.path.join(root, filename))
+ shutil.rmtree(path, True)
+
+ @classmethod
+ def setUpClass(cls):
+ cls.cluster_path, master_port = cls.start_cluster()
+ time.sleep(1)
+
+ cls.master_host = '127.0.0.1'
+ cls.master_port = master_port
+
+ cls.client = kudu.connect(cls.master_host, cls.master_port)
+
+ cls.schema = cls.example_schema()
+
+ cls.ex_table = 'example-table'
+ if cls.client.table_exists(cls.ex_table):
+ cls.client.delete_table(cls.ex_table)
+ cls.client.create_table(cls.ex_table, cls.schema)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.stop_cluster(cls.cluster_path)
+
+ @classmethod
+ def example_schema(cls):
+ builder = kudu.schema_builder()
+ builder.add_column('key', kudu.int32, nullable=False)
+ builder.add_column('int_val', kudu.int32)
+ builder.add_column('string_val', kudu.string)
+ builder.set_primary_keys(['key'])
+
+ return builder.build()
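start_cluster above waits for the master's config dump by polling with a linearly growing sleep (0.1 s, 0.2 s, and so on, for up to 30 checks, which adds up to at most 46.5 s of sleeping). A minimal sketch of that polling loop as a standalone helper follows. The name wait_for_file and its parameters are hypothetical, and unlike the original it returns a boolean instead of raising.

```python
import os
import tempfile
import time


def wait_for_file(path, attempts=30, base_delay=0.1):
    """Poll until path exists, sleeping base_delay * (i + 1) between checks.

    Hypothetical helper; returns True if the file appeared, False otherwise.
    """
    for i in range(attempts):
        if os.path.exists(path):
            return True
        time.sleep(base_delay * (i + 1))
    return False


# Small delays keep the demonstration fast.
tmp_dir = tempfile.mkdtemp()
config_file = os.path.join(tmp_dir, "config.json")

# The file does not exist yet, so polling gives up.
missing = wait_for_file(config_file, attempts=3, base_delay=0.01)

with open(config_file, "w") as fid:
    fid.write("{}")

# Now the first check succeeds immediately.
found = wait_for_file(config_file, attempts=3, base_delay=0.01)
print(missing, found)
```

Compared with the fixed one-second sleep in the removed test_kudu.py, the growing delay lets a fast-starting master be detected within a fraction of a second while still tolerating a slow start.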
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
new file mode 100644
index 0000000..4636b3f
--- /dev/null
+++ b/python/kudu/tests/test_client.py
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kudu.compat import unittest, long
+from kudu.tests.common import KuduTestBase
+import kudu
+
+
+class TestClient(KuduTestBase, unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def test_table_basics(self):
+ table = self.client.table(self.ex_table)
+
+ self.assertEqual(table.name, self.ex_table)
+ self.assertEqual(table.num_columns, len(self.schema))
+
+ def test_table_column(self):
+ table = self.client.table(self.ex_table)
+ col = table['key']
+
+ assert col.name == b'key'
+ assert col.parent is table
+
+ result_repr = repr(col)
+ expected_repr = ('Column(key, parent={0}, type=int32)'
+ .format(self.ex_table))
+ assert result_repr == expected_repr
+
+ def test_table_schema_retains_reference(self):
+ import gc
+
+ table = self.client.table(self.ex_table)
+ schema = table.schema
+ table = None
+
+ gc.collect()
+ repr(schema)
+
+ def test_table_exists(self):
+ self.assertFalse(self.client.table_exists('nonexistent-table'))
+ self.assertTrue(self.client.table_exists(self.ex_table))
+
+ def test_list_tables(self):
+ schema = self.example_schema()
+
+ to_create = ['foo1', 'foo2', 'foo3']
+ for name in to_create:
+ self.client.create_table(name, schema)
+
+ result = self.client.list_tables()
+ expected = [self.ex_table] + to_create
+ assert sorted(result) == expected
+
+ result = self.client.list_tables('foo')
+ assert sorted(result) == to_create
+
+ for name in to_create:
+ self.client.delete_table(name)
+
+ def test_is_multimaster(self):
+ assert not self.client.is_multimaster
+
+ def test_delete_table(self):
+ name = "peekaboo"
+ self.client.create_table(name, self.schema)
+ self.client.delete_table(name)
+ assert not self.client.table_exists(name)
+
+ # Deleting a table that no longer exists raises KuduNotFound
+ with self.assertRaises(kudu.KuduNotFound):
+ self.client.delete_table(name)
+
+ def test_table_nonexistent(self):
+ self.assertRaises(kudu.KuduNotFound, self.client.table,
+ '__donotexist__')
+
+ def test_insert_nonexistent_field(self):
+ table = self.client.table(self.ex_table)
+ op = table.new_insert()
+ self.assertRaises(KeyError, op.__setitem__, 'doesntexist', 12)
+
+ def test_insert_rows_and_delete(self):
+ nrows = 100
+ table = self.client.table(self.ex_table)
+ session = self.client.new_session()
+ for i in range(nrows):
+ op = table.new_insert()
+ op['key'] = i
+ op['int_val'] = i * 2
+ op['string_val'] = 'hello_%d' % i
+ session.apply(op)
+
+ # Cannot apply the same insert twice, C++ client does not indicate an
+ # error
+ self.assertRaises(Exception, session.apply, op)
+
+ # synchronous
+ session.flush()
+
+ scanner = table.scanner().open()
+ assert len(scanner.read_all_tuples()) == nrows
+
+ # Delete the rows we just wrote
+ for i in range(nrows):
+ op = table.new_delete()
+ op['key'] = i
+ session.apply(op)
+ session.flush()
+
+ scanner = table.scanner().open()
+ assert len(scanner.read_all_tuples()) == 0
+
+ def test_session_auto_open(self):
+ table = self.client.table(self.ex_table)
+ scanner = table.scanner()
+ result = scanner.read_all_tuples()
+ assert len(result) == 0
+
+ def test_session_open_idempotent(self):
+ table = self.client.table(self.ex_table)
+ scanner = table.scanner().open().open()
+ result = scanner.read_all_tuples()
+ assert len(result) == 0
+
+ def test_session_flush_modes(self):
+ self.client.new_session(flush_mode=kudu.FLUSH_MANUAL)
+ self.client.new_session(flush_mode=kudu.FLUSH_AUTO_SYNC)
+
+ self.client.new_session(flush_mode='manual')
+ self.client.new_session(flush_mode='sync')
+
+ with self.assertRaises(kudu.KuduNotSupported):
+ self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND)
+
+ with self.assertRaises(kudu.KuduNotSupported):
+ self.client.new_session(flush_mode='background')
+
+ with self.assertRaises(ValueError):
+ self.client.new_session(flush_mode='foo')
+
+ def test_connect_timeouts(self):
+ # Smoke test: this only checks that connecting with timeouts does not raise
+ kudu.connect(self.master_host, self.master_port,
+ admin_timeout_ms=100,
+ rpc_timeout_ms=100)
+
+ def test_capture_kudu_error(self):
+ pass
+
+
+class TestMonoDelta(unittest.TestCase):
+
+ def test_empty_ctor(self):
+ delta = kudu.TimeDelta()
+ assert repr(delta) == 'kudu.TimeDelta()'
+
+ def test_static_ctors(self):
+ delta = kudu.timedelta(3.5)
+ assert delta.to_seconds() == 3.5
+
+ delta = kudu.timedelta(millis=3500)
+ assert delta.to_millis() == 3500
+
+ delta = kudu.timedelta(micros=3500)
+ assert delta.to_micros() == 3500
+
+ delta = kudu.timedelta(micros=1000)
+ assert delta.to_nanos() == long(1000000)
+
+ delta = kudu.timedelta(nanos=3500)
+ assert delta.to_nanos() == 3500
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/tests/test_kudu.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_kudu.py b/python/kudu/tests/test_kudu.py
deleted file mode 100644
index f74caee..0000000
--- a/python/kudu/tests/test_kudu.py
+++ /dev/null
@@ -1,325 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import division
-
-import json
-import fnmatch
-import nose
-import os
-import shutil
-import subprocess
-import tempfile
-import time
-import unittest
-import signal
-
-import kudu
-
-class KuduBasicsBase(object):
- """Base test class that will start a configurable number of master and tablet
- servers."""
-
- BASE_PORT = 37000
- NUM_TABLET_SERVERS = 3
-
- @classmethod
- def start_cluster(cls):
- local_path = tempfile.mkdtemp(dir=os.getenv("TEST_TMPDIR", None))
- bin_path="{0}/build/latest".format(os.getenv("KUDU_HOME"))
-
- os.makedirs("{0}/master/".format(local_path))
- os.makedirs("{0}/master/data".format(local_path))
- os.makedirs("{0}/master/logs".format(local_path))
-
- path = ["{0}/kudu-master".format(bin_path),
- "-rpc_server_allow_ephemeral_ports",
- "-rpc_bind_addresses=0.0.0.0:0",
- "-fs_wal_dir={0}/master/data".format(local_path),
- "-fs_data_dirs={0}/master/data".format(local_path),
- "-log_dir={0}/master/logs".format(local_path),
- "-logtostderr",
- "-webserver_port=0",
- "-server_dump_info_path={0}/master/config.json".format(local_path)
- ]
-
- p = subprocess.Popen(path, shell=False)
- fid = open("{0}/master/kudu-master.pid".format(local_path), "w+")
- fid.write("{0}".format(p.pid))
- fid.close()
-
- # We have to wait for the master to settle before the config file appears
- config_file = "{0}/master/config.json".format(local_path)
- for _ in range(30):
- if os.path.exists(config_file):
- break
- time.sleep(1)
- else:
- raise Exception("Could not find kudu-master config file")
-
- # If the server was started get the bind port from the config dump
- master_config = json.load(open("{0}/master/config.json".format(local_path), "r"))
- # One master bound on local host
- master_port = master_config["bound_rpc_addresses"][0]["port"]
-
- for m in range(cls.NUM_TABLET_SERVERS):
- os.makedirs("{0}/ts/{1}".format(local_path, m))
- os.makedirs("{0}/ts/{1}/logs".format(local_path, m))
-
- path = ["{0}/kudu-tserver".format(bin_path),
- "-rpc_server_allow_ephemeral_ports",
- "-rpc_bind_addresses=0.0.0.0:0",
- "-tserver_master_addrs=127.0.0.1:{0}".format(master_port),
- "-webserver_port=0",
- "-log_dir={0}/master/logs".format(local_path),
- "-logtostderr",
- "-fs_data_dirs={0}/ts/{1}/data".format(local_path, m),
- "-fs_wal_dir={0}/ts/{1}/data".format(local_path, m),
- ]
- p = subprocess.Popen(path, shell=False)
- fid = open("{0}/ts/{1}/kudu-tserver.pid".format(local_path, m), "w+")
- fid.write("{0}".format(p.pid))
- fid.close()
-
- return local_path, master_port
-
- @classmethod
- def stop_cluster(cls, path):
- for root, dirnames, filenames in os.walk('{0}/..'.format(path)):
- for filename in fnmatch.filter(filenames, '*.pid'):
- with open(os.path.join(root, filename)) as fid:
- a = fid.read()
- r = subprocess.Popen(["kill", "{0}".format(a)])
- r.wait()
- os.remove(os.path.join(root, filename))
- shutil.rmtree(path, True)
-
- @classmethod
- def setUpClass(cls):
- cls.cluster_path, master_port = cls.start_cluster()
- time.sleep(1)
- cls.client = kudu.Client('127.0.0.1:{0}'.format(master_port))
-
- cls.schema = cls.example_schema()
-
- cls.ex_table = 'example-table'
- if cls.client.table_exists(cls.ex_table):
- cls.client.delete_table(cls.ex_table)
- cls.client.create_table(cls.ex_table, cls.schema)
-
- @classmethod
- def tearDownClass(cls):
- cls.stop_cluster(cls.cluster_path)
-
- @classmethod
- def example_schema(cls):
- col1 = kudu.ColumnSchema.create('key', kudu.INT32)
- col2 = kudu.ColumnSchema.create('int_val', kudu.INT32)
- col3 = kudu.ColumnSchema.create('string_val', kudu.STRING)
-
- return kudu.schema_from_list([col1, col2, col3], 1)
-
-
-class TestSchema(unittest.TestCase):
-
- def test_column_schema(self):
- pass
-
- def test_create_schema(self):
- col1 = kudu.ColumnSchema.create('key', kudu.INT32)
- col2 = kudu.ColumnSchema.create('int_val', kudu.INT32)
- col3 = kudu.ColumnSchema.create('string_val', kudu.STRING)
-
- cols = [col1, col2, col3]
-
- # One key column
- schema = kudu.schema_from_list(cols, 1)
- self.assertEqual(len(schema), 3)
-
- # Question whether we want to go the overloading route
- self.assertTrue(schema.at(0).equals(col1))
- self.assertTrue(schema.at(1).equals(col2))
- self.assertTrue(schema.at(2).equals(col3))
-
- # This isn't yet very easy
- # self.assertEqual(schema['key'], col1)
- # self.assertEqual(schema['int_val'], col2)
- # self.assertEqual(schema['string_val'], col3)
-
- def test_column_schema_repr(self):
- col1 = kudu.ColumnSchema.create('key', kudu.INT32)
-
- result = repr(col1)
- expected = 'ColumnSchema(name=key, type=int32, nullable=False)'
- self.assertEqual(result, expected)
-
- def test_column_schema_default_value(self):
- pass
-
-
-class TestTable(KuduBasicsBase, unittest.TestCase):
-
- def setUp(self):
- pass
-
- def test_table_basics(self):
- table = self.client.open_table(self.ex_table)
-
- self.assertEqual(table.name, self.ex_table)
- self.assertEqual(table.num_columns, len(self.schema))
-
- def test_table_exists(self):
- self.assertFalse(self.client.table_exists('nonexistent-table'))
- self.assertTrue(self.client.table_exists(self.ex_table))
-
- def test_delete_table(self):
- name = "peekaboo"
- self.client.create_table(name, self.schema)
- self.assertTrue(self.client.delete_table(name))
- self.assertFalse(self.client.table_exists(name))
-
- # Should raise a more meaningful exception at some point
- val = self.client.delete_table(name)
- self.assertFalse(val)
-
- def test_open_table_nonexistent(self):
- self.assertRaises(kudu.KuduException, self.client.open_table,
- '__donotexist__')
-
- def test_insert_nonexistent_field(self):
- table = self.client.open_table(self.ex_table)
- op = table.insert()
- self.assertRaises(KeyError, op.__setitem__, 'doesntexist', 12)
-
- def test_insert_rows_and_delete(self):
- nrows = 100
- table = self.client.open_table(self.ex_table)
- session = self.client.new_session()
- for i in range(nrows):
- op = table.insert()
- op['key'] = i
- op['int_val'] = i * 2
- op['string_val'] = 'hello_%d' % i
- session.apply(op)
-
- # Cannot apply the same insert twice, does not blow up in C++
- self.assertRaises(Exception, session.apply, op)
-
- # synchronous
- self.assertTrue(session.flush())
-
- # Delete the rows we just wrote
- for i in range(nrows):
- op = table.delete()
- op['key'] = i
- session.apply(op)
- session.flush()
- # TODO: verify the table is now empty
-
- def test_capture_kudu_error(self):
- pass
-
-
-class TestScanner(KuduBasicsBase, unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- super(TestScanner, cls).setUpClass()
-
- cls.nrows = 100
- table = cls.client.open_table(cls.ex_table)
- session = cls.client.new_session()
-
- tuples = []
- for i in range(cls.nrows):
- op = table.insert()
- tup = i, i * 2, 'hello_%d' % i
- op['key'] = tup[0]
- op['int_val'] = tup[1]
- op['string_val'] = tup[2]
- session.apply(op)
- tuples.append(tup)
- session.flush()
-
- cls.table = table
- cls.tuples = tuples
-
- @classmethod
- def tearDownClass(cls):
- pass
-
- def setUp(self):
- pass
-
- def test_scan_rows_basic(self):
- # Let's scan with no predicates
- scanner = self.table.scanner().open()
-
- batch = scanner.read_all()
- self.assertEqual(len(batch), self.nrows)
-
- result_tuples = batch.as_tuples()
- self.assertEqual(result_tuples, self.tuples)
-
- def test_scan_rows_simple_predicate(self):
- scanner = self.table.scanner()
- scanner.add_comparison_predicate("key", kudu.GREATER_EQUAL, 20)
- scanner.add_comparison_predicate("key", kudu.LESS_EQUAL, 49)
- scanner.open()
-
- batch = scanner.read_all()
- tuples = batch.as_tuples()
-
- self.assertEqual(tuples, self.tuples[20:50])
-
- def test_scan_rows_string_predicate(self):
- scanner = self.table.scanner()
-
- scanner.add_comparison_predicate("string_val", kudu.GREATER_EQUAL, "hello_20")
- scanner.add_comparison_predicate("string_val", kudu.LESS_EQUAL, "hello_25")
- scanner.open()
-
- batch = scanner.read_all()
- tuples = batch.as_tuples()
-
- self.assertEqual(tuples, self.tuples[20:26])
-
- def test_scan_invalid_predicates(self):
- scanner = self.table.scanner()
- try:
- scanner.add_comparison_predicate("foo", kudu.GREATER_EQUAL, "x")
- except Exception, e:
- self.assertEqual("Not found: column not found: foo", str(e))
-
- try:
- scanner.add_comparison_predicate("string_val", kudu.GREATER_EQUAL, 1)
- except Exception, e:
- self.assertEqual("Invalid argument: non-string value " +
- "for string column string_val", str(e))
-
- try:
- scanner.add_comparison_predicate("string_val", kudu.GREATER_EQUAL, None)
- except Exception, e:
- self.assertEqual("unable to convert python type <type 'NoneType'>", str(e))
-
-
-if __name__ == '__main__':
- nose.runmodule(argv=[__file__, '-vvs', '-x', '--pdb',
- '--pdb-failure', '-s'], exit=False)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
new file mode 100644
index 0000000..86decb1
--- /dev/null
+++ b/python/kudu/tests/test_scanner.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import division
+
+from kudu.compat import unittest
+from kudu.tests.common import KuduTestBase
+import kudu
+
+
+class TestScanner(KuduTestBase, unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestScanner, cls).setUpClass()
+
+ cls.nrows = 100
+ table = cls.client.table(cls.ex_table)
+ session = cls.client.new_session()
+
+ tuples = []
+ for i in range(cls.nrows):
+ op = table.new_insert()
+ tup = i, i * 2, 'hello_%d' % i
+ op['key'] = tup[0]
+ op['int_val'] = tup[1]
+ op['string_val'] = tup[2]
+ session.apply(op)
+ tuples.append(tup)
+ session.flush()
+
+ cls.table = table
+ cls.tuples = tuples
+
+ @classmethod
+ def tearDownClass(cls):
+ pass
+
+ def setUp(self):
+ pass
+
+ def test_scan_rows_basic(self):
+ # Let's scan with no predicates
+ scanner = self.table.scanner().open()
+
+ tuples = scanner.read_all_tuples()
+ self.assertEqual(sorted(tuples), self.tuples)
+
+ def test_scan_rows_simple_predicate(self):
+ key = self.table['key']
+ preds = [key >= 20, key <= 49]
+
+ def _read_predicates(preds):
+ scanner = self.table.scanner()
+ scanner.add_predicates(preds)
+ scanner.open()
+ return scanner.read_all_tuples()
+
+ tuples = _read_predicates(preds)
+ self.assertEqual(sorted(tuples), self.tuples[20:50])
+
+ # verify predicates reusable
+ tuples = _read_predicates(preds)
+ self.assertEqual(sorted(tuples), self.tuples[20:50])
+
+ def test_scan_rows_string_predicate(self):
+ scanner = self.table.scanner()
+
+ sv = self.table['string_val']
+
+ scanner.add_predicates([sv >= 'hello_20',
+ sv <= 'hello_25'])
+ scanner.open()
+
+ tuples = scanner.read_all_tuples()
+
+ self.assertEqual(sorted(tuples), self.tuples[20:26])
+
+ def test_scan_invalid_predicates(self):
+ scanner = self.table.scanner()
+ sv = self.table['string_val']
+
+ with self.assertRaises(TypeError):
+ scanner.add_predicates([sv >= None])
+
+ with self.assertRaises(kudu.KuduInvalidArgument):
+ scanner.add_predicates([sv >= 1])
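The scanner tests above build predicates with comparison operators on a column object (key >= 20) and reuse the same list for two scans. How such operator overloading can yield reusable predicate objects is sketched below in pure Python. ToyColumn, ToyPredicate and scan are hypothetical names invented for illustration, and rows are plain dicts; the real bindings hand the predicates to the C++ scanner instead of filtering in Python.

```python
import operator


class ToyPredicate(object):
    """Hypothetical immutable description of one comparison."""

    def __init__(self, column, op, value):
        self.column = column
        self.op = op
        self.value = value

    def matches(self, row):
        return self.op(row[self.column], self.value)


class ToyColumn(object):
    """Hypothetical column handle whose comparisons build predicates."""

    def __init__(self, name):
        self.name = name

    def __ge__(self, value):
        # `col >= value` returns a predicate object, not a boolean.
        return ToyPredicate(self.name, operator.ge, value)

    def __le__(self, value):
        return ToyPredicate(self.name, operator.le, value)


def scan(rows, preds):
    # Keep only the rows that satisfy every predicate.
    return [row for row in rows if all(p.matches(row) for p in preds)]


rows = [{'key': i, 'int_val': i * 2} for i in range(100)]
key = ToyColumn('key')
preds = [key >= 20, key <= 49]

first = scan(rows, preds)
second = scan(rows, preds)  # the same predicate list is reused
print(len(first), first[0]['key'], first[-1]['key'])
```

Because each predicate only records the column, operator and value, nothing is consumed when it is applied, which is the property test_scan_rows_simple_predicate checks with its second scan.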
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/tests/test_schema.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_schema.py b/python/kudu/tests/test_schema.py
new file mode 100644
index 0000000..b75c61d
--- /dev/null
+++ b/python/kudu/tests/test_schema.py
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import division
+
+from kudu.compat import unittest
+import kudu
+
+
+class TestSchema(unittest.TestCase):
+
+ def setUp(self):
+ self.columns = [('one', 'int32', False),
+ ('two', 'int8', False),
+ ('three', 'double', True),
+ ('four', 'string', False)]
+
+ self.primary_keys = ['one', 'two']
+
+ self.builder = kudu.schema_builder()
+ for name, typename, nullable in self.columns:
+ self.builder.add_column(name, typename, nullable=nullable)
+
+ self.builder.set_primary_keys(self.primary_keys)
+ self.schema = self.builder.build()
+
+ def test_repr(self):
+ result = repr(self.schema)
+ for name, _, _ in self.columns:
+ assert name in result
+
+ assert 'PRIMARY KEY (one, two)' in result
+
+ def test_schema_length(self):
+ assert len(self.schema) == 4
+
+ def test_names(self):
+ assert self.schema.names == ['one', 'two', 'three', 'four']
+
+ def test_primary_keys(self):
+ assert self.schema.primary_key_indices() == [0, 1]
+ assert self.schema.primary_keys() == ['one', 'two']
+
+ def test_getitem_boundschecking(self):
+ with self.assertRaises(IndexError):
+ self.schema[4]
+
+ def test_getitem_wraparound(self):
+ # wraparound
+ result = self.schema[-1]
+ expected = self.schema[3]
+
+ assert result.equals(expected)
+
+ def test_getitem_string(self):
+ result = self.schema['three']
+ expected = self.schema[2]
+
+ assert result.equals(expected)
+
+ with self.assertRaises(KeyError):
+ self.schema['not_found']
+
+ def test_schema_equals(self):
+ assert self.schema.equals(self.schema)
+
+ builder = kudu.schema_builder()
+ builder.add_column('key', 'int64', nullable=False, primary_key=True)
+ schema = builder.build()
+
+ assert not self.schema.equals(schema)
+
+ def test_column_equals(self):
+ assert not self.schema[0].equals(self.schema[1])
+
+ def test_type(self):
+ builder = kudu.schema_builder()
+ (builder.add_column('key')
+ .type('int32')
+ .primary_key()
+ .nullable(False))
+ schema = builder.build()
+
+ tp = schema[0].type
+ assert tp.name == 'int32'
+ assert tp.type == kudu.schema.INT32
+
+ def test_compression(self):
+ builder = kudu.schema_builder()
+ builder.add_column('key', 'int64', nullable=False)
+
+ foo = builder.add_column('foo', 'string').compression('lz4')
+ assert foo is not None
+
+ bar = builder.add_column('bar', 'string')
+ bar.compression(kudu.COMPRESSION_ZLIB)
+
+ with self.assertRaises(ValueError):
+ bar = builder.add_column('qux', 'string', compression='unknown')
+
+ builder.set_primary_keys(['key'])
+ builder.build()
+
+ # TODO: The C++ client does not give us an API to see the storage
+ # attributes of a column
+
+ def test_encoding(self):
+ builder = kudu.schema_builder()
+ builder.add_column('key', 'int64', nullable=False)
+
+ foo = builder.add_column('foo', 'string').encoding('rle')
+ assert foo is not None
+
+ bar = builder.add_column('bar', 'string')
+ bar.encoding(kudu.ENCODING_PLAIN)
+
+ with self.assertRaises(ValueError):
+ builder.add_column('qux', 'string', encoding='unknown')
+
+ builder.set_primary_keys(['key'])
+ builder.build()
+ # TODO(wesm): The C++ client does not give us an API to see the storage
+ # attributes of a column
+
+ def test_set_column_spec_pk(self):
+ builder = kudu.schema_builder()
+ key = (builder.add_column('key', 'int64', nullable=False)
+ .primary_key())
+ assert key is not None
+ schema = builder.build()
+ assert 'key' in schema.primary_keys()
+
+ builder = kudu.schema_builder()
+ key = (builder.add_column('key', 'int64', nullable=False,
+ primary_key=True))
+ schema = builder.build()
+ assert 'key' in schema.primary_keys()
+
+ def test_partition_schema(self):
+ pass
+
+ def test_nullable_not_null(self):
+ builder = kudu.schema_builder()
+ (builder.add_column('key', 'int64', nullable=False)
+ .primary_key())
+
+ builder.add_column('data1', 'double').nullable(True)
+ builder.add_column('data2', 'double').nullable(False)
+ builder.add_column('data3', 'double', nullable=True)
+ builder.add_column('data4', 'double', nullable=False)
+
+ schema = builder.build()
+
+ assert not schema[0].nullable
+ assert schema[1].nullable
+ assert not schema[2].nullable
+
+ assert schema[3].nullable
+ assert not schema[4].nullable
+
+ def test_default_value(self):
+ pass
+
+ def test_column_schema_repr(self):
+ result = repr(self.schema[0])
+ expected = 'ColumnSchema(name=one, type=int32, nullable=False)'
+ self.assertEqual(result, expected)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/kudu/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/util.py b/python/kudu/util.py
new file mode 100644
index 0000000..a2b65cf
--- /dev/null
+++ b/python/kudu/util.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+def indent(text, spaces):
+ block = ' ' * spaces
+ return '\n'.join(block + x for x in text.split('\n'))
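A small usage sketch of the `indent` helper added in this file. The function body is copied from the diff; the sample text is made up for illustration and is not part of the commit.

```python
def indent(text, spaces):
    # Same logic as python/kudu/util.py in the diff above.
    block = ' ' * spaces
    return '\n'.join(block + x for x in text.split('\n'))


# Hypothetical multi-line input, e.g. a nested repr.
sample = "ColumnSchema(\n  name=one\n)"
indented = indent(sample, 4)
print(indented)
```

Every line is prefixed, including empty ones, so blank lines in the input come back as lines containing only spaces.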
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/requirements.txt
----------------------------------------------------------------------
diff --git a/python/requirements.txt b/python/requirements.txt
index 55a8b18..ef646fa 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,3 +1,6 @@
+pytest
+numpy>=1.7.0
cython >= 0.21
-nose >= 1.0
setuptools >= 0.8
+six
+unittest2
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/setup.cfg
----------------------------------------------------------------------
diff --git a/python/setup.cfg b/python/setup.cfg
new file mode 100644
index 0000000..9af7e6f
--- /dev/null
+++ b/python/setup.cfg
@@ -0,0 +1,2 @@
+[aliases]
+test=pytest
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/53f976f0/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 9c7d7e9..4a9828f 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -22,24 +22,45 @@ from Cython.Distutils import build_ext
from Cython.Build import cythonize
import Cython
-if Cython.__version__ < '0.19.1':
- raise Exception('Please upgrade to Cython 0.19.1 or newer')
import sys
from setuptools import setup
+from distutils.command.clean import clean as _clean
from distutils.extension import Extension
import os
+if Cython.__version__ < '0.19.1':
+ raise Exception('Please upgrade to Cython 0.19.1 or newer')
+
MAJOR = 0
-MINOR = 0
-MICRO = 1
+MINOR = 1
+MICRO = 0
VERSION = '%d.%d.%d' % (MAJOR, MINOR, MICRO)
+ISRELEASED = True
+
+setup_dir = os.path.abspath(os.path.dirname(__file__))
+
+
+def write_version_py(filename=os.path.join(setup_dir, 'kudu/version.py')):
+ version = VERSION
+ if not ISRELEASED:
+ version += '.dev'
+
+ a = open(filename, 'w')
+ file_content = "\n".join(["",
+ "# THIS FILE IS GENERATED FROM SETUP.PY",
+ "version = '%(version)s'",
+ "isrelease = '%(isrelease)s'"])
+
+ a.write(file_content % {'version': version,
+ 'isrelease': str(ISRELEASED)})
+ a.close()
-from distutils.command.clean import clean as _clean
class clean(_clean):
def run(self):
_clean.run(self)
- for x in ['kudu/client.cpp']:
+ for x in ['kudu/client.cpp', 'kudu/schema.cpp',
+ 'kudu/errors.cpp']:
try:
os.remove(x)
except OSError:
@@ -48,20 +69,22 @@ class clean(_clean):
# If we're in the context of the Kudu git repository, build against the
# latest in-tree build artifacts
-if 'KUDU_HOME' in os.environ and \
- os.path.exists(os.path.join(os.environ['KUDU_HOME'], "build/latest")):
- print >>sys.stderr, "Building from in-tree build artifacts"
+if ('KUDU_HOME' in os.environ and
+ os.path.exists(os.path.join(os.environ['KUDU_HOME'],
+ "build/latest"))):
+ sys.stderr.write("Building from in-tree build artifacts\n")
kudu_include_dir = os.path.join(os.environ['KUDU_HOME'], 'src')
- kudu_lib_dir = os.path.join(os.environ['KUDU_HOME'], 'build/latest/exported')
+ kudu_lib_dir = os.path.join(os.environ['KUDU_HOME'],
+ 'build/latest/exported')
else:
if os.path.exists("/usr/local/include/kudu"):
- prefix="/usr/local"
+ prefix = "/usr/local"
elif os.path.exists("/usr/include/kudu"):
- prefix="/usr"
+ prefix = "/usr"
else:
- print >>sys.stderr, "Cannot find installed kudu client."
+ sys.stderr.write("Cannot find installed kudu client.\n")
sys.exit(1)
- print >>sys.stderr, "Building from system prefix ", prefix
+ sys.stderr.write("Building from system prefix {0}\n".format(prefix))
kudu_include_dir = prefix + "/include"
kudu_lib_dir = prefix + "/lib"
@@ -69,32 +92,59 @@ INCLUDE_PATHS = [kudu_include_dir]
LIBRARY_DIRS = [kudu_lib_dir]
RT_LIBRARY_DIRS = LIBRARY_DIRS
-client_ext = Extension('kudu.client', ['kudu/client.pyx'],
- libraries=['kudu_client'],
- # Disable the 'new' gcc5 ABI; see the top-level
- # CMakeLists.txt for details.
- define_macros=[('_GLIBCXX_USE_CXX11_ABI', '0')],
- include_dirs=INCLUDE_PATHS,
- library_dirs=LIBRARY_DIRS,
- runtime_library_dirs=RT_LIBRARY_DIRS)
+ext_submodules = ['client', 'errors', 'schema']
+
+extensions = []
+
+for submodule_name in ext_submodules:
+ ext = Extension('kudu.{0}'.format(submodule_name),
+ ['kudu/{0}.pyx'.format(submodule_name)],
+ libraries=['kudu_client'],
+ # Disable the 'new' gcc5 ABI; see the top-level
+ # CMakeLists.txt for details.
+ define_macros=[('_GLIBCXX_USE_CXX11_ABI', '0')],
+ include_dirs=INCLUDE_PATHS,
+ library_dirs=LIBRARY_DIRS,
+ runtime_library_dirs=RT_LIBRARY_DIRS)
+ extensions.append(ext)
+
+extensions = cythonize(extensions)
+
+write_version_py()
+
+LONG_DESCRIPTION = open(os.path.join(setup_dir, "README.md")).read()
+DESCRIPTION = "Python interface to the Apache Kudu (incubating) C++ Client API"
-extensions = cythonize([client_ext])
+CLASSIFIERS = [
+ 'Development Status :: 3 - Alpha',
+ 'Environment :: Console',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3.4',
+ 'Programming Language :: Python :: 3.5',
+ 'Programming Language :: Cython'
+]
setup(
- name="python-kudu",
- packages=["kudu"],
+ name="kudu-python",
+ packages=['kudu', 'kudu.tests'],
version=VERSION,
- setup_requires=['nose>=1.0'],
package_data={'kudu': ['*.pxd', '*.pyx']},
ext_modules=extensions,
- cmdclass = {
+ cmdclass={
'clean': clean,
'build_ext': build_ext
},
+ setup_requires=['pytest-runner'],
+ tests_require=['pytest'],
install_requires=['cython >= 0.21'],
- description="Cython wrapper for the Kudu C++ API",
- license='Proprietary',
- author="Wes McKinney",
- maintainer_email="wes@cloudera.com",
+ description=DESCRIPTION,
+ long_description=LONG_DESCRIPTION,
+ license='Apache License, Version 2.0',
+ classifiers=CLASSIFIERS,
+ author="Apache Kudu (incubating) team",
+ maintainer_email="dev@kudu.incubator.apache.org",
test_suite="kudu.tests"
)
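A note on the Cython check in setup.py above: `Cython.__version__ < '0.19.1'` compares strings character by character, which can disagree with numeric ordering. The sketch below shows one way to compare numerically instead. The helper names `version_tuple` and `is_older` are hypothetical and do not appear in the commit; this is an illustration under those assumptions, not the project's method.

```python
import re


def version_tuple(version):
    """Parse the leading numeric components of a dotted version string."""
    parts = []
    for piece in version.split('.'):
        match = re.match(r'\d+', piece)
        if match is None:
            # Stop at the first component that has no leading digits.
            break
        parts.append(int(match.group()))
    return tuple(parts)


def is_older(installed, minimum):
    # Tuples of ints compare component by component, numerically.
    return version_tuple(installed) < version_tuple(minimum)


# String comparison and numeric comparison can disagree:
print('0.100' < '0.19.1')            # lexicographic comparison
print(is_older('0.100', '0.19.1'))   # numeric comparison
```

In setup.py, a check along these lines would read `if is_older(Cython.__version__, '0.19.1'): raise ...`, assuming the helper is defined before the check.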