You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/29 04:33:55 UTC
[pulsar-client-cpp] branch main updated: PIP-209: Removed Python client wrapper
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 034c710 PIP-209: Removed Python client wrapper
new 5f42cdf Merge pull request #1 from merlimat/remove-python-client
034c710 is described below
commit 034c7108c033b5a51abeac7e9040b55d0148864d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Sep 28 17:58:30 2022 -0700
PIP-209: Removed Python client wrapper
---
CMakeLists.txt | 38 -
docker-build-python3.9.sh | 40 -
docker/build-wheel-file-within-docker.sh | 46 -
docker/build-wheels.sh | 82 --
docker/create-images.sh | 44 -
docker/manylinux1/Dockerfile | 162 ----
docker/manylinux2014/Dockerfile | 130 ---
docker/manylinux_musl/Dockerfile | 116 ---
docker/python-versions.sh | 41 -
python/.gitignore | 4 -
python/CMakeLists.txt | 103 ---
python/build-mac-wheels.sh | 300 -------
python/custom_logger_test.py | 54 --
python/examples/company.avsc | 21 -
python/examples/rpc_client.py | 80 --
python/examples/rpc_server.py | 63 --
python/pulsar/__init__.py | 1428 ------------------------------
python/pulsar/exceptions.py | 28 -
python/pulsar/functions/__init__.py | 20 -
python/pulsar/functions/context.py | 191 ----
python/pulsar/functions/function.py | 51 --
python/pulsar/functions/serde.py | 87 --
python/pulsar/schema/__init__.py | 24 -
python/pulsar/schema/definition.py | 515 -----------
python/pulsar/schema/schema.py | 111 ---
python/pulsar/schema/schema_avro.py | 96 --
python/pulsar_test.py | 1341 ----------------------------
python/schema_test.py | 1291 ---------------------------
python/setup.py | 117 ---
python/src/authentication.cc | 119 ---
python/src/client.cc | 118 ---
python/src/config.cc | 300 -------
python/src/consumer.cc | 120 ---
python/src/cryptoKeyReader.cc | 32 -
python/src/enums.cc | 114 ---
python/src/exceptions.cc | 112 ---
python/src/message.cc | 171 ----
python/src/producer.cc | 102 ---
python/src/pulsar.cc | 59 --
python/src/reader.cc | 98 --
python/src/schema.cc | 28 -
python/src/utils.cc | 47 -
python/src/utils.h | 104 ---
python/test_consumer.py | 36 -
python/test_producer.py | 46 -
45 files changed, 8230 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1975cd6..13452e6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -56,9 +56,6 @@ MESSAGE(STATUS "BUILD_STATIC_LIB: " ${BUILD_STATIC_LIB})
option(BUILD_TESTS "Build tests" ON)
MESSAGE(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
-option(BUILD_PYTHON_WRAPPER "Build Pulsar Python wrapper" ON)
-MESSAGE(STATUS "BUILD_PYTHON_WRAPPER: " ${BUILD_PYTHON_WRAPPER})
-
option(BUILD_WIRESHARK "Build Pulsar Wireshark dissector" OFF)
MESSAGE(STATUS "BUILD_WIRESHARK: " ${BUILD_WIRESHARK})
@@ -265,33 +262,6 @@ endif()
find_package(Boost REQUIRED COMPONENTS ${BOOST_COMPONENTS})
-if (BUILD_PYTHON_WRAPPER)
- find_package(PythonLibs REQUIRED)
- MESSAGE(STATUS "PYTHON: " ${PYTHONLIBS_VERSION_STRING})
-
- string(REPLACE "." ";" PYTHONLIBS_VERSION_NO_LIST ${PYTHONLIBS_VERSION_STRING})
- list(GET PYTHONLIBS_VERSION_NO_LIST 0 PYTHONLIBS_VERSION_MAJOR)
- list(GET PYTHONLIBS_VERSION_NO_LIST 1 PYTHONLIBS_VERSION_MINOR)
- set(BOOST_PYTHON_NAME_POSTFIX ${PYTHONLIBS_VERSION_MAJOR}${PYTHONLIBS_VERSION_MINOR})
- # For python3 the lib name is boost_python3
- set(BOOST_PYTHON_NAME_LIST python37;python38;python39;python310;python3;python3-mt;python-py${BOOST_PYTHON_NAME_POSTFIX};python${BOOST_PYTHON_NAME_POSTFIX}-mt;python${BOOST_PYTHON_NAME_POSTFIX})
-
- foreach (BOOST_PYTHON_NAME IN LISTS BOOST_PYTHON_NAME_LIST)
- find_package(Boost QUIET COMPONENTS ${BOOST_PYTHON_NAME})
- if (${Boost_FOUND})
- set(BOOST_PYTHON_NAME_FOUND ${BOOST_PYTHON_NAME})
- break()
- endif()
- endforeach()
-
- if (NOT ${Boost_FOUND})
- MESSAGE(FATAL_ERROR "Could not find Boost Python library")
- endif ()
-
- MESSAGE(STATUS "BOOST_PYTHON_NAME_FOUND: " ${BOOST_PYTHON_NAME_FOUND})
- find_package(Boost REQUIRED COMPONENTS ${BOOST_PYTHON_NAME_FOUND})
-endif (BUILD_PYTHON_WRAPPER)
-
find_package(OpenSSL REQUIRED)
if (BUILD_TESTS)
@@ -305,8 +275,6 @@ if (USE_LOG4CXX)
endif (USE_LOG4CXX)
if (NOT APPLE AND NOT MSVC)
- # we don't set options below to build _pulsar.so
- set(CMAKE_CXX_FLAGS_PYTHON "${CMAKE_CXX_FLAGS}")
# Hide all non-exported symbols to avoid conflicts
add_compile_options(-fvisibility=hidden)
if (CMAKE_COMPILER_IS_GNUCC)
@@ -429,10 +397,6 @@ if (BUILD_TESTS)
add_subdirectory(tests)
endif()
-if (BUILD_PYTHON_WRAPPER)
- add_subdirectory(python)
-endif ()
-
if (BUILD_WIRESHARK)
add_subdirectory(wireshark)
endif()
@@ -448,7 +412,6 @@ add_custom_target(format ${BUILD_SUPPORT_DIR}/run_clang_format.py
${CMAKE_SOURCE_DIR}/examples
${CMAKE_SOURCE_DIR}/tests
${CMAKE_SOURCE_DIR}/include
- ${CMAKE_SOURCE_DIR}/python/src
${CMAKE_SOURCE_DIR}/wireshark)
# `make check-format` option (for CI test)
@@ -461,5 +424,4 @@ add_custom_target(check-format ${BUILD_SUPPORT_DIR}/run_clang_format.py
${CMAKE_SOURCE_DIR}/examples
${CMAKE_SOURCE_DIR}/tests
${CMAKE_SOURCE_DIR}/include
- ${CMAKE_SOURCE_DIR}/python/src
${CMAKE_SOURCE_DIR}/wireshark)
diff --git a/docker-build-python3.9.sh b/docker-build-python3.9.sh
deleted file mode 100755
index cbad25c..0000000
--- a/docker-build-python3.9.sh
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Build Pulsar Python3.9 client
-
-set -e
-
-ROOT_DIR=$(git rev-parse --show-toplevel)
-cd $ROOT_DIR/pulsar-client-cpp
-
-
-# Use manylinux2014 build image
-PYTHON_VERSION="3.9"
-PYTHON_SPEC="cp39-cp39"
-ARCH="x86_64"
-IMAGE_NAME=apachepulsar/pulsar-build
-IMAGE=$IMAGE_NAME:manylinux-$PYTHON_SPEC-$ARCH
-
-VOLUME_OPTION=${VOLUME_OPTION:-"-v $ROOT_DIR:/pulsar"}
-COMMAND="/pulsar/pulsar-client-cpp/docker/build-wheel-file-within-docker.sh"
-DOCKER_CMD="docker run -i ${VOLUME_OPTION} -e USE_FULL_POM_NAME -e NAME_POSTFIX -e ARCH=${ARCH} ${IMAGE}"
-
-$DOCKER_CMD bash -c "${COMMAND}"
diff --git a/docker/build-wheel-file-within-docker.sh b/docker/build-wheel-file-within-docker.sh
deleted file mode 100755
index 87650f9..0000000
--- a/docker/build-wheel-file-within-docker.sh
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-set -ex
-
-cd /pulsar/pulsar-client-cpp
-
-find . -name CMakeCache.txt | xargs -r rm
-find . -name CMakeFiles | xargs -r rm -rf
-
-cmake . -DPYTHON_INCLUDE_DIR=/opt/python/$PYTHON_SPEC/include/python$PYTHON_VERSION \
- -DPYTHON_LIBRARY=/opt/python/$PYTHON_SPEC/lib \
- -DLINK_STATIC=ON \
- -DBUILD_TESTS=OFF \
- -DBUILD_WIRESHARK=OFF
-
-make clean
-make _pulsar -j3
-
-cd python
-python setup.py bdist_wheel
-
-# Audit wheel is required to convert a wheel that is tagged as generic
-# 'linux' into a 'multilinux' wheel.
-# Only wheel files tagged as multilinux can be uploaded to PyPI
-# Audit wheel will make sure no external dependencies are needed for
-# the shared library and that only symbols supported by most linux
-# distributions are used.
-auditwheel repair dist/pulsar_client*-$PYTHON_SPEC-linux_${ARCH}.whl
diff --git a/docker/build-wheels.sh b/docker/build-wheels.sh
deleted file mode 100755
index 25ac64f..0000000
--- a/docker/build-wheels.sh
+++ /dev/null
@@ -1,82 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-set -e
-
-BUILD_IMAGE_NAME="${BUILD_IMAGE_NAME:-apachepulsar/pulsar-build}"
-
-# ROOT_DIR should be an absolute path so that Docker accepts it as a valid volumes path
-ROOT_DIR=`cd $(dirname $0)/../..; pwd`
-cd $ROOT_DIR
-
-source ./pulsar-client-cpp/docker/python-versions.sh
-
-function contains_build_version {
- for line in "${PYTHON_VERSIONS[@]}"; do
- read -r -a v <<< "$line"
- value="${v[0]} ${v[1]} ${v[2]} ${v[3]}"
-
- if [ "${build_version}" == "${value}" ]; then
- # found
- res=1
- return
- fi
- done
-
- # not found
- res=0
-}
-
-
-if [ $# -ge 1 ]; then
- build_version=$@
- contains_build_version
- if [ $res == 1 ]; then
- PYTHON_VERSIONS=(
- "${build_version}"
- )
- else
- echo "Unknown build version : ${build_version}"
- echo "Supported python build versions are :"
- echo ${PYTHON_VERSIONS[@]}
- exit 1
- fi
-fi
-
-for line in "${PYTHON_VERSIONS[@]}"; do
- read -r -a PY <<< "$line"
- PYTHON_VERSION=${PY[0]}
- PYTHON_SPEC=${PY[1]}
- IMAGE=${PY[2]}
- ARCH=${PY[3]}
- echo "--------- Build Python wheel for $PYTHON_VERSION -- $IMAGE -- $PYTHON_SPEC -- $ARCH"
-
- IMAGE=$BUILD_IMAGE_NAME:${IMAGE}-$PYTHON_SPEC-$ARCH
-
- echo "Using image: $IMAGE"
-
- VOLUME_OPTION=${VOLUME_OPTION:-"-v $ROOT_DIR:/pulsar"}
- COMMAND="/pulsar/pulsar-client-cpp/docker/build-wheel-file-within-docker.sh"
- DOCKER_CMD="docker run -i ${VOLUME_OPTION} -e USE_FULL_POM_NAME -e NAME_POSTFIX ${IMAGE}"
-
- $DOCKER_CMD bash -c "ARCH=$ARCH ${COMMAND}"
-
-done
diff --git a/docker/create-images.sh b/docker/create-images.sh
deleted file mode 100755
index 14938a4..0000000
--- a/docker/create-images.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-# Create all the Docker images for variations of Python versions
-
-set -e
-
-source python-versions.sh
-
-for line in "${PYTHON_VERSIONS[@]}"; do
- read -r -a PY <<< "$line"
- PYTHON_VERSION=${PY[0]}
- PYTHON_SPEC=${PY[1]}
- BASE_IMAGE=${PY[2]}
- ARCH=${PY[3]}
- echo "--------- Build Docker image for $PYTHON_VERSION -- $PYTHON_SPEC -- $ARCH"
-
- IMAGE_NAME=pulsar-build:$BASE_IMAGE-$PYTHON_SPEC-$ARCH
-
- docker build -t $IMAGE_NAME $BASE_IMAGE \
- --build-arg PYTHON_VERSION=$PYTHON_VERSION \
- --build-arg PYTHON_SPEC=$PYTHON_SPEC \
- --build-arg ARCH=$ARCH
-
- echo "==== Successfully built image $IMAGE_NAME"
-done
diff --git a/docker/manylinux1/Dockerfile b/docker/manylinux1/Dockerfile
deleted file mode 100644
index 9fef05a..0000000
--- a/docker/manylinux1/Dockerfile
+++ /dev/null
@@ -1,162 +0,0 @@
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-FROM quay.io/pypa/manylinux1_x86_64
-
-RUN yum install -y gtest-devel
-
-ARG PYTHON_VERSION
-ARG PYTHON_SPEC
-
-ENV PYTHON_VERSION=${PYTHON_VERSION}
-ENV PYTHON_SPEC=${PYTHON_SPEC}
-
-ENV PATH="/opt/python/${PYTHON_SPEC}/bin:${PATH}"
-
-RUN ln -s /opt/python/${PYTHON_SPEC}/include/python${PYTHON_VERSION}m /opt/python/${PYTHON_SPEC}/include/python${PYTHON_VERSION}
-
-# Perl (required for building OpenSSL)
-RUN curl -O -L https://www.cpan.org/src/5.0/perl-5.10.0.tar.gz && \
- tar xvfz perl-5.10.0.tar.gz && \
- cd perl-5.10.0 && \
- ./configure.gnu --prefix=/usr/local/ && \
- make && make install && \
- rm -rf /perl-5.10.0.tar.gz /perl-5.10.0
-
-####################################
-# These dependencies can be found in Ubuntu but they're not compiled with -fPIC,
-# so they cannot be statically linked into a shared library
-####################################
-
-# ZLib
-RUN curl -O -L https://zlib.net/zlib-1.2.12.tar.gz && \
- tar xvfz zlib-1.2.12.tar.gz && \
- cd zlib-1.2.12 && \
- CFLAGS="-fPIC -O3" ./configure && \
- make && make install && \
- rm -rf /zlib-1.2.12.tar.gz /zlib-1.2.12
-
-# Compile OpenSSL
-RUN curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_1_1_1n.tar.gz && \
- tar xvfz OpenSSL_1_1_1n.tar.gz && \
- cd openssl-OpenSSL_1_1_1n/ && \
- ./Configure -fPIC --prefix=/usr/local/ssl/ no-shared linux-x86_64 && \
- make && make install && \
- rm -rf /OpenSSL_1_1_1n.tar.gz /openssl-OpenSSL_1_1_1n
-
-ENV LD_LIBRARY_PATH /usr/local/ssl/lib/:
-
-# Download and compile boost
-RUN curl -O -L https://boostorg.jfrog.io/artifactory/main/release/1.68.0/source/boost_1_68_0.tar.gz && \
- tar xvfz boost_1_68_0.tar.gz && \
- cd /boost_1_68_0 && \
- ./bootstrap.sh --with-libraries=program_options,filesystem,regex,thread,system,python && \
- ./b2 address-model=64 cxxflags=-fPIC link=static threading=multi variant=release install && \
- rm -rf /boost_1_68_0.tar.gz /boost_1_68_0
-
-# Download and copile protoubf
-RUN curl -O -L https://github.com/google/protobuf/releases/download/v3.20.0/protobuf-cpp-3.20.0.tar.gz && \
- tar xvfz protobuf-cpp-3.20.0.tar.gz && \
- cd protobuf-3.20.0/ && \
- CXXFLAGS=-fPIC ./configure && \
- make && make install && ldconfig && \
- rm -rf /protobuf-cpp-3.20.0.tar.gz /protobuf-3.20.0
-
-# Compile APR
-RUN curl -O -L http://archive.apache.org/dist/apr/apr-1.5.2.tar.gz && \
- tar xvfz apr-1.5.2.tar.gz && \
- cd apr-1.5.2 && \
- CFLAGS=-fPIC CXXFLAGS=-fPIC ./configure && \
- make && make install && \
- rm -rf /apr-1.5.2.tar.gz /apr-1.5.2
-
-# Compile APR-Util
-RUN curl -O -L http://archive.apache.org/dist/apr/apr-util-1.5.4.tar.gz && \
- tar xvfz apr-util-1.5.4.tar.gz && \
- cd apr-util-1.5.4 && \
- CFLAGS=-fPIC CXXFLAGS=-fPIC ./configure -with-apr=/usr/local/apr && \
- make && make install && \
- rm -rf /apr-util-1.5.4.tar.gz /apr-util-1.5.4
-
-# Libtool
-RUN curl -L -O https://ftp.gnu.org/gnu/libtool/libtool-2.4.6.tar.gz && \
- tar xvfz libtool-2.4.6.tar.gz && \
- cd libtool-2.4.6 && \
- ./configure && \
- make && make install && \
- rm -rf /libtool-2.4.6.tar.gz /libtool-2.4.6
-
-# Compile log4cxx
-RUN curl -O -L https://github.com/apache/logging-log4cxx/archive/v0.11.0.tar.gz && \
- tar xvfz v0.11.0.tar.gz && \
- cd logging-log4cxx-0.11.0 && \
- ./autogen.sh && \
- CXXFLAGS=-fPIC ./configure && \
- make && make install && \
- rm -rf /v0.11.0.tar.gz /logging-log4cxx-0.11.0
-
-# Compile expat
-RUN curl -O -L https://github.com/libexpat/libexpat/archive/R_2_2_0.tar.gz && \
- tar xfvz R_2_2_0.tar.gz && \
- cd libexpat-R_2_2_0/expat && \
- ./buildconf.sh && \
- CFLAGS=-fPIC CXXFLAGS=-fPIC ./configure && \
- make && make installlib && \
- rm -rf /R_2_2_0.tar.gz /libexpat-R_2_2_0
-
-RUN curl -O -L https://github.com/Kitware/CMake/archive/v3.12.1.tar.gz && \
- tar xvfz v3.12.1.tar.gz && \
- cd CMake-3.12.1 && \
- ./configure && \
- make && make install && \
- rm -rf /v3.12.1.tar.gz /CMake-3.12.1
-
-# Zstandard
-RUN curl -O -L https://github.com/facebook/zstd/releases/download/v1.3.7/zstd-1.3.7.tar.gz && \
- tar xvfz zstd-1.3.7.tar.gz && \
- cd zstd-1.3.7 && \
- CFLAGS="-fPIC -O3" make -j8 && \
- make install && \
- rm -rf /zstd-1.3.7 /zstd-1.3.7.tar.gz
-
-# Snappy
-RUN curl -O -L https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz && \
- tar xvfz snappy-1.1.3.tar.gz && \
- cd snappy-1.1.3 && \
- CXXFLAGS="-fPIC -O3" ./configure && \
- make && make install && \
- rm -rf /snappy-1.1.3 /snappy-1.1.3.tar.gz
-
-# LibCurl
-RUN curl -O -L https://github.com/curl/curl/releases/download/curl-7_61_0/curl-7.61.0.tar.gz && \
- tar xvfz curl-7.61.0.tar.gz && \
- cd curl-7.61.0 && \
- CFLAGS=-fPIC ./configure --with-ssl=/usr/local/ssl/ && \
- make && make install && \
- rm -rf /curl-7.61.0.tar.gz /curl-7.61.0
-
-RUN pip install twine
-RUN pip install fastavro
-
-
-ENV PYTHON_INCLUDE_DIR /opt/python/${PYTHON_SPEC}/include
-ENV PYTHON_LIBRARIES /opt/python/${PYTHON_SPEC}/lib/python${PYTHON_VERSION}
-ENV OPENSSL_ROOT_DIR /usr/local/ssl/
diff --git a/docker/manylinux2014/Dockerfile b/docker/manylinux2014/Dockerfile
deleted file mode 100644
index 5cab10c..0000000
--- a/docker/manylinux2014/Dockerfile
+++ /dev/null
@@ -1,130 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-ARG ARCH
-FROM quay.io/pypa/manylinux2014_${ARCH}
-
-ARG PYTHON_VERSION
-ARG PYTHON_SPEC
-
-ENV PYTHON_VERSION=${PYTHON_VERSION}
-ENV PYTHON_SPEC=${PYTHON_SPEC}
-
-ARG ARCH
-ENV ARCH=${ARCH}
-
-
-ENV PATH="/opt/python/${PYTHON_SPEC}/bin:${PATH}"
-
-RUN ln -s /opt/python/${PYTHON_SPEC}/include/python${PYTHON_VERSION}m /opt/python/${PYTHON_SPEC}/include/python${PYTHON_VERSION}
-
-# Perl (required for building OpenSSL)
-RUN curl -O -L https://www.cpan.org/src/5.0/perl-5.10.0.tar.gz && \
- tar xvfz perl-5.10.0.tar.gz && \
- cd perl-5.10.0 && \
- ./configure.gnu --prefix=/usr/local/ && \
- make && make install && \
- rm -rf /perl-5.10.0.tar.gz /perl-5.10.0
-
-####################################
-# These dependencies can be found in Ubuntu but they're not compiled with -fPIC,
-# so they cannot be statically linked into a shared library
-####################################
-
-# ZLib
-RUN curl -O -L https://zlib.net/zlib-1.2.12.tar.gz && \
- tar xvfz zlib-1.2.12.tar.gz && \
- cd zlib-1.2.12 && \
- CFLAGS="-fPIC -O3" ./configure && \
- make -j8 && make install && \
- rm -rf /zlib-1.2.12.tar.gz /zlib-1.2.12
-
-# Compile OpenSSL
-RUN curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_1_1_1n.tar.gz && \
- tar xvfz OpenSSL_1_1_1n.tar.gz && \
- cd openssl-OpenSSL_1_1_1n/ && \
- ./Configure -fPIC --prefix=/usr/local/ssl/ no-shared linux-${ARCH} && \
- make -j8 && make install && \
- rm -rf /OpenSSL_1_1_1n.tar.gz /openssl-OpenSSL_1_1_1n
-
-ENV LD_LIBRARY_PATH /usr/local/ssl/lib/:
-
-# Download and compile boost
-RUN curl -O -L https://boostorg.jfrog.io/artifactory/main/release/1.78.0/source/boost_1_78_0.tar.gz && \
- tar xvfz boost_1_78_0.tar.gz && \
- cd /boost_1_78_0 && \
- ./bootstrap.sh --with-libraries=program_options,filesystem,regex,thread,system,python && \
- ./b2 address-model=64 cxxflags=-fPIC link=static threading=multi variant=release install -j8 && \
- rm -rf /boost_1_78_0.tar.gz /boost_1_78_0
-
-# Download and copile protoubf
-RUN curl -O -L https://github.com/google/protobuf/releases/download/v3.20.0/protobuf-cpp-3.20.0.tar.gz && \
- tar xvfz protobuf-cpp-3.20.0.tar.gz && \
- cd protobuf-3.20.0/ && \
- CXXFLAGS=-fPIC ./configure && \
- make -j8 && make install && ldconfig && \
- rm -rf /protobuf-cpp-3.20.0.tar.gz /protobuf-3.20.0
-
-# Compile expat
-RUN curl -O -L https://github.com/libexpat/libexpat/archive/R_2_2_0.tar.gz && \
- tar xfvz R_2_2_0.tar.gz && \
- cd libexpat-R_2_2_0/expat && \
- ./buildconf.sh && \
- CFLAGS=-fPIC CXXFLAGS=-fPIC ./configure && \
- make -j8 && make installlib && \
- rm -rf /R_2_2_0.tar.gz /libexpat-R_2_2_0
-
-RUN curl -O -L https://github.com/Kitware/CMake/archive/v3.12.1.tar.gz && \
- tar xvfz v3.12.1.tar.gz && \
- cd CMake-3.12.1 && \
- ./configure && \
- make -j8 && make install && \
- rm -rf /v3.12.1.tar.gz /CMake-3.12.1
-
-# Zstandard
-RUN curl -O -L https://github.com/facebook/zstd/releases/download/v1.3.7/zstd-1.3.7.tar.gz && \
- tar xvfz zstd-1.3.7.tar.gz && \
- cd zstd-1.3.7 && \
- CFLAGS="-fPIC -O3" make -j8 && \
- make install && \
- rm -rf /zstd-1.3.7 /zstd-1.3.7.tar.gz
-
-# Snappy
-RUN curl -O -L https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz && \
- tar xvfz snappy-1.1.3.tar.gz && \
- cd snappy-1.1.3 && \
- CXXFLAGS="-fPIC -O3" ./configure && \
- make -j8 && make install && \
- rm -rf /snappy-1.1.3 /snappy-1.1.3.tar.gz
-
-# LibCurl
-RUN curl -O -L https://github.com/curl/curl/releases/download/curl-7_61_0/curl-7.61.0.tar.gz && \
- tar xvfz curl-7.61.0.tar.gz && \
- cd curl-7.61.0 && \
- CFLAGS=-fPIC ./configure --with-ssl=/usr/local/ssl/ && \
- make -j8 && make install && \
- rm -rf /curl-7.61.0.tar.gz /curl-7.61.0
-
-RUN pip install twine
-RUN pip install fastavro
-
-
-ENV PYTHON_INCLUDE_DIR /opt/python/${PYTHON_SPEC}/include
-ENV PYTHON_LIBRARIES /opt/python/${PYTHON_SPEC}/lib/python${PYTHON_VERSION}
-ENV OPENSSL_ROOT_DIR /usr/local/ssl/
diff --git a/docker/manylinux_musl/Dockerfile b/docker/manylinux_musl/Dockerfile
deleted file mode 100644
index 8eec249..0000000
--- a/docker/manylinux_musl/Dockerfile
+++ /dev/null
@@ -1,116 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-ARG ARCH
-FROM quay.io/pypa/musllinux_1_1_${ARCH}
-
-ARG PYTHON_VERSION
-ARG PYTHON_SPEC
-
-ENV PYTHON_VERSION=${PYTHON_VERSION}
-ENV PYTHON_SPEC=${PYTHON_SPEC}
-
-ARG ARCH
-ENV ARCH=${ARCH}
-
-
-ENV PATH="/opt/python/${PYTHON_SPEC}/bin:${PATH}"
-
-RUN ln -s /opt/python/${PYTHON_SPEC}/include/python${PYTHON_VERSION}m /opt/python/${PYTHON_SPEC}/include/python${PYTHON_VERSION}
-
-# Perl (required for building OpenSSL)
-RUN curl -O -L https://www.cpan.org/src/5.0/perl-5.10.0.tar.gz && \
- tar xvfz perl-5.10.0.tar.gz && \
- cd perl-5.10.0 && \
- ./configure.gnu --prefix=/usr/local/ && \
- make && make install && \
- rm -rf /perl-5.10.0.tar.gz /perl-5.10.0
-
-####################################
-# These dependencies can be found in Ubuntu but they're not compiled with -fPIC,
-# so they cannot be statically linked into a shared library
-####################################
-
-# ZLib
-RUN curl -O -L https://zlib.net/zlib-1.2.12.tar.gz && \
- tar xvfz zlib-1.2.12.tar.gz && \
- cd zlib-1.2.12 && \
- CFLAGS="-fPIC -O3" ./configure && \
- make -j8 && make install && \
- rm -rf /zlib-1.2.12.tar.gz /zlib-1.2.12
-
-# Compile OpenSSL
-RUN curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_1_1_1n.tar.gz && \
- tar xvfz OpenSSL_1_1_1n.tar.gz && \
- cd openssl-OpenSSL_1_1_1n/ && \
- ./Configure -fPIC --prefix=/usr/local/ssl/ no-shared linux-${ARCH} && \
- make -j8 && make install && \
- rm -rf /OpenSSL_1_1_1n.tar.gz /openssl-OpenSSL_1_1_1n
-
-ENV LD_LIBRARY_PATH /usr/local/ssl/lib/:
-
-# Download and compile boost
-RUN curl -O -L https://boostorg.jfrog.io/artifactory/main/release/1.78.0/source/boost_1_78_0.tar.gz && \
- tar xvfz boost_1_78_0.tar.gz && \
- cd /boost_1_78_0 && \
- ./bootstrap.sh --with-libraries=program_options,filesystem,regex,thread,system,python && \
- ./b2 address-model=64 cxxflags=-fPIC link=static threading=multi variant=release install -j8 && \
- rm -rf /boost_1_78_0.tar.gz /boost_1_78_0
-
-# Download and copile protoubf
-RUN curl -O -L https://github.com/google/protobuf/releases/download/v3.20.0/protobuf-cpp-3.20.0.tar.gz && \
- tar xvfz protobuf-cpp-3.20.0.tar.gz && \
- cd protobuf-3.20.0/ && \
- CXXFLAGS=-fPIC ./configure && \
- make -j8 && make install && \
- rm -rf /protobuf-cpp-3.20.0.tar.gz /protobuf-3.20.0
-
-RUN apk add cmake
-
-# Zstandard
-RUN curl -O -L https://github.com/facebook/zstd/releases/download/v1.3.7/zstd-1.3.7.tar.gz && \
- tar xvfz zstd-1.3.7.tar.gz && \
- cd zstd-1.3.7 && \
- CFLAGS="-fPIC -O3" make -j8 && \
- make install && \
- rm -rf /zstd-1.3.7 /zstd-1.3.7.tar.gz
-
-# Snappy
-RUN curl -O -L https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz && \
- tar xvfz snappy-1.1.3.tar.gz && \
- cd snappy-1.1.3 && \
- CXXFLAGS="-fPIC -O3" ./configure && \
- make -j8 && make install && \
- rm -rf /snappy-1.1.3 /snappy-1.1.3.tar.gz
-
-# LibCurl
-RUN curl -O -L https://github.com/curl/curl/releases/download/curl-7_61_0/curl-7.61.0.tar.gz && \
- tar xvfz curl-7.61.0.tar.gz && \
- cd curl-7.61.0 && \
- CFLAGS=-fPIC ./configure --with-ssl=/usr/local/ssl/ && \
- make -j8 && make install && \
- rm -rf /curl-7.61.0.tar.gz /curl-7.61.0
-
-RUN pip install twine
-RUN pip install fastavro
-
-
-ENV PYTHON_INCLUDE_DIR /opt/python/${PYTHON_SPEC}/include
-ENV PYTHON_LIBRARIES /opt/python/${PYTHON_SPEC}/lib/python${PYTHON_VERSION}
-ENV OPENSSL_ROOT_DIR /usr/local/ssl/
diff --git a/docker/python-versions.sh b/docker/python-versions.sh
deleted file mode 100644
index e83d5d9..0000000
--- a/docker/python-versions.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-PYTHON_VERSIONS=(
- '3.7 cp37-cp37m manylinux2014 x86_64'
- '3.8 cp38-cp38 manylinux2014 x86_64'
- '3.9 cp39-cp39 manylinux2014 x86_64'
- '3.10 cp310-cp310 manylinux2014 x86_64'
-
- '3.7 cp37-cp37m manylinux2014 aarch64'
- '3.8 cp38-cp38 manylinux2014 aarch64'
- '3.9 cp39-cp39 manylinux2014 aarch64'
- '3.10 cp310-cp310 manylinux2014 aarch64'
-
- # Alpine compatible wheels
- '3.7 cp37-cp37m manylinux_musl aarch64'
- '3.8 cp38-cp38 manylinux_musl aarch64'
- '3.9 cp39-cp39 manylinux_musl aarch64'
- '3.10 cp310-cp310 manylinux_musl aarch64'
-
- '3.7 cp37-cp37m manylinux_musl x86_64'
- '3.8 cp38-cp38 manylinux_musl x86_64'
- '3.9 cp39-cp39 manylinux_musl x86_64'
- '3.10 cp310-cp310 manylinux_musl x86_64'
-)
diff --git a/python/.gitignore b/python/.gitignore
deleted file mode 100644
index 5cb909f..0000000
--- a/python/.gitignore
+++ /dev/null
@@ -1,4 +0,0 @@
-MANIFEST
-build
-dist
-*.egg-info
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
deleted file mode 100644
index 63cf163..0000000
--- a/python/CMakeLists.txt
+++ /dev/null
@@ -1,103 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}")
-
-ADD_LIBRARY(_pulsar SHARED src/pulsar.cc
- src/producer.cc
- src/consumer.cc
- src/config.cc
- src/enums.cc
- src/client.cc
- src/message.cc
- src/authentication.cc
- src/reader.cc
- src/schema.cc
- src/cryptoKeyReader.cc
- src/exceptions.cc
- src/utils.cc
- )
-
-SET(CMAKE_SHARED_LIBRARY_PREFIX )
-SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)
-
-if (NOT APPLE AND NOT MSVC)
- SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_PYTHON}")
-endif()
-
-if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
- set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -Qunused-arguments -undefined dynamic_lookup")
-endif()
-
-# Newer boost versions don't use the -mt suffix
-if (NOT DEFINED ${Boost_PYTHON37-MT_LIBRARY})
- set(Boost_PYTHON37-MT_LIBRARY ${Boost_PYTHON37_LIBRARY})
-endif()
-
-if (NOT DEFINED ${Boost_PYTHON38-MT_LIBRARY})
- set(Boost_PYTHON38-MT_LIBRARY ${Boost_PYTHON38_LIBRARY})
-endif()
-
-if (NOT DEFINED ${Boost_PYTHON39-MT_LIBRARY})
- set(Boost_PYTHON39-MT_LIBRARY ${Boost_PYTHON39_LIBRARY})
-endif()
-
-if (NOT DEFINED ${Boost_PYTHON310-MT_LIBRARY})
- set(Boost_PYTHON310-MT_LIBRARY ${Boost_PYTHON310_LIBRARY})
-endif()
-
-# Try all possible boost-python variable namings
-set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY}
- ${Boost_PYTHON3_LIBRARY}
- ${Boost_PYTHON37-MT_LIBRARY}
- ${Boost_PYTHON38_LIBRARY}
- ${Boost_PYTHON39_LIBRARY}
- ${Boost_PYTHON310_LIBRARY}
- )
-
-if (APPLE)
- if (Boost_PYTHON37-MT_LIBRARY_RELEASE)
- set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON37-MT_LIBRARY_RELEASE})
- endif ()
- if (Boost_PYTHON38-MT_LIBRARY_RELEASE)
- set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON38-MT_LIBRARY_RELEASE})
- endif ()
- if (Boost_PYTHON39-MT_LIBRARY_RELEASE)
- set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON39-MT_LIBRARY_RELEASE})
- endif ()
- if (Boost_PYTHON310-MT_LIBRARY_RELEASE)
- set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON310-MT_LIBRARY_RELEASE})
- endif ()
-endif()
-
-message(STATUS "Using Boost Python libs: ${PYTHON_WRAPPER_LIBS}")
-
-if (NOT PYTHON_WRAPPER_LIBS)
- MESSAGE(FATAL_ERROR "Could not find Boost Python library")
-endif ()
-
-if (APPLE)
- set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -undefined dynamic_lookup")
- target_link_libraries(_pulsar -Wl,-all_load pulsarStatic ${PYTHON_WRAPPER_LIBS} ${COMMON_LIBS} ${ICU_LIBS})
-else ()
- if (NOT MSVC)
- set (CMAKE_SHARED_LINKER_FLAGS " -static-libgcc -static-libstdc++")
- endif()
- target_link_libraries(_pulsar pulsarStatic ${PYTHON_WRAPPER_LIBS} ${COMMON_LIBS})
-endif ()
diff --git a/python/build-mac-wheels.sh b/python/build-mac-wheels.sh
deleted file mode 100755
index 6a4dae7..0000000
--- a/python/build-mac-wheels.sh
+++ /dev/null
@@ -1,300 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-set -e
-
-PYTHON_VERSIONS=(
- '3.7 3.7.13'
- '3.8 3.8.13'
- '3.9 3.9.10'
- '3.10 3.10.2'
-)
-
-export MACOSX_DEPLOYMENT_TARGET=10.15
-MACOSX_DEPLOYMENT_TARGET_MAJOR=${MACOSX_DEPLOYMENT_TARGET%%.*}
-
-ZLIB_VERSION=1.2.12
-OPENSSL_VERSION=1_1_1n
-BOOST_VERSION=1.78.0
-PROTOBUF_VERSION=3.20.0
-ZSTD_VERSION=1.5.2
-SNAPPY_VERSION=1.1.3
-CURL_VERSION=7.61.0
-
-ROOT_DIR=$(git rev-parse --show-toplevel)
-cd "${ROOT_DIR}/pulsar-client-cpp"
-
-
-# Compile and cache dependencies
-CACHE_DIR=~/.pulsar-mac-wheels-cache
-mkdir -p $CACHE_DIR
-
-cd $CACHE_DIR
-
-PREFIX=$CACHE_DIR/install
-
-###############################################################################
-for line in "${PYTHON_VERSIONS[@]}"; do
- read -r -a PY <<< "$line"
- PYTHON_VERSION=${PY[0]}
- PYTHON_VERSION_LONG=${PY[1]}
-
- if [ ! -f Python-${PYTHON_VERSION_LONG}/.done ]; then
- echo "Building Python $PYTHON_VERSION_LONG"
- curl -O -L https://www.python.org/ftp/python/${PYTHON_VERSION_LONG}/Python-${PYTHON_VERSION_LONG}.tgz
- tar xfz Python-${PYTHON_VERSION_LONG}.tgz
-
- PY_PREFIX=$CACHE_DIR/py-$PYTHON_VERSION
- pushd Python-${PYTHON_VERSION_LONG}
- if [ $PYTHON_VERSION = '3.7' ]; then
- UNIVERSAL_ARCHS='intel-64'
- PY_CFLAGS=" -arch x86_64"
- else
- UNIVERSAL_ARCHS='universal2'
- fi
-
- CFLAGS="-fPIC -O3 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET} -I${PREFIX}/include ${PY_CFLAGS}" \
- LDFLAGS=" ${PY_CFLAGS} -L${PREFIX}/lib" \
- ./configure --prefix=$PY_PREFIX --enable-shared --enable-universalsdk --with-universal-archs=${UNIVERSAL_ARCHS}
- make -j16
- make install
-
- curl -O -L https://files.pythonhosted.org/packages/27/d6/003e593296a85fd6ed616ed962795b2f87709c3eee2bca4f6d0fe55c6d00/wheel-0.37.1-py2.py3-none-any.whl
- $PY_PREFIX/bin/pip3 install wheel-*.whl
-
- touch .done
- popd
- else
- echo "Using cached Python $PYTHON_VERSION_LONG"
- fi
-done
-
-
-###############################################################################
-if [ ! -f zlib-${ZLIB_VERSION}/.done ]; then
- echo "Building ZLib"
- curl -O -L https://zlib.net/zlib-${ZLIB_VERSION}.tar.gz
- tar xvfz zlib-$ZLIB_VERSION.tar.gz
- pushd zlib-$ZLIB_VERSION
- CFLAGS="-fPIC -O3 -arch arm64 -arch x86_64 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" ./configure --prefix=$PREFIX
- make -j16
- make install
- touch .done
- popd
-else
- echo "Using cached ZLib"
-fi
-
-###############################################################################
-if [ ! -f openssl-OpenSSL_${OPENSSL_VERSION}.done ]; then
- echo "Building OpenSSL"
- curl -O -L https://github.com/openssl/openssl/archive/OpenSSL_${OPENSSL_VERSION}.tar.gz
- # -arch arm64 -arch x86_64
- tar xvfz OpenSSL_${OPENSSL_VERSION}.tar.gz
- mv openssl-OpenSSL_${OPENSSL_VERSION} openssl-OpenSSL_${OPENSSL_VERSION}-arm64
- pushd openssl-OpenSSL_${OPENSSL_VERSION}-arm64
- CFLAGS="-fPIC -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" \
- ./Configure --prefix=$PREFIX no-shared darwin64-arm64-cc
- make -j8
- make install
- popd
-
- tar xvfz OpenSSL_${OPENSSL_VERSION}.tar.gz
- mv openssl-OpenSSL_${OPENSSL_VERSION} openssl-OpenSSL_${OPENSSL_VERSION}-x86_64
- pushd openssl-OpenSSL_${OPENSSL_VERSION}-x86_64
- CFLAGS="-fPIC -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" \
- ./Configure --prefix=$PREFIX no-shared darwin64-x86_64-cc
- make -j8
- make install
- popd
-
- # Create universal binaries
- lipo -create openssl-OpenSSL_${OPENSSL_VERSION}-arm64/libssl.a openssl-OpenSSL_${OPENSSL_VERSION}-x86_64/libssl.a \
- -output $PREFIX/lib/libssl.a
- lipo -create openssl-OpenSSL_${OPENSSL_VERSION}-arm64/libcrypto.a openssl-OpenSSL_${OPENSSL_VERSION}-x86_64/libcrypto.a \
- -output $PREFIX/lib/libcrypto.a
-
- touch openssl-OpenSSL_${OPENSSL_VERSION}.done
-else
- echo "Using cached OpenSSL"
-fi
-
-###############################################################################
-BOOST_VERSION_=${BOOST_VERSION//./_}
-for line in "${PYTHON_VERSIONS[@]}"; do
- read -r -a PY <<< "$line"
- PYTHON_VERSION=${PY[0]}
- PYTHON_VERSION_LONG=${PY[1]}
-
- DIR=boost-src-${BOOST_VERSION}-python-${PYTHON_VERSION}
- if [ ! -f $DIR/.done ]; then
- echo "Building Boost for Py $PYTHON_VERSION"
- curl -O -L https://boostorg.jfrog.io/artifactory/main/release/${BOOST_VERSION}/source/boost_${BOOST_VERSION_}.tar.gz
- tar xfz boost_${BOOST_VERSION_}.tar.gz
- mv boost_${BOOST_VERSION_} $DIR
-
- PY_PREFIX=$CACHE_DIR/py-$PYTHON_VERSION
- PY_INCLUDE_DIR=${PY_PREFIX}/include/python${PYTHON_VERSION}
- if [ $PYTHON_VERSION = '3.7' ]; then
- PY_INCLUDE_DIR=${PY_INCLUDE_DIR}m
- fi
-
- pushd $DIR
- cat <<EOF > user-config.jam
- using python : $PYTHON_VERSION
- : python3
- : ${PY_INCLUDE_DIR}
- : ${PY_PREFIX}/lib
- ;
-EOF
- ./bootstrap.sh --with-libraries=python --with-python=python3 --with-python-root=$PY_PREFIX \
- --prefix=$CACHE_DIR/boost-py-$PYTHON_VERSION
- ./b2 address-model=64 cxxflags="-fPIC -arch arm64 -arch x86_64 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" \
- link=static threading=multi \
- --user-config=./user-config.jam \
- variant=release python=${PYTHON_VERSION} \
- -j16 \
- install
- touch .done
- popd
- else
- echo "Using cached Boost for Py $PYTHON_VERSION"
- fi
-
-done
-
-
-
-###############################################################################
-if [ ! -f protobuf-${PROTOBUF_VERSION}/.done ]; then
- echo "Building Protobuf"
- curl -O -L https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/protobuf-cpp-${PROTOBUF_VERSION}.tar.gz
- tar xvfz protobuf-cpp-${PROTOBUF_VERSION}.tar.gz
- pushd protobuf-${PROTOBUF_VERSION}
- CXXFLAGS="-fPIC -arch arm64 -arch x86_64 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" \
- ./configure --prefix=$PREFIX
- make -j16
- make install
- touch .done
- popd
-else
- echo "Using cached Protobuf"
-fi
-
-###############################################################################
-if [ ! -f zstd-${ZSTD_VERSION}/.done ]; then
- echo "Building ZStd"
- curl -O -L https://github.com/facebook/zstd/releases/download/v${ZSTD_VERSION}/zstd-${ZSTD_VERSION}.tar.gz
- tar xvfz zstd-${ZSTD_VERSION}.tar.gz
- pushd zstd-${ZSTD_VERSION}
- CFLAGS="-fPIC -O3 -arch arm64 -arch x86_64 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" PREFIX=$PREFIX \
- make -j16 install
- touch .done
- popd
-else
- echo "Using cached ZStd"
-fi
-
-###############################################################################
-if [ ! -f snappy-${SNAPPY_VERSION}/.done ]; then
- echo "Building Snappy"
- curl -O -L https://github.com/google/snappy/releases/download/${SNAPPY_VERSION}/snappy-${SNAPPY_VERSION}.tar.gz
- tar xvfz snappy-${SNAPPY_VERSION}.tar.gz
- pushd snappy-${SNAPPY_VERSION}
- CXXFLAGS="-fPIC -O3 -arch arm64 -arch x86_64 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" \
- ./configure --prefix=$PREFIX
- make -j16
- make install
- touch .done
- popd
-else
- echo "Using cached Snappy"
-fi
-
-###############################################################################
-if [ ! -f curl-${CURL_VERSION}/.done ]; then
- echo "Building LibCurl"
- CURL_VERSION_=${CURL_VERSION//./_}
- curl -O -L https://github.com/curl/curl/releases/download/curl-${CURL_VERSION_}/curl-${CURL_VERSION}.tar.gz
- tar xfz curl-${CURL_VERSION}.tar.gz
- pushd curl-${CURL_VERSION}
- CFLAGS="-fPIC -arch arm64 -arch x86_64 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" \
- ./configure --with-ssl=$PREFIX \
- --without-nghttp2 --without-libidn2 --disable-ldap \
- --prefix=$PREFIX
- make -j16 install
- touch .done
- popd
-else
- echo "Using cached LibCurl"
-fi
-
-###############################################################################
-###############################################################################
-###############################################################################
-###############################################################################
-
-for line in "${PYTHON_VERSIONS[@]}"; do
- read -r -a PY <<< "$line"
- PYTHON_VERSION=${PY[0]}
- PYTHON_VERSION_LONG=${PY[1]}
- echo '----------------------------------------------------------------------------'
- echo '----------------------------------------------------------------------------'
- echo '----------------------------------------------------------------------------'
- echo "Build wheel for Python $PYTHON_VERSION"
-
- cd "${ROOT_DIR}/pulsar-client-cpp"
-
- find . -name CMakeCache.txt | xargs -r rm
- find . -name CMakeFiles | xargs -r rm -rf
-
- PY_PREFIX=$CACHE_DIR/py-$PYTHON_VERSION
- PY_EXE=$PY_PREFIX/bin/python3
-
- PY_INCLUDE_DIR=${PY_PREFIX}/include/python${PYTHON_VERSION}
- ARCHS='arm64;x86_64'
- if [ $PYTHON_VERSION = '3.7' ]; then
- PY_INCLUDE_DIR=${PY_INCLUDE_DIR}m
- ARCHS='x86_64'
- fi
-
- set -x
- cmake . \
- -DCMAKE_OSX_ARCHITECTURES=${ARCHS} \
- -DCMAKE_OSX_DEPLOYMENT_TARGET=${MACOSX_DEPLOYMENT_TARGET} \
- -DCMAKE_INSTALL_PREFIX=$PREFIX \
- -DCMAKE_BUILD_TYPE=Release \
- -DCMAKE_PREFIX_PATH=$PREFIX \
- -DCMAKE_CXX_FLAGS=-I$PREFIX/include \
- -DBoost_INCLUDE_DIR=$CACHE_DIR/boost-py-$PYTHON_VERSION/include \
- -DBoost_LIBRARY_DIR=$CACHE_DIR/boost-py-$PYTHON_VERSION/lib \
- -DPYTHON_INCLUDE_DIR=$PY_INCLUDE_DIR \
- -DPYTHON_LIBRARY=$PY_PREFIX/lib/libpython${PYTHON_VERSION}.dylib \
- -DLINK_STATIC=ON \
- -DBUILD_TESTS=OFF \
- -DBUILD_WIRESHARK=OFF \
- -DPROTOC_PATH=$PREFIX/bin/protoc
-
- make clean
- make _pulsar -j16
-
- cd python
- $PY_EXE setup.py bdist_wheel
-done
diff --git a/python/custom_logger_test.py b/python/custom_logger_test.py
deleted file mode 100755
index 60f3315..0000000
--- a/python/custom_logger_test.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#!/usr/bin/env python3
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from unittest import TestCase, main
-import asyncio
-import logging
-from pulsar import Client
-
-class CustomLoggingTest(TestCase):
-
- serviceUrl = 'pulsar://localhost:6650'
-
- def test_async_func_with_custom_logger(self):
- # boost::python::call may fail in C++ destructors, even worse, calls
- # to PyErr_Print could corrupt the Python interpreter.
- # See https://github.com/boostorg/python/issues/374 for details.
- # This test is to verify these functions won't be called in C++ destructors
- # so that Python's async function works well.
- client = Client(
- self.serviceUrl,
- logger=logging.getLogger('custom-logger')
- )
-
- async def async_get(value):
- consumer = client.subscribe('test_async_get', 'sub')
- consumer.close()
- return value
-
- value = 'foo'
- result = asyncio.run(async_get(value))
- self.assertEqual(value, result)
-
- client.close()
-
-if __name__ == '__main__':
- logging.basicConfig(level=logging.DEBUG)
- main()
diff --git a/python/examples/company.avsc b/python/examples/company.avsc
deleted file mode 100644
index 5fb1860..0000000
--- a/python/examples/company.avsc
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "doc": "this is doc",
- "namespace": "example.avro",
- "type": "record",
- "name": "Company",
- "fields": [
- {"name": "name", "type": ["null", "string"]},
- {"name": "address", "type": ["null", "string"]},
- {"name": "employees", "type": ["null", {"type": "array", "items": {
- "type": "record",
- "name": "Employee",
- "fields": [
- {"name": "name", "type": ["null", "string"]},
- {"name": "age", "type": ["null", "int"]}
- ]
- }}]},
- {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]},
- {"name": "companyType", "type": ["null", {"type": "enum", "name": "CompanyType", "symbols":
- ["companyType1", "companyType2", "companyType3"]}]}
- ]
-}
\ No newline at end of file
diff --git a/python/examples/rpc_client.py b/python/examples/rpc_client.py
deleted file mode 100755
index fceac54..0000000
--- a/python/examples/rpc_client.py
+++ /dev/null
@@ -1,80 +0,0 @@
-#!/usr/bin/env python3
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-import pulsar
-import threading
-import uuid
-
-
-DEFAULT_CLIENT_TOPIC = 'rpc-client-topic'
-DEFAULT_SERVER_TOPIC = 'rpc-server-topic'
-UUID = str(uuid.uuid4())
-NUM_CLIENT = 0
-LOCK = threading.Lock()
-
-
-class RPCClient(object):
-
- def __init__(self,
- client_topic=DEFAULT_CLIENT_TOPIC,
- server_topic=DEFAULT_SERVER_TOPIC):
- self.client_topic = client_topic
- self.server_topic = server_topic
-
- global NUM_CLIENT
- with LOCK:
- self.client_no = NUM_CLIENT
- NUM_CLIENT += 1
-
- self.response = None
- self.partition_key = '{0}_{1}'.format(UUID, self.client_no)
- self.client = pulsar.Client('pulsar://localhost:6650')
- self.producer = self.client.create_producer(server_topic)
- self.consumer = \
- self.client.subscribe(client_topic,
- 'rpc-client-{}'.format(self.partition_key),
- message_listener=self.on_response)
-
- self.consumer.resume_message_listener()
-
- def on_response(self, consumer, message):
- if message.partition_key() == self.partition_key \
- and consumer.topic() == self.client_topic:
- msg = message.data().decode('utf-8')
- print('Received: {0}'.format(msg))
- self.response = msg
- consumer.acknowledge(message)
-
- def call(self, message):
- self.response = None
- self.producer.send(message.encode('utf-8'), partition_key=self.partition_key)
-
- while self.response is None:
- pass
-
- return self.response
-
-
-msg = 'foo'
-rpc_client = RPCClient()
-ret = rpc_client.call(msg)
-
-print('RPCClient message sent: {0}, result: {1}'.format(msg, ret))
diff --git a/python/examples/rpc_server.py b/python/examples/rpc_server.py
deleted file mode 100755
index d5c445f..0000000
--- a/python/examples/rpc_server.py
+++ /dev/null
@@ -1,63 +0,0 @@
-#!/usr/bin/env python3
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-import pulsar
-
-
-DEFAULT_CLIENT_TOPIC = 'rpc-client-topic'
-DEFAULT_SERVER_TOPIC = 'rpc-server-topic'
-
-
-class RPCServer(object):
- def __init__(self,
- client_topic=DEFAULT_CLIENT_TOPIC,
- server_topic=DEFAULT_SERVER_TOPIC):
- self.client_topic = client_topic
- self.server_topic = server_topic
-
- self.client = pulsar.Client('pulsar://localhost:6650')
- self.producer = self.client.create_producer(client_topic)
- self.consumer = \
- self.client.subscribe(server_topic,
- 'rpc-server',
- pulsar.ConsumerType.Shared,
- message_listener=self.on_response)
-
- def on_response(self, consumer, message):
- print('Received from {0}: {1}'.format(message.partition_key(),
- message.data().decode('utf-8')))
-
- self.producer.send('{} bar'.format(message.data().decode('utf-8')),
- partition_key=message.partition_key())
- consumer.acknowledge(message)
-
- def start(self):
- self.consumer.resume_message_listener()
-
-
-rpc_server = RPCServer()
-rpc_server.start()
-
-try:
- while True:
- pass
-except KeyboardInterrupt:
- print('Interrupted.')
diff --git a/python/pulsar/__init__.py b/python/pulsar/__init__.py
deleted file mode 100644
index 942ec8f..0000000
--- a/python/pulsar/__init__.py
+++ /dev/null
@@ -1,1428 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-The Pulsar Python client library is based on the existing C++ client library.
-All the same features are exposed through the Python interface.
-
-Currently, the supported Python versions are 3.7, 3.8, 3.9 and 3.10.
-
-## Install from PyPI
-
-Download Python wheel binary files for MacOS and Linux
-directly from the PyPI archive.
-
- #!shell
- $ sudo pip install pulsar-client
-
-## Install from sources
-
-Follow the instructions to compile the Pulsar C++ client library. This method
-will also build the Python binding for the library.
-
-To install the Python bindings:
-
- #!shell
- $ cd pulsar-client-cpp/python
- $ sudo python setup.py install
-
-## Examples
-
-### [Producer](#pulsar.Producer) example
-
- #!python
- import pulsar
-
- client = pulsar.Client('pulsar://localhost:6650')
-
- producer = client.create_producer('my-topic')
-
- for i in range(10):
- producer.send(('Hello-%d' % i).encode('utf-8'))
-
- client.close()
-
-### [Consumer](#pulsar.Consumer) example
-
- #!python
- import pulsar
-
- client = pulsar.Client('pulsar://localhost:6650')
-
- consumer = client.subscribe('my-topic', 'my-subscription')
-
- while True:
- msg = consumer.receive()
- try:
- print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
- consumer.acknowledge(msg)
- except Exception:
- consumer.negative_acknowledge(msg)
-
- client.close()
-
-### [Async producer](#pulsar.Producer.send_async) example
-
- #!python
- import pulsar
-
- client = pulsar.Client('pulsar://localhost:6650')
-
- producer = client.create_producer(
- 'my-topic',
- block_if_queue_full=True,
- batching_enabled=True,
- batching_max_publish_delay_ms=10
- )
-
- def send_callback(res, msg_id):
- print('Message published res=%s' % res)
-
- for i in range(10):
- producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
-
- client.close()
-"""
-
-import logging
-import _pulsar
-
-from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401
-
-from pulsar.exceptions import *
-
-from pulsar.functions.function import Function
-from pulsar.functions.context import Context
-from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
-from pulsar import schema
-_schema = schema
-
-import re
-_retype = type(re.compile('x'))
-
-import certifi
-from datetime import timedelta
-
-
-class MessageId:
- """
- Represents a message id
- """
-
- def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
- self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
-
- 'Represents the earliest message stored in a topic'
- earliest = _pulsar.MessageId.earliest
-
- 'Represents the latest message published on a topic'
- latest = _pulsar.MessageId.latest
-
- def ledger_id(self):
- return self._msg_id.ledger_id()
-
- def entry_id(self):
- return self._msg_id.entry_id()
-
- def batch_index(self):
- return self._msg_id.batch_index()
-
- def partition(self):
- return self._msg_id.partition()
-
- def serialize(self):
- """
- Returns a bytes representation of the message id.
- This bytes sequence can be stored and later deserialized.
- """
- return self._msg_id.serialize()
-
- @staticmethod
- def deserialize(message_id_bytes):
- """
- Deserialize a message id object from a previously
- serialized bytes sequence.
- """
- return _pulsar.MessageId.deserialize(message_id_bytes)
-
-
-class Message:
- """
- Message objects are returned by a consumer, either by calling `receive` or
- through a listener.
- """
-
- def data(self):
- """
- Returns a bytes object with the payload of the message.
- """
- return self._message.data()
-
- def value(self):
- """
- Returns object with the de-serialized version of the message content
- """
- return self._schema.decode(self._message.data())
-
- def properties(self):
- """
- Return the properties attached to the message. Properties are
- application-defined key/value pairs that will be attached to the
- message.
- """
- return self._message.properties()
-
- def partition_key(self):
- """
- Get the partitioning key for the message.
- """
- return self._message.partition_key()
-
- def publish_timestamp(self):
- """
- Get the timestamp in milliseconds with the message publish time.
- """
- return self._message.publish_timestamp()
-
- def event_timestamp(self):
- """
- Get the timestamp in milliseconds with the message event time.
- """
- return self._message.event_timestamp()
-
- def message_id(self):
- """
- The message ID that can be used to refer to this particular message.
- """
- return self._message.message_id()
-
- def topic_name(self):
- """
- Get the name of the topic from which this message originated
- """
- return self._message.topic_name()
-
- def redelivery_count(self):
- """
- Get the redelivery count for this message
- """
- return self._message.redelivery_count()
-
- def schema_version(self):
- """
- Get the schema version for this message
- """
- return self._message.schema_version()
-
- @staticmethod
- def _wrap(_message):
- self = Message()
- self._message = _message
- return self
-
-
-class MessageBatch:
-
- def __init__(self):
- self._msg_batch = _pulsar.MessageBatch()
-
- def with_message_id(self, msg_id):
- if not isinstance(msg_id, _pulsar.MessageId):
- if isinstance(msg_id, MessageId):
- msg_id = msg_id._msg_id
- else:
- raise TypeError("unknown message id type")
- self._msg_batch.with_message_id(msg_id)
- return self
-
- def parse_from(self, data, size):
- self._msg_batch.parse_from(data, size)
- _msgs = self._msg_batch.messages()
- return list(map(Message._wrap, _msgs))
-
-
-class Authentication:
- """
- Authentication provider object. Used to load authentication from an external
- shared library.
- """
- def __init__(self, dynamicLibPath, authParamsString):
- """
- Create the authentication provider instance.
-
- **Args**
-
- * `dynamicLibPath`: Path to the authentication provider shared library
- (such as `tls.so`)
- * `authParamsString`: Comma-separated list of provider-specific
- configuration params
- """
- _check_type(str, dynamicLibPath, 'dynamicLibPath')
- _check_type(str, authParamsString, 'authParamsString')
- self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
-
-
-class AuthenticationTLS(Authentication):
- """
- TLS Authentication implementation
- """
- def __init__(self, certificate_path, private_key_path):
- """
- Create the TLS authentication provider instance.
-
- **Args**
-
- * `certificate_path`: Path to the public certificate
- * `private_key_path`: Path to the private TLS key
- """
- _check_type(str, certificate_path, 'certificate_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
-
-
-class AuthenticationToken(Authentication):
- """
- Token based authentication implementation
- """
- def __init__(self, token):
- """
- Create the token authentication provider instance.
-
- **Args**
-
- * `token`: A string containing the token or a function that provides a
- string with the token
- """
- if not (isinstance(token, str) or callable(token)):
- raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
- self.auth = _pulsar.AuthenticationToken(token)
-
-
-class AuthenticationAthenz(Authentication):
- """
- Athenz Authentication implementation
- """
- def __init__(self, auth_params_string):
- """
- Create the Athenz authentication provider instance.
-
- **Args**
-
- * `auth_params_string`: JSON encoded configuration for Athenz client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
-
-class AuthenticationOauth2(Authentication):
- """
- Oauth2 Authentication implementation
- """
- def __init__(self, auth_params_string):
- """
- Create the Oauth2 authentication provider instance.
-
- **Args**
-
- * `auth_params_string`: JSON encoded configuration for Oauth2 client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
-
-class AuthenticationBasic(Authentication):
- """
- Basic Authentication implementation
- """
- def __init__(self, username, password):
- """
- Create the Basic authentication provider instance.
-
- **Args**
-
- * `username`: Username used for authentication
- * `password`: Password used for authentication
- """
- _check_type(str, username, 'username')
- _check_type(str, password, 'password')
- self.auth = _pulsar.AuthenticationBasic(username, password)
-
-class Client:
- """
- The Pulsar client. A single client instance can be used to create producers
- and consumers on multiple topics.
-
- The client will share the same connection pool and threads across all
- producers and consumers.
- """
-
- def __init__(self, service_url,
- authentication=None,
- operation_timeout_seconds=30,
- io_threads=1,
- message_listener_threads=1,
- concurrent_lookup_requests=50000,
- log_conf_file_path=None,
- use_tls=False,
- tls_trust_certs_file_path=None,
- tls_allow_insecure_connection=False,
- tls_validate_hostname=False,
- logger=None,
- connection_timeout_ms=10000,
- listener_name=None
- ):
- """
- Create a new Pulsar client instance.
-
- **Args**
-
- * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
-
- **Options**
-
- * `authentication`:
- Set the authentication provider to be used with the broker. For example:
- `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
- * `operation_timeout_seconds`:
- Set timeout on client operations (subscribe, create producer, close,
- unsubscribe).
- * `io_threads`:
- Set the number of IO threads to be used by the Pulsar client.
- * `message_listener_threads`:
- Set the number of threads to be used by the Pulsar client when
- delivering messages through message listener. The default is 1 thread
- per Pulsar client. If using more than 1 thread, messages for distinct
- `message_listener`s will be delivered in different threads, however a
- single `MessageListener` will always be assigned to the same thread.
- * `concurrent_lookup_requests`:
- Number of concurrent lookup-requests allowed on each broker connection
- to prevent overload on the broker.
- * `log_conf_file_path`:
- Initialize log4cxx from a configuration file.
- * `use_tls`:
- Configure whether to use TLS encryption on the connection. This setting
- is deprecated. TLS will be automatically enabled if the `service_url` is
- set to `pulsar+ssl://` or `https://`
- * `tls_trust_certs_file_path`:
- Set the path to the trusted TLS certificate file. If empty defaults to
- certifi.
- * `tls_allow_insecure_connection`:
- Configure whether the Pulsar client accepts untrusted TLS certificates
- from the broker.
- * `tls_validate_hostname`:
- Configure whether the Pulsar client validates that the hostname of the
- endpoint, matches the common name on the TLS certificate presented by
- the endpoint.
- * `logger`:
- Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
- * `connection_timeout_ms`:
- Set timeout in milliseconds on TCP connections.
- * `listener_name`:
- Listener name for lookup. Clients can use listenerName to choose one of the listeners
- as the service URL to create a connection to the broker as long as the network is accessible.
- advertisedListeners must be enabled on the broker side.
- """
- _check_type(str, service_url, 'service_url')
- _check_type_or_none(Authentication, authentication, 'authentication')
- _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
- _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
- _check_type(int, io_threads, 'io_threads')
- _check_type(int, message_listener_threads, 'message_listener_threads')
- _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
- _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
- _check_type(bool, use_tls, 'use_tls')
- _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
- _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
- _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
- _check_type_or_none(logging.Logger, logger, 'logger')
- _check_type_or_none(str, listener_name, 'listener_name')
-
- conf = _pulsar.ClientConfiguration()
- if authentication:
- conf.authentication(authentication.auth)
- conf.operation_timeout_seconds(operation_timeout_seconds)
- conf.connection_timeout(connection_timeout_ms)
- conf.io_threads(io_threads)
- conf.message_listener_threads(message_listener_threads)
- conf.concurrent_lookup_requests(concurrent_lookup_requests)
- if log_conf_file_path:
- conf.log_conf_file_path(log_conf_file_path)
- conf.set_logger(self._prepare_logger(logger) if logger else None)
- if listener_name:
- conf.listener_name(listener_name)
- if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
- conf.use_tls(True)
- if tls_trust_certs_file_path:
- conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
- else:
- conf.tls_trust_certs_file_path(certifi.where())
- conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
- conf.tls_validate_hostname(tls_validate_hostname)
- self._client = _pulsar.Client(service_url, conf)
- self._consumers = []
-
- @staticmethod
- def _prepare_logger(logger):
- import logging
- def log(level, message):
- old_threads = logging.logThreads
- logging.logThreads = False
- logger.log(logging.getLevelName(level), message)
- logging.logThreads = old_threads
- return log
-
- def create_producer(self, topic,
- producer_name=None,
- schema=schema.BytesSchema(),
- initial_sequence_id=None,
- send_timeout_millis=30000,
- compression_type=CompressionType.NONE,
- max_pending_messages=1000,
- max_pending_messages_across_partitions=50000,
- block_if_queue_full=False,
- batching_enabled=False,
- batching_max_messages=1000,
- batching_max_allowed_size_in_bytes=128*1024,
- batching_max_publish_delay_ms=10,
- chunking_enabled=False,
- message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
- lazy_start_partitioned_producers=False,
- properties=None,
- batching_type=BatchingType.Default,
- encryption_key=None,
- crypto_key_reader=None
- ):
- """
- Create a new producer on a given topic.
-
- **Args**
-
- * `topic`:
- The topic name
-
- **Options**
-
- * `producer_name`:
- Specify a name for the producer. If not assigned,
- the system will generate a globally unique name which can be accessed
- with `Producer.producer_name()`. When specifying a name, it is app to
- the user to ensure that, for a given topic, the producer name is unique
- across all Pulsar's clusters.
- * `schema`:
- Define the schema of the data that will be published by this producer.
- The schema will be used for two purposes:
- - Validate the data format against the topic defined schema
- - Perform serialization/deserialization between data and objects
- An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
- * `initial_sequence_id`:
- Set the baseline for the sequence ids for messages
- published by the producer. First message will be using
- `(initialSequenceId + 1)`` as its sequence id and subsequent messages will
- be assigned incremental sequence ids, if not otherwise specified.
- * `send_timeout_millis`:
- If a message is not acknowledged by the server before the
- `send_timeout` expires, an error will be reported.
- * `compression_type`:
- Set the compression type for the producer. By default, message
- payloads are not compressed. Supported compression types are
- `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
- ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with ZSTD.
- SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with SNAPPY.
- * `max_pending_messages`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment from the broker.
- * `max_pending_messages_across_partitions`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment across partitions from the broker.
- * `block_if_queue_full`: Set whether `send_async` operations should
- block when the outgoing message queue is full.
- * `message_routing_mode`:
- Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
- other option is `PartitionsRoutingMode.UseSinglePartition`
- * `lazy_start_partitioned_producers`:
- This config affects producers of partitioned topics only. It controls whether
- producers register and connect immediately to the owner broker of each partition
- or start lazily on demand. The internal producer of one partition is always
- started eagerly, chosen by the routing policy, but the internal producers of
- any additional partitions are started on demand, upon receiving their first
- message.
- Using this mode can reduce the strain on brokers for topics with large numbers of
- partitions and when the SinglePartition routing policy is used without keyed messages.
- Because producer connection can be on demand, this can produce extra send latency
- for the first messages of a given partition.
- * `properties`:
- Sets the properties for the producer. The properties associated with a producer
- can be used for identify a producer at broker side.
- * `batching_type`:
- Sets the batching type for the producer.
- There are two batching type: DefaultBatching and KeyBasedBatching.
- - Default batching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
-
- - KeyBasedBatching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
- * `chunking_enabled`:
- If message size is higher than allowed max publish-payload size by broker then chunking_enabled
- helps producer to split message into multiple chunks and publish them to broker separately and in
- order. So, it allows client to successfully publish large size of messages in pulsar.
- * encryption_key:
- The key used for symmetric encryption, configured on the producer side
- * crypto_key_reader:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, topic, 'topic')
- _check_type_or_none(str, producer_name, 'producer_name')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
- _check_type(int, send_timeout_millis, 'send_timeout_millis')
- _check_type(CompressionType, compression_type, 'compression_type')
- _check_type(int, max_pending_messages, 'max_pending_messages')
- _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
- _check_type(bool, block_if_queue_full, 'block_if_queue_full')
- _check_type(bool, batching_enabled, 'batching_enabled')
- _check_type(int, batching_max_messages, 'batching_max_messages')
- _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
- _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
- _check_type(bool, chunking_enabled, 'chunking_enabled')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(BatchingType, batching_type, 'batching_type')
- _check_type_or_none(str, encryption_key, 'encryption_key')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
- _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
-
- conf = _pulsar.ProducerConfiguration()
- conf.send_timeout_millis(send_timeout_millis)
- conf.compression_type(compression_type)
- conf.max_pending_messages(max_pending_messages)
- conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
- conf.block_if_queue_full(block_if_queue_full)
- conf.batching_enabled(batching_enabled)
- conf.batching_max_messages(batching_max_messages)
- conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
- conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
- conf.partitions_routing_mode(message_routing_mode)
- conf.batching_type(batching_type)
- conf.chunking_enabled(chunking_enabled)
- conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
- if producer_name:
- conf.producer_name(producer_name)
- if initial_sequence_id:
- conf.initial_sequence_id(initial_sequence_id)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
-
- conf.schema(schema.schema_info())
- if encryption_key:
- conf.encryption_key(encryption_key)
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
- if batching_enabled and chunking_enabled:
- raise ValueError("Batching and chunking of messages can't be enabled together.")
-
- p = Producer()
- p._producer = self._client.create_producer(topic, conf)
- p._schema = schema
- p._client = self._client
- return p
-
- def subscribe(self, topic, subscription_name,
- consumer_type=ConsumerType.Exclusive,
- schema=schema.BytesSchema(),
- message_listener=None,
- receiver_queue_size=1000,
- max_total_receiver_queue_size_across_partitions=50000,
- consumer_name=None,
- unacked_messages_timeout_ms=None,
- broker_consumer_stats_cache_time_ms=30000,
- negative_ack_redelivery_delay_ms=60000,
- is_read_compacted=False,
- properties=None,
- pattern_auto_discovery_period=60,
- initial_position=InitialPosition.Latest,
- crypto_key_reader=None,
- replicate_subscription_state_enabled=False,
- max_pending_chunked_message=10,
- auto_ack_oldest_chunked_message_on_queue_full=False
- ):
- """
- Subscribe to the given topic and subscription combination.
-
- **Args**
-
- * `topic`: The name of the topic, list of topics or regex pattern.
- This method will accept these forms:
- - `topic='my-topic'`
- - `topic=['topic-1', 'topic-2', 'topic-3']`
- - `topic=re.compile('persistent://public/default/topic-*')`
- * `subscription`: The name of the subscription.
-
- **Options**
-
- * `consumer_type`:
- Select the subscription type to be used when subscribing to the topic.
- * `schema`:
- Define the schema of the data that will be received by this consumer.
- * `message_listener`:
- Sets a message listener for the consumer. When the listener is set,
- the application will receive messages through it. Calls to
- `consumer.receive()` will not be allowed. The listener function needs
- to accept (consumer, message), for example:
-
- #!python
- def my_listener(consumer, message):
- # process message
- consumer.acknowledge(message)
-
- * `receiver_queue_size`:
- Sets the size of the consumer receive queue. The consumer receive
- queue controls how many messages can be accumulated by the consumer
- before the application calls `receive()`. Using a higher value could
- potentially increase the consumer throughput at the expense of higher
- memory utilization. Setting the consumer queue size to zero decreases
- the throughput of the consumer by disabling pre-fetching of messages.
- This approach improves the message distribution on shared subscription
- by pushing messages only to those consumers that are ready to process
- them. Neither receive with timeout nor partitioned topics can be used
- if the consumer queue size is zero. The `receive()` function call
- should not be interrupted when the consumer queue size is zero. The
- default value is 1000 messages and should work well for most use
- cases.
- * `max_total_receiver_queue_size_across_partitions`
- Set the max total receiver queue size across partitions.
- This setting will be used to reduce the receiver queue size for individual partitions
- * `consumer_name`:
- Sets the consumer name.
- * `unacked_messages_timeout_ms`:
- Sets the timeout in milliseconds for unacknowledged messages. The
- timeout needs to be greater than 10 seconds. An exception is thrown if
- the given value is less than 10 seconds. If a successful
- acknowledgement is not sent within the timeout, all the unacknowledged
- messages are redelivered.
- * `negative_ack_redelivery_delay_ms`:
- The delay after which to redeliver the messages that failed to be
- processed (with the `consumer.negative_acknowledge()`)
- * `broker_consumer_stats_cache_time_ms`:
- Sets the time duration for which the broker-side consumer stats will
- be cached in the client.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- * `properties`:
- Sets the properties for the consumer. The properties associated with a consumer
- can be used for identify a consumer at broker side.
- * `pattern_auto_discovery_period`:
- Periods of seconds for consumer to auto discover match topics.
- * `initial_position`:
- Set the initial position of a consumer when subscribing to the topic.
- It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
- Default: `Latest`.
- * crypto_key_reader:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- * replicate_subscription_state_enabled:
- Set whether the subscription status should be replicated.
- Default: `False`.
- * max_pending_chunked_message:
- Consumer buffers chunk messages into memory until it receives all the chunks of the original message.
- While consuming chunk-messages, chunks from same message might not be contiguous in the stream and they
- might be mixed with other messages' chunks. so, consumer has to maintain multiple buffers to manage
- chunks coming from different messages. This mainly happens when multiple publishers are publishing
- messages on the topic concurrently or publisher failed to publish all chunks of the messages.
-
- If it's zero, the pending chunked messages will not be limited.
-
- Default: `10`.
- * auto_ack_oldest_chunked_message_on_queue_full:
- Buffering large number of outstanding uncompleted chunked messages can create memory pressure and it
- can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage.
- Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking
- if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
- Default: `False`.
- """
- _check_type(str, subscription_name, 'subscription_name')
- _check_type(ConsumerType, consumer_type, 'consumer_type')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type(int, max_total_receiver_queue_size_across_partitions,
- 'max_total_receiver_queue_size_across_partitions')
- _check_type_or_none(str, consumer_name, 'consumer_name')
- _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
- _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
- _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
- _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(InitialPosition, initial_position, 'initial_position')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
- _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message')
- _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
-
- conf = _pulsar.ConsumerConfiguration()
- conf.consumer_type(consumer_type)
- conf.read_compacted(is_read_compacted)
- if message_listener:
- conf.message_listener(_listener_wrapper(message_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
- if consumer_name:
- conf.consumer_name(consumer_name)
- if unacked_messages_timeout_ms:
- conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-
- conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
- conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
- conf.subscription_initial_position(initial_position)
-
- conf.schema(schema.schema_info())
-
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
- conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
- conf.max_pending_chunked_message(max_pending_chunked_message)
- conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
-
- c = Consumer()
- if isinstance(topic, str):
- # Single topic
- c._consumer = self._client.subscribe(topic, subscription_name, conf)
- elif isinstance(topic, list):
- # List of topics
- c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
- elif isinstance(topic, _retype):
- # Regex pattern
- c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
- else:
- raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
-
- c._client = self
- c._schema = schema
- self._consumers.append(c)
- return c
-
- def create_reader(self, topic, start_message_id,
- schema=schema.BytesSchema(),
- reader_listener=None,
- receiver_queue_size=1000,
- reader_name=None,
- subscription_role_prefix=None,
- is_read_compacted=False,
- crypto_key_reader=None
- ):
- """
- Create a reader on a particular topic
-
- **Args**
-
- * `topic`: The name of the topic.
- * `start_message_id`: The initial reader positioning is done by specifying a message id.
- The options are:
- * `MessageId.earliest`: Start reading from the earliest message available in the topic
- * `MessageId.latest`: Start reading from the end topic, only getting messages published
- after the reader was created
- * `MessageId`: When passing a particular message id, the reader will position itself on
- that specific position. The first message to be read will be the message next to the
- specified messageId. Message id can be serialized into a string and deserialized
- back into a `MessageId` object:
-
- # Serialize to string
- s = msg.message_id().serialize()
-
- # Deserialize from string
- msg_id = MessageId.deserialize(s)
-
- **Options**
-
- * `schema`:
- Define the schema of the data that will be received by this reader.
- * `reader_listener`:
- Sets a message listener for the reader. When the listener is set,
- the application will receive messages through it. Calls to
- `reader.read_next()` will not be allowed. The listener function needs
- to accept (reader, message), for example:
-
- def my_listener(reader, message):
- # process message
- pass
-
- * `receiver_queue_size`:
- Sets the size of the reader receive queue. The reader receive
- queue controls how many messages can be accumulated by the reader
- before the application calls `read_next()`. Using a higher value could
- potentially increase the reader throughput at the expense of higher
- memory utilization.
- * `reader_name`:
- Sets the reader name.
- * `subscription_role_prefix`:
- Sets the subscription role prefix.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- * crypto_key_reader:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, topic, 'topic')
- _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type_or_none(str, reader_name, 'reader_name')
- _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
- conf = _pulsar.ReaderConfiguration()
- if reader_listener:
- conf.reader_listener(_listener_wrapper(reader_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- if reader_name:
- conf.reader_name(reader_name)
- if subscription_role_prefix:
- conf.subscription_role_prefix(subscription_role_prefix)
- conf.schema(schema.schema_info())
- conf.read_compacted(is_read_compacted)
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
- c = Reader()
- c._reader = self._client.create_reader(topic, start_message_id, conf)
- c._client = self
- c._schema = schema
- self._consumers.append(c)
- return c
-
- def get_topic_partitions(self, topic):
- """
- Get the list of partitions for a given topic.
-
- If the topic is partitioned, this will return a list of partition names. If the topic is not
- partitioned, the returned list will contain the topic name itself.
-
- This can be used to discover the partitions and create Reader, Consumer or Producer
- instances directly on a particular partition.
- :param topic: the topic name to lookup
- :return: a list of partition name
- """
- _check_type(str, topic, 'topic')
- return self._client.get_topic_partitions(topic)
-
- def shutdown(self):
- """
- Perform immediate shutdown of Pulsar client.
-
- Release all resources and close all producer, consumer, and readers without waiting
- for ongoing operations to complete.
- """
- self._client.shutdown()
-
- def close(self):
- """
- Close the client and all the associated producers and consumers
- """
- self._client.close()
-
-
-class Producer:
- """
- The Pulsar message producer, used to publish messages on a topic.
- """
-
- def topic(self):
- """
- Return the topic which producer is publishing to
- """
- return self._producer.topic()
-
- def producer_name(self):
- """
- Return the producer name which could have been assigned by the
- system or specified by the client
- """
- return self._producer.producer_name()
-
- def last_sequence_id(self):
- """
- Get the last sequence id that was published by this producer.
-
- This represent either the automatically assigned or custom sequence id
- (set on the `MessageBuilder`) that was published and acknowledged by the broker.
-
- After recreating a producer with the same producer name, this will return the
- last message that was published in the previous producer session, or -1 if
- there no message was ever published.
- """
- return self._producer.last_sequence_id()
-
- def send(self, content,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Publish a message on the topic. Blocks until the message is acknowledged
-
- Returns a `MessageId` object that represents where the message is persisted.
-
- **Args**
-
- * `content`:
- A `bytes` object with the message payload.
-
- **Options**
-
- * `properties`:
- A dict of application-defined string properties.
- * `partition_key`:
- Sets the partition key for message routing. A hash of this key is used
- to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`:
- Override namespace replication clusters. Note that it is the caller's
- responsibility to provide valid cluster names and that all clusters
- have been previously configured as topics. Given an empty list,
- the message will replicate according to the namespace configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp in millis of the timestamp of event creation
- * `deliver_at`:
- Specify the this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is milliseconds and based on UTC
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
-
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- return MessageId.deserialize(self._producer.send(msg))
-
- def send_async(self, content, callback,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Send a message asynchronously.
-
- The `callback` will be invoked once the message has been acknowledged
- by the broker.
-
- Example:
-
- #!python
- def callback(res, msg_id):
- print('Message published: %s' % res)
-
- producer.send_async(msg, callback)
-
- When the producer queue is full, by default the message will be rejected
- and the callback invoked with an error code.
-
- **Args**
-
- * `content`:
- A `bytes` object with the message payload.
-
- **Options**
-
- * `properties`:
- A dict of application0-defined string properties.
- * `partition_key`:
- Sets the partition key for the message routing. A hash of this key is
- used to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`: Override namespace replication clusters. Note
- that it is the caller's responsibility to provide valid cluster names
- and that all clusters have been previously configured as topics.
- Given an empty list, the message will replicate per the namespace
- configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp in millis of the timestamp of event creation
- * `deliver_at`:
- Specify the this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is milliseconds and based on UTC
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- self._producer.send_async(msg, callback)
-
-
- def flush(self):
- """
- Flush all the messages buffered in the client and wait until all messages have been
- successfully persisted
- """
- self._producer.flush()
-
-
- def close(self):
- """
- Close the producer.
- """
- self._producer.close()
-
- def _build_msg(self, content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after):
- data = self._schema.encode(content)
-
- _check_type(bytes, data, 'data')
- _check_type_or_none(dict, properties, 'properties')
- _check_type_or_none(str, partition_key, 'partition_key')
- _check_type_or_none(int, sequence_id, 'sequence_id')
- _check_type_or_none(list, replication_clusters, 'replication_clusters')
- _check_type(bool, disable_replication, 'disable_replication')
- _check_type_or_none(int, event_timestamp, 'event_timestamp')
- _check_type_or_none(int, deliver_at, 'deliver_at')
- _check_type_or_none(timedelta, deliver_after, 'deliver_after')
-
- mb = _pulsar.MessageBuilder()
- mb.content(data)
- if properties:
- for k, v in properties.items():
- mb.property(k, v)
- if partition_key:
- mb.partition_key(partition_key)
- if sequence_id:
- mb.sequence_id(sequence_id)
- if replication_clusters:
- mb.replication_clusters(replication_clusters)
- if disable_replication:
- mb.disable_replication(disable_replication)
- if event_timestamp:
- mb.event_timestamp(event_timestamp)
- if deliver_at:
- mb.deliver_at(deliver_at)
- if deliver_after:
- mb.deliver_after(deliver_after)
-
- return mb.build()
-
- def is_connected(self):
- """
- Check if the producer is connected or not.
- """
- return self._producer.is_connected()
-
-
-class Consumer:
- """
- Pulsar consumer.
- """
-
- def topic(self):
- """
- Return the topic this consumer is subscribed to.
- """
- return self._consumer.topic()
-
- def subscription_name(self):
- """
- Return the subscription name.
- """
- return self._consumer.subscription_name()
-
- def unsubscribe(self):
- """
- Unsubscribe the current consumer from the topic.
-
- This method will block until the operation is completed. Once the
- consumer is unsubscribed, no more messages will be received and
- subsequent new messages will not be retained for this consumer.
-
- This consumer object cannot be reused.
- """
- return self._consumer.unsubscribe()
-
- def receive(self, timeout_millis=None):
- """
- Receive a single message.
-
- If a message is not immediately available, this method will block until
- a new message is available.
-
- **Options**
-
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._consumer.receive()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._consumer.receive(timeout_millis)
-
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-
- def acknowledge(self, message):
- """
- Acknowledge the reception of a single message.
-
- This method will block until an acknowledgement is sent to the broker.
- After that, the message will not be re-delivered to this consumer.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge(message._message)
- else:
- self._consumer.acknowledge(message)
-
- def acknowledge_cumulative(self, message):
- """
- Acknowledge the reception of all the messages in the stream up to (and
- including) the provided message.
-
- This method will block until an acknowledgement is sent to the broker.
- After that, the messages will not be re-delivered to this consumer.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge_cumulative(message._message)
- else:
- self._consumer.acknowledge_cumulative(message)
-
- def negative_acknowledge(self, message):
- """
- Acknowledge the failure to process a single message.
-
- When a message is "negatively acked" it will be marked for redelivery after
- some fixed delay. The delay is configurable when constructing the consumer
- with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
-
- This call is not blocking.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.negative_acknowledge(message._message)
- else:
- self._consumer.negative_acknowledge(message)
-
- def pause_message_listener(self):
- """
- Pause receiving messages via the `message_listener` until
- `resume_message_listener()` is called.
- """
- self._consumer.pause_message_listener()
-
- def resume_message_listener(self):
- """
- Resume receiving the messages via the message listener.
- Asynchronously receive all the messages enqueued from the time
- `pause_message_listener()` was called.
- """
- self._consumer.resume_message_listener()
-
- def redeliver_unacknowledged_messages(self):
- """
- Redelivers all the unacknowledged messages. In failover mode, the
- request is ignored if the consumer is not active for the given topic. In
- shared mode, the consumer's messages to be redelivered are distributed
- across all the connected consumers. This is a non-blocking call and
- doesn't throw an exception. In case the connection breaks, the messages
- are redelivered after reconnect.
- """
- self._consumer.redeliver_unacknowledged_messages()
-
- def seek(self, messageid):
- """
- Reset the subscription associated with this consumer to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
- seek() on the individual partitions.
-
- **Args**
-
- * `message`:
- The message id for seek, OR an integer event time to seek to
- """
- self._consumer.seek(messageid)
-
- def close(self):
- """
- Close the consumer.
- """
- self._consumer.close()
- self._client._consumers.remove(self)
-
- def is_connected(self):
- """
- Check if the consumer is connected or not.
- """
- return self._consumer.is_connected()
-
- def get_last_message_id(self):
- """
- Get the last message id.
- """
- return self._consumer.get_last_message_id()
-
-
-class Reader:
- """
- Pulsar topic reader.
- """
-
- def topic(self):
- """
- Return the topic this reader is reading from.
- """
- return self._reader.topic()
-
- def read_next(self, timeout_millis=None):
- """
- Read a single message.
-
- If a message is not immediately available, this method will block until
- a new message is available.
-
- **Options**
-
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._reader.read_next()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._reader.read_next(timeout_millis)
-
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-
- def has_message_available(self):
- """
- Check if there is any message available to read from the current position.
- """
- return self._reader.has_message_available();
-
- def seek(self, messageid):
- """
- Reset this reader to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
- seek() on the individual partitions.
-
- **Args**
-
- * `message`:
- The message id for seek, OR an integer event time to seek to
- """
- self._reader.seek(messageid)
-
- def close(self):
- """
- Close the reader.
- """
- self._reader.close()
- self._client._consumers.remove(self)
-
- def is_connected(self):
- """
- Check if the reader is connected or not.
- """
- return self._reader.is_connected()
-
-
-class CryptoKeyReader:
- """
- Default crypto key reader implementation
- """
- def __init__(self, public_key_path, private_key_path):
- """
- Create crypto key reader.
-
- **Args**
-
- * `public_key_path`: Path to the public key
- * `private_key_path`: Path to private key
- """
- _check_type(str, public_key_path, 'public_key_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
-
-def _check_type(var_type, var, name):
- if not isinstance(var, var_type):
- raise ValueError("Argument %s is expected to be of type '%s' and not '%s'"
- % (name, var_type.__name__, type(var).__name__))
-
-
-def _check_type_or_none(var_type, var, name):
- if var is not None and not isinstance(var, var_type):
- raise ValueError("Argument %s is expected to be either None or of type '%s'"
- % (name, var_type.__name__))
-
-
-def _listener_wrapper(listener, schema):
- def wrapper(consumer, msg):
- c = Consumer()
- c._consumer = consumer
- m = Message()
- m._message = msg
- m._schema = schema
- listener(c, m)
- return wrapper
diff --git a/python/pulsar/exceptions.py b/python/pulsar/exceptions.py
deleted file mode 100644
index d151564..0000000
--- a/python/pulsar/exceptions.py
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from _pulsar import PulsarException, UnknownError, InvalidConfiguration, Timeout, LookupError, ConnectError, \
- ReadError, AuthenticationError, AuthorizationError, ErrorGettingAuthenticationData, BrokerMetadataError, \
- BrokerPersistenceError, ChecksumError, ConsumerBusy, NotConnected, AlreadyClosed, InvalidMessage, \
- ConsumerNotInitialized, ProducerNotInitialized, ProducerBusy, TooManyLookupRequestException, InvalidTopicName, \
- InvalidUrl, ServiceUnitNotReady, OperationNotSupported, ProducerBlockedQuotaExceededError, \
- ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \
- ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \
- CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \
- NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull
diff --git a/python/pulsar/functions/__init__.py b/python/pulsar/functions/__init__.py
deleted file mode 100644
index 47c179a..0000000
--- a/python/pulsar/functions/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# -*- encoding: utf-8 -*-
diff --git a/python/pulsar/functions/context.py b/python/pulsar/functions/context.py
deleted file mode 100644
index c1f6801..0000000
--- a/python/pulsar/functions/context.py
+++ /dev/null
@@ -1,191 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-"""context.py: Context defines context information available during
-# processing of a request.
-"""
-from abc import abstractmethod
-
-class Context(object):
- """Interface defining information available at process time"""
- @abstractmethod
- def get_message_id(self):
- """Return the messageid of the current message that we are processing"""
- pass
-
- @abstractmethod
- def get_message_key(self):
- """Return the key of the current message that we are processing"""
- pass
-
- @abstractmethod
- def get_message_eventtime(self):
- """Return the event time of the current message that we are processing"""
- pass
-
- @abstractmethod
- def get_message_properties(self):
- """Return the message properties kv map of the current message that we are processing"""
- pass
-
- @abstractmethod
- def get_current_message_topic_name(self):
- """Returns the topic name of the message that we are processing"""
- pass
-
- @abstractmethod
- def get_function_tenant(self):
- """Returns the tenant of the message that's being processed"""
- pass
-
- @abstractmethod
- def get_function_namespace(self):
- """Returns the namespace of the message that's being processed"""
-
- @abstractmethod
- def get_function_name(self):
- """Returns the function name that we are a part of"""
- pass
-
- @abstractmethod
- def get_function_id(self):
- """Returns the function id that we are a part of"""
- pass
-
- @abstractmethod
- def get_instance_id(self):
- """Returns the instance id that is executing the function"""
- pass
-
- @abstractmethod
- def get_function_version(self):
- """Returns the version of function that we are executing"""
- pass
-
- @abstractmethod
- def get_logger(self):
- """Returns the logger object that can be used to do logging"""
- pass
-
- @abstractmethod
- def get_user_config_value(self, key):
- """Returns the value of the user-defined config. If the key doesn't exist, None is returned"""
- pass
-
- @abstractmethod
- def get_user_config_map(self):
- """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
- pass
-
- @abstractmethod
- def get_secret(self, secret_name):
- """Returns the secret value associated with the name. None if nothing was found"""
- pass
-
- @abstractmethod
- def get_partition_key(self):
- """Returns partition key of the input message is one exists"""
- pass
-
-
- @abstractmethod
- def record_metric(self, metric_name, metric_value):
- """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
- pass
-
- @abstractmethod
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
- """Publishes message to topic_name by first serializing the message using serde_class_name serde
- The message will have properties specified if any
-
- The available options for message_conf:
-
- properties,
- partition_key,
- sequence_id,
- replication_clusters,
- disable_replication,
- event_timestamp
-
- """
- pass
-
- @abstractmethod
- def get_input_topics(self):
- """Returns the input topics of function"""
- pass
-
- @abstractmethod
- def get_output_topic(self):
- """Returns the output topic of function"""
- pass
-
- @abstractmethod
- def get_output_serde_class_name(self):
- """return output Serde class"""
- pass
-
- @abstractmethod
- def ack(self, msgid, topic):
- """ack this message id"""
- pass
-
- @abstractmethod
- def incr_counter(self, key, amount):
- """incr the counter of a given key in the managed state"""
- pass
-
- @abstractmethod
- def get_counter(self, key):
- """get the counter of a given key in the managed state"""
- pass
-
- @abstractmethod
- def del_counter(self, key):
- """delete the counter of a given key in the managed state"""
- pass
-
- @abstractmethod
- def put_state(self, key, value):
- """update the value of a given key in the managed state"""
- pass
-
- @abstractmethod
- def get_state(self, key):
- """get the value of a given key in the managed state"""
- pass
diff --git a/python/pulsar/functions/function.py b/python/pulsar/functions/function.py
deleted file mode 100644
index ce2919d..0000000
--- a/python/pulsar/functions/function.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-"""function.py: This is the core interface of the function api.
-# The process method is called for every message of the input topic of the
-# function. The incoming input bytes are deserialized using the serde.
-# The process function can optionally emit an output
-"""
-from abc import abstractmethod
-
-class Function(object):
- """Interface for Pulsar Function"""
- @abstractmethod
- def process(self, input, context):
- """Process input message"""
- pass
\ No newline at end of file
diff --git a/python/pulsar/functions/serde.py b/python/pulsar/functions/serde.py
deleted file mode 100644
index 7b07673..0000000
--- a/python/pulsar/functions/serde.py
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-"""serde.py: SerDe defines the interface for serialization/deserialization.
-# Everytime a message is read from pulsar topic, the serde is invoked to
-# serialize the bytes into an object before invoking the process method.
-# Anytime a python object needs to be written back to pulsar, it is
-# serialized into bytes before writing.
-"""
-from abc import abstractmethod
-
-import pickle
-
-class SerDe(object):
- """Interface for Serialization/Deserialization"""
- @abstractmethod
- def serialize(self, input):
- """Serialize input message into bytes"""
- pass
-
- @abstractmethod
- def deserialize(self, input_bytes):
- """Serialize input_bytes into an object"""
- pass
-
-class PickleSerDe(SerDe):
- """Pickle based serializer"""
- def serialize(self, input):
- return pickle.dumps(input)
-
- def deserialize(self, input_bytes):
- return pickle.loads(input_bytes)
-
-class IdentitySerDe(SerDe):
- """Simple Serde that just conversion to string and back"""
- def __init__(self):
- self._types = [int, float, complex, str]
-
- def serialize(self, input):
- if type(input) in self._types:
- return str(input).encode('utf-8')
- if type(input) == bytes:
- return input
- raise TypeError("IdentitySerde cannot serialize object of type %s" % type(input))
-
- def deserialize(self, input_bytes):
- for typ in self._types:
- try:
- return typ(input_bytes.decode('utf-8'))
- except:
- pass
- return input_bytes
diff --git a/python/pulsar/schema/__init__.py b/python/pulsar/schema/__init__.py
deleted file mode 100644
index efa6806..0000000
--- a/python/pulsar/schema/__init__.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from .definition import Record, Field, Null, Boolean, Integer, Long, \
- Float, Double, Bytes, String, Array, Map, CustomEnum
-
-from .schema import Schema, BytesSchema, StringSchema, JsonSchema
-from .schema_avro import AvroSchema
diff --git a/python/pulsar/schema/definition.py b/python/pulsar/schema/definition.py
deleted file mode 100644
index 60ab7cc..0000000
--- a/python/pulsar/schema/definition.py
+++ /dev/null
@@ -1,515 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import copy
-from abc import abstractmethod
-from collections import OrderedDict
-from enum import Enum, EnumMeta
-
-
-def _check_record_or_field(x):
- if (type(x) is type and not issubclass(x, Record)) \
- and not isinstance(x, Field):
- raise Exception('Argument ' + x + ' is not a Record or a Field')
-
-
-class RecordMeta(type):
- def __new__(metacls, name, parents, dct):
- if name != 'Record':
- # Do not apply this logic to the base class itself
- dct['_fields'] = RecordMeta._get_fields(dct)
- dct['_required'] = False
- return type.__new__(metacls, name, parents, dct)
-
- @classmethod
- def _get_fields(cls, dct):
- # Build a set of valid fields for this record
- fields = OrderedDict()
- for name, value in dct.items():
- if issubclass(type(value), EnumMeta):
- value = CustomEnum(value)
- elif type(value) == RecordMeta:
- # We expect an instance of a record rather than the class itself
- value = value()
-
- if isinstance(value, Record) or isinstance(value, Field):
- fields[name] = value
- return fields
-
-
-class Record(metaclass=RecordMeta):
-
- # This field is used to set namespace for Avro Record schema.
- _avro_namespace = None
-
- # Generate a schema where fields are sorted alphabetically
- _sorted_fields = False
-
- def __init__(self, default=None, required_default=False, required=False, *args, **kwargs):
- self._required_default = required_default
- self._default = default
- self._required = required
-
- for k, value in self._fields.items():
- if k in kwargs:
- if isinstance(value, Record) and isinstance(kwargs[k], dict):
- # Use dict init Record object
- copied = copy.copy(value)
- copied.__init__(**kwargs[k])
- self.__setattr__(k, copied)
- elif isinstance(value, Array) and isinstance(kwargs[k], list) and len(kwargs[k]) > 0 \
- and isinstance(value.array_type, Record) and isinstance(kwargs[k][0], dict):
- arr = []
- for item in kwargs[k]:
- copied = copy.copy(value.array_type)
- copied.__init__(**item)
- arr.append(copied)
- self.__setattr__(k, arr)
- elif isinstance(value, Map) and isinstance(kwargs[k], dict) and len(kwargs[k]) > 0 \
- and isinstance(value.value_type, Record) and isinstance(list(kwargs[k].values())[0], dict):
- dic = {}
- for mapKey, mapValue in kwargs[k].items():
- copied = copy.copy(value.value_type)
- copied.__init__(**mapValue)
- dic[mapKey] = copied
- self.__setattr__(k, dic)
- else:
- # Value was overridden at constructor
- self.__setattr__(k, kwargs[k])
- elif isinstance(value, Record):
- # Value is a subrecord
- self.__setattr__(k, value)
- else:
- # Set field to default value, without revalidating the default value type
- super(Record, self).__setattr__(k, value.default())
-
- @classmethod
- def schema(cls):
- return cls.schema_info(set())
-
- @classmethod
- def schema_info(cls, defined_names):
- namespace_prefix = ''
- if cls._avro_namespace is not None:
- namespace_prefix = cls._avro_namespace + '.'
- namespace_name = namespace_prefix + cls.__name__
-
- if namespace_name in defined_names:
- return namespace_name
-
- defined_names.add(namespace_name)
-
- schema = {
- 'type': 'record',
- 'name': str(cls.__name__)
- }
- if cls._avro_namespace is not None:
- schema['namespace'] = cls._avro_namespace
- schema['fields'] = []
-
- def get_filed_default_value(value):
- if isinstance(value, Enum):
- return value.name
- else:
- return value
-
- if cls._sorted_fields:
- fields = sorted(cls._fields.keys())
- else:
- fields = cls._fields.keys()
- for name in fields:
- field = cls._fields[name]
- field_type = field.schema_info(defined_names) \
- if field._required else ['null', field.schema_info(defined_names)]
- schema['fields'].append({
- 'name': name,
- 'default': get_filed_default_value(field.default()),
- 'type': field_type
- }) if field.required_default() else schema['fields'].append({
- 'name': name,
- 'type': field_type,
- })
-
- return schema
-
- def __setattr__(self, key, value):
- if key == '_default':
- super(Record, self).__setattr__(key, value)
- elif key == '_required_default':
- super(Record, self).__setattr__(key, value)
- elif key == '_required':
- super(Record, self).__setattr__(key, value)
- else:
- if key not in self._fields:
- raise AttributeError('Cannot set undeclared field ' + key + ' on record')
-
- # Check that type of value matches the field type
- field = self._fields[key]
- value = field.validate_type(key, value)
- super(Record, self).__setattr__(key, value)
-
- def __eq__(self, other):
- for field in self._fields:
- if self.__getattribute__(field) != other.__getattribute__(field):
- return False
- return True
-
- def __ne__(self, other):
- return not self.__eq__(other)
-
- def __str__(self):
- return str(self.__dict__)
-
- def type(self):
- return str(self.__class__.__name__)
-
- def python_type(self):
- return self.__class__
-
- def validate_type(self, name, val):
- if val is None and not self._required:
- return self.default()
-
- if not isinstance(val, self.__class__):
- raise TypeError("Invalid type '%s' for sub-record field '%s'. Expected: %s" % (
- type(val), name, self.__class__))
- return val
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
- def required_default(self):
- return self._required_default
-
-
-class Field(object):
- def __init__(self, default=None, required=False, required_default=False):
- if default is not None:
- default = self.validate_type('default', default)
- self._default = default
- self._required_default = required_default
- self._required = required
-
- @abstractmethod
- def type(self):
- pass
-
- @abstractmethod
- def python_type(self):
- pass
-
- def validate_type(self, name, val):
- if val is None and not self._required:
- return self.default()
-
- if type(val) != self.python_type():
- raise TypeError("Invalid type '%s' for field '%s'. Expected: %s" % (type(val), name, self.python_type()))
- return val
-
- def schema(self):
- # For primitive types, the schema would just be the type itself
- return self.type()
-
- def schema_info(self, defined_names):
- return self.type()
-
- def default(self):
- return self._default
-
- def required_default(self):
- return self._required_default
-
-
-# All types
-
-
-class Null(Field):
- def type(self):
- return 'null'
-
- def python_type(self):
- return type(None)
-
- def validate_type(self, name, val):
- if val is not None:
- raise TypeError('Field ' + name + ' is set to be None')
- return val
-
-
-class Boolean(Field):
- def type(self):
- return 'boolean'
-
- def python_type(self):
- return bool
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return False
-
-
-class Integer(Field):
- def type(self):
- return 'int'
-
- def python_type(self):
- return int
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-class Long(Field):
- def type(self):
- return 'long'
-
- def python_type(self):
- return int
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-class Float(Field):
- def type(self):
- return 'float'
-
- def python_type(self):
- return float
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-class Double(Field):
- def type(self):
- return 'double'
-
- def python_type(self):
- return float
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-class Bytes(Field):
- def type(self):
- return 'bytes'
-
- def python_type(self):
- return bytes
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-class String(Field):
- def type(self):
- return 'string'
-
- def python_type(self):
- return str
-
- def validate_type(self, name, val):
- t = type(val)
-
- if val is None and not self._required:
- return self.default()
-
- if not (t is str or t.__name__ == 'unicode'):
- raise TypeError("Invalid type '%s' for field '%s'. Expected a string" % (t, name))
- return val
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-# Complex types
-
-
-class CustomEnum(Field):
- def __init__(self, enum_type, default=None, required=False, required_default=False):
- if not issubclass(enum_type, Enum):
- raise Exception(enum_type + " is not a valid Enum type")
- self.enum_type = enum_type
- self.values = {}
- for x in enum_type.__members__.values():
- self.values[x.value] = x
- super(CustomEnum, self).__init__(default, required, required_default)
-
- def type(self):
- return 'enum'
-
- def python_type(self):
- return self.enum_type
-
- def validate_type(self, name, val):
- if val is None:
- return None
-
- if type(val) is str:
- # The enum was passed as a string, we need to check it against the possible values
- if val in self.enum_type.__members__:
- return self.enum_type.__members__[val]
- else:
- raise TypeError(
- "Invalid enum value '%s' for field '%s'. Expected: %s" % (val, name, self.enum_type.__members__.keys()))
- elif type(val) is int:
- # The enum was passed as an int, we need to check it against the possible values
- if val in self.values:
- return self.values[val]
- else:
- raise TypeError(
- "Invalid enum value '%s' for field '%s'. Expected: %s" % (val, name, self.values.keys()))
- elif type(val) != self.python_type():
- raise TypeError("Invalid type '%s' for field '%s'. Expected: %s" % (type(val), name, self.python_type()))
- else:
- return val
-
- def schema(self):
- return self.schema_info(set())
-
- def schema_info(self, defined_names):
- if self.enum_type.__name__ in defined_names:
- return self.enum_type.__name__
- defined_names.add(self.enum_type.__name__)
- return {
- 'type': self.type(),
- 'name': self.enum_type.__name__,
- 'symbols': [x.name for x in self.enum_type]
- }
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-class Array(Field):
- def __init__(self, array_type, default=None, required=False, required_default=False):
- _check_record_or_field(array_type)
- self.array_type = array_type
- super(Array, self).__init__(default=default, required=required, required_default=required_default)
-
- def type(self):
- return 'array'
-
- def python_type(self):
- return list
-
- def validate_type(self, name, val):
- if val is None:
- return None
-
- super(Array, self).validate_type(name, val)
-
- for x in val:
- if type(x) != self.array_type.python_type():
- raise TypeError('Array field ' + name + ' items should all be of type ' +
- self.array_type.type())
- return val
-
- def schema(self):
- return self.schema_info(set())
-
- def schema_info(self, defined_names):
- return {
- 'type': self.type(),
- 'items': self.array_type.schema_info(defined_names) if isinstance(self.array_type, (Array, Map, Record))
- else self.array_type.type()
- }
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-class Map(Field):
- def __init__(self, value_type, default=None, required=False, required_default=False):
- _check_record_or_field(value_type)
- self.value_type = value_type
- super(Map, self).__init__(default=default, required=required, required_default=required_default)
-
- def type(self):
- return 'map'
-
- def python_type(self):
- return dict
-
- def validate_type(self, name, val):
- if val is None:
- return None
-
- super(Map, self).validate_type(name, val)
-
- for k, v in val.items():
- if type(k) != str and not is_unicode(k):
- raise TypeError('Map keys for field ' + name + ' should all be strings')
- if type(v) != self.value_type.python_type():
- raise TypeError('Map values for field ' + name + ' should all be of type '
- + self.value_type.python_type())
-
- return val
-
- def schema(self):
- return self.schema_info(set())
-
- def schema_info(self, defined_names):
- return {
- 'type': self.type(),
- 'values': self.value_type.schema_info(defined_names) if isinstance(self.value_type, (Array, Map, Record))
- else self.value_type.type()
- }
-
- def default(self):
- if self._default is not None:
- return self._default
- else:
- return None
-
-
-# Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2
-# and also make it work well with Python3.
-def is_unicode(x):
- return 'encode' in dir(x) and type(x.encode()) == str
diff --git a/python/pulsar/schema/schema.py b/python/pulsar/schema/schema.py
deleted file mode 100644
index f062c2e..0000000
--- a/python/pulsar/schema/schema.py
+++ /dev/null
@@ -1,111 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-from abc import abstractmethod
-import json
-import _pulsar
-import enum
-
-
-class Schema(object):
- def __init__(self, record_cls, schema_type, schema_definition, schema_name):
- self._record_cls = record_cls
- self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name,
- json.dumps(schema_definition, indent=True))
-
- @abstractmethod
- def encode(self, obj):
- pass
-
- @abstractmethod
- def decode(self, data):
- pass
-
- def schema_info(self):
- return self._schema_info
-
- def _validate_object_type(self, obj):
- if not isinstance(obj, self._record_cls):
- raise TypeError('Invalid record obj of type ' + str(type(obj))
- + ' - expected type is ' + str(self._record_cls))
-
-
-class BytesSchema(Schema):
- def __init__(self):
- super(BytesSchema, self).__init__(bytes, _pulsar.SchemaType.BYTES, None, 'BYTES')
-
- def encode(self, data):
- self._validate_object_type(data)
- return data
-
- def decode(self, data):
- return data
-
- def __str__(self):
- return 'BytesSchema'
-
-
-class StringSchema(Schema):
- def __init__(self):
- super(StringSchema, self).__init__(str, _pulsar.SchemaType.STRING, None, 'STRING')
-
- def encode(self, obj):
- self._validate_object_type(obj)
- return obj.encode('utf-8')
-
- def decode(self, data):
- return data.decode('utf-8')
-
- def __str__(self):
- return 'StringSchema'
-
-
-def remove_reserved_key(data):
- if '_default' in data:
- del data['_default']
- if '_required' in data:
- del data['_required']
- if '_required_default' in data:
- del data['_required_default']
-
-
-class JsonSchema(Schema):
-
- def __init__(self, record_cls):
- super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
- record_cls.schema(), 'JSON')
-
- def _get_serialized_value(self, o):
- if isinstance(o, enum.Enum):
- return o.value
- else:
- data = o.__dict__.copy()
- remove_reserved_key(data)
- return data
-
- def encode(self, obj):
- self._validate_object_type(obj)
- # Copy the dict of the object as to not modify the provided object via the reference provided
- data = obj.__dict__.copy()
- remove_reserved_key(data)
- return json.dumps(data, default=self._get_serialized_value, indent=True).encode('utf-8')
-
- def decode(self, data):
- return self._record_cls(**json.loads(data))
diff --git a/python/pulsar/schema/schema_avro.py b/python/pulsar/schema/schema_avro.py
deleted file mode 100644
index 05ceb8e..0000000
--- a/python/pulsar/schema/schema_avro.py
+++ /dev/null
@@ -1,96 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import _pulsar
-import io
-import enum
-
-from . import Record
-from .schema import Schema
-
-try:
- import fastavro
- HAS_AVRO = True
-except ImportError:
- HAS_AVRO = False
-
-if HAS_AVRO:
- class AvroSchema(Schema):
- def __init__(self, record_cls, schema_definition=None):
- if record_cls is None and schema_definition is None:
- raise AssertionError("The param record_cls and schema_definition shouldn't be both None.")
-
- if record_cls is not None:
- self._schema = record_cls.schema()
- else:
- self._schema = schema_definition
- super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO')
-
- def _get_serialized_value(self, x):
- if isinstance(x, enum.Enum):
- return x.name
- elif isinstance(x, Record):
- return self.encode_dict(x.__dict__)
- elif isinstance(x, list):
- arr = []
- for item in x:
- arr.append(self._get_serialized_value(item))
- return arr
- elif isinstance(x, dict):
- return self.encode_dict(x)
- else:
- return x
-
- def encode(self, obj):
- buffer = io.BytesIO()
- m = obj
- if self._record_cls is not None:
- self._validate_object_type(obj)
- m = self.encode_dict(obj.__dict__)
- elif not isinstance(obj, dict):
- raise ValueError('If using the custom schema, the record data should be dict type.')
-
- fastavro.schemaless_writer(buffer, self._schema, m)
- return buffer.getvalue()
-
- def encode_dict(self, d):
- obj = {}
- for k, v in d.items():
- obj[k] = self._get_serialized_value(v)
- return obj
-
- def decode(self, data):
- buffer = io.BytesIO(data)
- d = fastavro.schemaless_reader(buffer, self._schema)
- if self._record_cls is not None:
- return self._record_cls(**d)
- else:
- return d
-
-else:
- class AvroSchema(Schema):
- def __init__(self, _record_cls, _schema_definition):
- raise Exception("Avro library support was not found. Make sure to install Pulsar client " +
- "with Avro support: pip3 install 'pulsar-client[avro]'")
-
- def encode(self, obj):
- pass
-
- def decode(self, data):
- pass
diff --git a/python/pulsar_test.py b/python/pulsar_test.py
deleted file mode 100755
index 375afe4..0000000
--- a/python/pulsar_test.py
+++ /dev/null
@@ -1,1341 +0,0 @@
-#!/usr/bin/env python3
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-import threading
-import logging
-from unittest import TestCase, main
-import time
-import os
-import pulsar
-import uuid
-from datetime import timedelta
-from pulsar import (
- Client,
- MessageId,
- CompressionType,
- ConsumerType,
- PartitionsRoutingMode,
- AuthenticationBasic,
- AuthenticationTLS,
- Authentication,
- AuthenticationToken,
- InitialPosition,
- CryptoKeyReader,
-)
-from pulsar.schema import JsonSchema, Record, Integer
-
-from _pulsar import ProducerConfiguration, ConsumerConfiguration
-
-from schema_test import *
-
-try:
- # For Python 3.0 and later
- from urllib.request import urlopen, Request
-except ImportError:
- # Fall back to Python 2's urllib2
- from urllib2 import urlopen, Request
-
-TM = 10000 # Do not wait forever in tests
-
-
-def doHttpPost(url, data):
- req = Request(url, data.encode())
- req.add_header("Content-Type", "application/json")
- urlopen(req)
-
-
-def doHttpPut(url, data):
- try:
- req = Request(url, data.encode())
- req.add_header("Content-Type", "application/json")
- req.get_method = lambda: "PUT"
- urlopen(req)
- except Exception as ex:
- # ignore conflicts exception to have test idempotency
- if "409" in str(ex):
- pass
- else:
- raise ex
-
-
-def doHttpGet(url):
- req = Request(url)
- req.add_header("Accept", "application/json")
- return urlopen(req).read()
-
-
-class TestRecord(Record):
- a = Integer()
- b = Integer()
-
-
-class PulsarTest(TestCase):
-
- serviceUrl = "pulsar://localhost:6650"
- adminUrl = "http://localhost:8080"
-
- serviceUrlTls = "pulsar+ssl://localhost:6651"
-
- def test_producer_config(self):
- conf = ProducerConfiguration()
- conf.send_timeout_millis(12)
- self.assertEqual(conf.send_timeout_millis(), 12)
-
- self.assertEqual(conf.compression_type(), CompressionType.NONE)
- conf.compression_type(CompressionType.LZ4)
- self.assertEqual(conf.compression_type(), CompressionType.LZ4)
-
- conf.max_pending_messages(120)
- self.assertEqual(conf.max_pending_messages(), 120)
-
- def test_consumer_config(self):
- conf = ConsumerConfiguration()
- self.assertEqual(conf.consumer_type(), ConsumerType.Exclusive)
- conf.consumer_type(ConsumerType.Shared)
- self.assertEqual(conf.consumer_type(), ConsumerType.Shared)
-
- self.assertEqual(conf.consumer_name(), "")
- conf.consumer_name("my-name")
- self.assertEqual(conf.consumer_name(), "my-name")
-
- self.assertEqual(conf.replicate_subscription_state_enabled(), False)
- conf.replicate_subscription_state_enabled(True)
- self.assertEqual(conf.replicate_subscription_state_enabled(), True)
-
- def test_connect_error(self):
- with self.assertRaises(ValueError):
- Client("fakeServiceUrl")
-
- def test_exception_inheritance(self):
- assert issubclass(pulsar.ConnectError, pulsar.PulsarException)
- assert issubclass(pulsar.PulsarException, Exception)
-
- def test_simple_producer(self):
- client = Client(self.serviceUrl)
- producer = client.create_producer("my-python-topic")
- producer.send(b"hello")
- producer.close()
- client.close()
-
- def test_producer_send_async(self):
- client = Client(self.serviceUrl)
- producer = client.create_producer("my-python-topic")
-
- sent_messages = []
-
- def send_callback(producer, msg):
- sent_messages.append(msg)
-
- producer.send_async(b"hello", send_callback)
- producer.send_async(b"hello", send_callback)
- producer.send_async(b"hello", send_callback)
-
- i = 0
- while len(sent_messages) < 3 and i < 100:
- time.sleep(0.1)
- i += 1
- self.assertEqual(len(sent_messages), 3)
- client.close()
-
- def test_producer_send(self):
- client = Client(self.serviceUrl)
- topic = "test_producer_send"
- producer = client.create_producer(topic)
- consumer = client.subscribe(topic, "sub-name")
- msg_id = producer.send(b"hello")
- print("send to {}".format(msg_id))
- msg = consumer.receive(TM)
- consumer.acknowledge(msg)
- print("receive from {}".format(msg.message_id()))
- self.assertEqual(msg_id, msg.message_id())
- client.close()
-
- def test_producer_is_connected(self):
- client = Client(self.serviceUrl)
- topic = "test_producer_is_connected"
- producer = client.create_producer(topic)
- self.assertTrue(producer.is_connected())
- producer.close()
- self.assertFalse(producer.is_connected())
- client.close()
-
- def test_producer_consumer(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe("my-python-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer("my-python-topic-producer-consumer")
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- consumer.unsubscribe()
- client.close()
-
- def test_redelivery_count(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe(
- "my-python-topic-redelivery-count",
- "my-sub",
- consumer_type=ConsumerType.Shared,
- negative_ack_redelivery_delay_ms=500,
- )
- producer = client.create_producer("my-python-topic-redelivery-count")
- producer.send(b"hello")
-
- redelivery_count = 0
- for i in range(4):
- msg = consumer.receive(TM)
- print("Received message %s" % msg.data())
- consumer.negative_acknowledge(msg)
- redelivery_count = msg.redelivery_count()
-
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
- self.assertEqual(3, redelivery_count)
- consumer.unsubscribe()
- producer.close()
- client.close()
-
- def test_deliver_at(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe("my-python-topic-deliver-at", "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer("my-python-topic-deliver-at")
- # Delay message in 1.1s
- producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + 1100)
-
- # Message should not be available in the next second
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(1000)
-
- # Message should be published now
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
- consumer.unsubscribe()
- producer.close()
- client.close()
-
- def test_deliver_after(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe("my-python-topic-deliver-after", "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer("my-python-topic-deliver-after")
- # Delay message in 1.1s
- producer.send(b"hello", deliver_after=timedelta(milliseconds=1100))
-
- # Message should not be available in the next second
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(1000)
-
- # Message should be published in the next 500ms
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
- consumer.unsubscribe()
- producer.close()
- client.close()
-
- def test_consumer_initial_position(self):
- client = Client(self.serviceUrl)
- producer = client.create_producer("consumer-initial-position")
-
- # Sending 5 messages before consumer creation.
- # These should be received with initial_position set to Earliest but not with Latest.
- for i in range(5):
- producer.send(b"hello-%d" % i)
-
- consumer = client.subscribe(
- "consumer-initial-position",
- "my-sub",
- consumer_type=ConsumerType.Shared,
- initial_position=InitialPosition.Earliest,
- )
-
- # Sending 5 other messages that should be received regardless of the initial_position.
- for i in range(5, 10):
- producer.send(b"hello-%d" % i)
-
- for i in range(10):
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello-%d" % i)
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- consumer.unsubscribe()
- client.close()
-
- def test_consumer_queue_size_is_zero(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe(
- "my-python-topic-consumer-init-queue-size-is-zero",
- "my-sub",
- consumer_type=ConsumerType.Shared,
- receiver_queue_size=0,
- initial_position=InitialPosition.Earliest,
- )
- producer = client.create_producer("my-python-topic-consumer-init-queue-size-is-zero")
- producer.send(b"hello")
- time.sleep(0.1)
- msg = consumer.receive()
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- consumer.unsubscribe()
- client.close()
-
- def test_message_properties(self):
- client = Client(self.serviceUrl)
- topic = "my-python-test-message-properties"
- consumer = client.subscribe(
- topic=topic, subscription_name="my-subscription", schema=pulsar.schema.StringSchema()
- )
- producer = client.create_producer(topic=topic, schema=pulsar.schema.StringSchema())
- producer.send("hello", properties={"a": "1", "b": "2"})
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.value(), "hello")
- self.assertEqual(msg.properties(), {"a": "1", "b": "2"})
-
- consumer.unsubscribe()
- client.close()
-
- def test_tls_auth(self):
- certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
- if not os.path.exists(certs_dir):
- certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
- client = Client(
- self.serviceUrlTls,
- tls_trust_certs_file_path=certs_dir + "cacert.pem",
- tls_allow_insecure_connection=False,
- authentication=AuthenticationTLS(certs_dir + "client-cert.pem", certs_dir + "client-key.pem"),
- )
-
- topic = "my-python-topic-tls-auth-" + str(time.time())
- consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer(topic)
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- client.close()
-
- def test_tls_auth2(self):
- certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
- if not os.path.exists(certs_dir):
- certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
- authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls"
- authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (certs_dir, certs_dir)
-
- client = Client(
- self.serviceUrlTls,
- tls_trust_certs_file_path=certs_dir + "cacert.pem",
- tls_allow_insecure_connection=False,
- authentication=Authentication(authPlugin, authParams),
- )
-
- topic = "my-python-topic-tls-auth-2-" + str(time.time())
- consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer(topic)
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- client.close()
-
- def test_encryption(self):
- publicKeyPath = "/pulsar//pulsar-broker/src/test/resources/certificate/public-key.client-rsa.pem"
- privateKeyPath = "/pulsar/pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem"
- crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
- client = Client(self.serviceUrl)
- topic = "my-python-test-end-to-end-encryption"
- consumer = client.subscribe(
- topic=topic, subscription_name="my-subscription", crypto_key_reader=crypto_key_reader
- )
- producer = client.create_producer(
- topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
- )
- reader = client.create_reader(
- topic=topic, start_message_id=MessageId.earliest, crypto_key_reader=crypto_key_reader
- )
- producer.send(b"hello")
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.value(), b"hello")
- consumer.unsubscribe()
-
- msg = reader.read_next(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- with self.assertRaises(pulsar.Timeout):
- reader.read_next(100)
-
- reader.close()
-
- client.close()
-
- def test_tls_auth3(self):
- certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
- if not os.path.exists(certs_dir):
- certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
- authPlugin = "tls"
- authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (certs_dir, certs_dir)
-
- client = Client(
- self.serviceUrlTls,
- tls_trust_certs_file_path=certs_dir + "cacert.pem",
- tls_allow_insecure_connection=False,
- authentication=Authentication(authPlugin, authParams),
- )
-
- topic = "my-python-topic-tls-auth-3-" + str(time.time())
- consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer(topic)
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- client.close()
-
- def test_auth_junk_params(self):
- certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
- if not os.path.exists(certs_dir):
- certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
- authPlugin = "someoldjunk.so"
- authParams = "blah"
- client = Client(
- self.serviceUrlTls,
- tls_trust_certs_file_path=certs_dir + "cacert.pem",
- tls_allow_insecure_connection=False,
- authentication=Authentication(authPlugin, authParams),
- )
-
- with self.assertRaises(pulsar.ConnectError):
- client.subscribe("my-python-topic-auth-junk-params", "my-sub", consumer_type=ConsumerType.Shared)
-
- def test_message_listener(self):
- client = Client(self.serviceUrl)
-
- received_messages = []
-
- def listener(consumer, msg):
- print("Got message: %s" % msg)
- received_messages.append(msg)
- consumer.acknowledge(msg)
-
- client.subscribe(
- "my-python-topic-listener", "my-sub", consumer_type=ConsumerType.Exclusive, message_listener=listener
- )
- producer = client.create_producer("my-python-topic-listener")
- producer.send(b"hello-1")
- producer.send(b"hello-2")
- producer.send(b"hello-3")
-
- time.sleep(0.1)
- self.assertEqual(len(received_messages), 3)
- self.assertEqual(received_messages[0].data(), b"hello-1")
- self.assertEqual(received_messages[1].data(), b"hello-2")
- self.assertEqual(received_messages[2].data(), b"hello-3")
- client.close()
-
- def test_consumer_is_connected(self):
- client = Client(self.serviceUrl)
- topic = "test_consumer_is_connected"
- sub = "sub"
- consumer = client.subscribe(topic, sub)
- self.assertTrue(consumer.is_connected())
- consumer.close()
- self.assertFalse(consumer.is_connected())
- client.close()
-
- def test_reader_simple(self):
- client = Client(self.serviceUrl)
- reader = client.create_reader("my-python-topic-reader-simple", MessageId.earliest)
-
- producer = client.create_producer("my-python-topic-reader-simple")
- producer.send(b"hello")
-
- msg = reader.read_next(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- with self.assertRaises(pulsar.Timeout):
- reader.read_next(100)
-
- reader.close()
- client.close()
-
- def test_reader_on_last_message(self):
- client = Client(self.serviceUrl)
- producer = client.create_producer("my-python-topic-reader-on-last-message")
-
- for i in range(10):
- producer.send(b"hello-%d" % i)
-
- reader = client.create_reader("my-python-topic-reader-on-last-message", MessageId.latest)
-
- for i in range(10, 20):
- producer.send(b"hello-%d" % i)
-
- for i in range(10, 20):
- msg = reader.read_next(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello-%d" % i)
-
- reader.close()
- client.close()
-
- def test_reader_on_specific_message(self):
- num_of_msgs = 10
- client = Client(self.serviceUrl)
- producer = client.create_producer("my-python-topic-reader-on-specific-message")
-
- for i in range(num_of_msgs):
- producer.send(b"hello-%d" % i)
-
- reader1 = client.create_reader("my-python-topic-reader-on-specific-message", MessageId.earliest)
-
- for i in range(num_of_msgs // 2):
- msg = reader1.read_next(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello-%d" % i)
- last_msg_id = msg.message_id()
- last_msg_idx = i
-
- reader2 = client.create_reader("my-python-topic-reader-on-specific-message", last_msg_id)
-
- # The reset would be effectively done on the next position relative to reset.
- # When available, we should test this behaviour with `startMessageIdInclusive` opt.
- from_msg_idx = last_msg_idx
- for i in range(from_msg_idx + 1, num_of_msgs):
- msg = reader2.read_next(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello-%d" % i)
-
- reader1.close()
- reader2.close()
- client.close()
-
- def test_reader_on_specific_message_with_batches(self):
- client = Client(self.serviceUrl)
- producer = client.create_producer(
- "my-python-topic-reader-on-specific-message-with-batches",
- batching_enabled=True,
- batching_max_publish_delay_ms=1000,
- )
-
- for i in range(10):
- producer.send_async(b"hello-%d" % i, None)
-
- # Send one sync message to make sure everything was published
- producer.send(b"hello-10")
-
- reader1 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", MessageId.earliest)
-
- for i in range(5):
- msg = reader1.read_next(TM)
- last_msg_id = msg.message_id()
-
- reader2 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", last_msg_id)
-
- for i in range(5, 11):
- msg = reader2.read_next(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello-%d" % i)
-
- reader1.close()
- reader2.close()
- client.close()
-
- def test_reader_is_connected(self):
- client = Client(self.serviceUrl)
- topic = "test_reader_is_connected"
- reader = client.create_reader(topic, MessageId.earliest)
- self.assertTrue(reader.is_connected())
- reader.close()
- self.assertFalse(reader.is_connected())
- client.close()
-
- def test_producer_sequence_after_reconnection(self):
- # Enable deduplication on namespace
- doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
- client = Client(self.serviceUrl)
-
- topic = "my-python-test-producer-sequence-after-reconnection-" + str(time.time())
-
- producer = client.create_producer(topic, producer_name="my-producer-name")
- self.assertEqual(producer.last_sequence_id(), -1)
-
- for i in range(10):
- producer.send(b"hello-%d" % i)
- self.assertEqual(producer.last_sequence_id(), i)
-
- producer.close()
-
- producer = client.create_producer(topic, producer_name="my-producer-name")
- self.assertEqual(producer.last_sequence_id(), 9)
-
- for i in range(10, 20):
- producer.send(b"hello-%d" % i)
- self.assertEqual(producer.last_sequence_id(), i)
-
- client.close()
-
- doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
-
- def test_producer_deduplication(self):
- # Enable deduplication on namespace
- doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
- client = Client(self.serviceUrl)
-
- topic = "my-python-test-producer-deduplication-" + str(time.time())
-
- producer = client.create_producer(topic, producer_name="my-producer-name")
- self.assertEqual(producer.last_sequence_id(), -1)
-
- consumer = client.subscribe(topic, "my-sub")
-
- producer.send(b"hello-0", sequence_id=0)
- producer.send(b"hello-1", sequence_id=1)
- producer.send(b"hello-2", sequence_id=2)
- self.assertEqual(producer.last_sequence_id(), 2)
-
- # Repeat the messages and verify they're not received by consumer
- producer.send(b"hello-1", sequence_id=1)
- producer.send(b"hello-2", sequence_id=2)
- self.assertEqual(producer.last_sequence_id(), 2)
-
- for i in range(3):
- msg = consumer.receive(TM)
- self.assertEqual(msg.data(), b"hello-%d" % i)
- consumer.acknowledge(msg)
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- producer.close()
-
- producer = client.create_producer(topic, producer_name="my-producer-name")
- self.assertEqual(producer.last_sequence_id(), 2)
-
- # Repeat the messages and verify they're not received by consumer
- producer.send(b"hello-1", sequence_id=1)
- producer.send(b"hello-2", sequence_id=2)
- self.assertEqual(producer.last_sequence_id(), 2)
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- client.close()
-
- doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
-
- def test_producer_routing_mode(self):
- client = Client(self.serviceUrl)
- producer = client.create_producer(
- "my-python-test-producer", message_routing_mode=PartitionsRoutingMode.UseSinglePartition
- )
- producer.send(b"test")
- client.close()
-
- def test_message_argument_errors(self):
- client = Client(self.serviceUrl)
- topic = "my-python-test-producer"
- producer = client.create_producer(topic)
-
- content = "test".encode("utf-8")
-
- self._check_type_error(lambda: producer.send(5))
- self._check_value_error(lambda: producer.send(content, properties="test"))
- self._check_value_error(lambda: producer.send(content, partition_key=5))
- self._check_value_error(lambda: producer.send(content, sequence_id="test"))
- self._check_value_error(lambda: producer.send(content, replication_clusters=5))
- self._check_value_error(lambda: producer.send(content, disable_replication="test"))
- self._check_value_error(lambda: producer.send(content, event_timestamp="test"))
- self._check_value_error(lambda: producer.send(content, deliver_at="test"))
- self._check_value_error(lambda: producer.send(content, deliver_after="test"))
- client.close()
-
- def test_client_argument_errors(self):
- self._check_value_error(lambda: Client(None))
- self._check_value_error(lambda: Client(self.serviceUrl, authentication="test"))
- self._check_value_error(lambda: Client(self.serviceUrl, operation_timeout_seconds="test"))
- self._check_value_error(lambda: Client(self.serviceUrl, io_threads="test"))
- self._check_value_error(lambda: Client(self.serviceUrl, message_listener_threads="test"))
- self._check_value_error(lambda: Client(self.serviceUrl, concurrent_lookup_requests="test"))
- self._check_value_error(lambda: Client(self.serviceUrl, log_conf_file_path=5))
- self._check_value_error(lambda: Client(self.serviceUrl, use_tls="test"))
- self._check_value_error(lambda: Client(self.serviceUrl, tls_trust_certs_file_path=5))
- self._check_value_error(lambda: Client(self.serviceUrl, tls_allow_insecure_connection="test"))
-
- def test_producer_argument_errors(self):
- client = Client(self.serviceUrl)
-
- self._check_value_error(lambda: client.create_producer(None))
-
- topic = "my-python-test-producer"
-
- self._check_value_error(lambda: client.create_producer(topic, producer_name=5))
- self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id="test"))
- self._check_value_error(lambda: client.create_producer(topic, send_timeout_millis="test"))
- self._check_value_error(lambda: client.create_producer(topic, compression_type=None))
- self._check_value_error(lambda: client.create_producer(topic, max_pending_messages="test"))
- self._check_value_error(lambda: client.create_producer(topic, block_if_queue_full="test"))
- self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
- self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
- self._check_value_error(lambda: client.create_producer(topic, batching_max_allowed_size_in_bytes="test"))
- self._check_value_error(lambda: client.create_producer(topic, batching_max_publish_delay_ms="test"))
- client.close()
-
- def test_consumer_argument_errors(self):
- client = Client(self.serviceUrl)
-
- topic = "my-python-test-producer"
- sub_name = "my-sub-name"
-
- self._check_value_error(lambda: client.subscribe(None, sub_name))
- self._check_value_error(lambda: client.subscribe(topic, None))
- self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_type=None))
- self._check_value_error(lambda: client.subscribe(topic, sub_name, receiver_queue_size="test"))
- self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_name=5))
- self._check_value_error(lambda: client.subscribe(topic, sub_name, unacked_messages_timeout_ms="test"))
- self._check_value_error(lambda: client.subscribe(topic, sub_name, broker_consumer_stats_cache_time_ms="test"))
- client.close()
-
- def test_reader_argument_errors(self):
- client = Client(self.serviceUrl)
- topic = "my-python-test-producer"
-
- # This should not raise exception
- client.create_reader(topic, MessageId.earliest)
-
- self._check_value_error(lambda: client.create_reader(None, MessageId.earliest))
- self._check_value_error(lambda: client.create_reader(topic, None))
- self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, receiver_queue_size="test"))
- self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
- client.close()
-
- def test_get_last_message_id(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe(
- "persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
- )
- producer = client.create_producer("persistent://public/default/topic_name_test")
- msg_id = producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertEqual(msg.message_id(), msg_id)
- client.close()
-
- def test_publish_compact_and_consume(self):
- client = Client(self.serviceUrl)
- topic = "compaction_%s" % (uuid.uuid4())
- producer = client.create_producer(topic, producer_name="my-producer-name", batching_enabled=False)
- self.assertEqual(producer.last_sequence_id(), -1)
- consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
- consumer.close()
- consumer2 = client.subscribe(topic, "my-sub2", is_read_compacted=False)
-
- # producer create 2 messages with same key.
- producer.send(b"hello-0", partition_key="key0")
- producer.send(b"hello-1", partition_key="key0")
- producer.close()
-
- # issue compact command, and wait success
- url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic)
- doHttpPut(url, "")
- while True:
- s = doHttpGet(url).decode("utf-8")
- if "RUNNING" in s:
- print(s)
- print("Compact still running")
- time.sleep(0.2)
- else:
- print(s)
- print("Compact Complete now")
- self.assertTrue("SUCCESS" in s)
- break
-
- # after compaction completes the compacted ledger is recorded
- # as a property of a cursor. As persisting the cursor is async
- # and we don't wait for the acknowledgement of the acknowledgement,
- # there may be a race if we try to read the compacted ledger immediately.
- # therefore wait a second to allow the compacted ledger to be updated on
- # the broker.
- time.sleep(1.0)
-
- # after compact, consumer with `is_read_compacted=True`, expected read only the second message for same key.
- consumer1 = client.subscribe(topic, "my-sub1", is_read_compacted=True)
- msg0 = consumer1.receive(TM)
- self.assertEqual(msg0.data(), b"hello-1")
- consumer1.acknowledge(msg0)
- consumer1.close()
-
- # ditto for reader
- reader1 = client.create_reader(topic, MessageId.earliest, is_read_compacted=True)
- msg0 = reader1.read_next(TM)
- self.assertEqual(msg0.data(), b"hello-1")
- reader1.close()
-
- # after compact, consumer with `is_read_compacted=False`, expected read 2 messages for same key.
- msg0 = consumer2.receive(TM)
- self.assertEqual(msg0.data(), b"hello-0")
- consumer2.acknowledge(msg0)
- msg1 = consumer2.receive(TM)
- self.assertEqual(msg1.data(), b"hello-1")
- consumer2.acknowledge(msg1)
- consumer2.close()
-
- # ditto for reader
- reader2 = client.create_reader(topic, MessageId.earliest, is_read_compacted=False)
- msg0 = reader2.read_next(TM)
- self.assertEqual(msg0.data(), b"hello-0")
- msg1 = reader2.read_next(TM)
- self.assertEqual(msg1.data(), b"hello-1")
- reader2.close()
- client.close()
-
- def test_reader_has_message_available(self):
- # create client, producer, reader
- client = Client(self.serviceUrl)
- producer = client.create_producer("my-python-topic-reader-has-message-available")
- reader = client.create_reader("my-python-topic-reader-has-message-available", MessageId.latest)
-
- # before produce data, expected not has message available
- self.assertFalse(reader.has_message_available())
-
- for i in range(10):
- producer.send(b"hello-%d" % i)
-
- # produced data, expected has message available
- self.assertTrue(reader.has_message_available())
-
- for i in range(10):
- msg = reader.read_next(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello-%d" % i)
-
- # consumed all data, expected not has message available
- self.assertFalse(reader.has_message_available())
-
- for i in range(10, 20):
- producer.send(b"hello-%d" % i)
-
- # produced data again, expected has message available
- self.assertTrue(reader.has_message_available())
- reader.close()
- producer.close()
- client.close()
-
- def test_seek(self):
- client = Client(self.serviceUrl)
- topic = "my-python-topic-seek-" + str(time.time())
- consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer(topic)
-
- for i in range(100):
- if i > 0:
- time.sleep(0.02)
- producer.send(b"hello-%d" % i)
-
- ids = []
- timestamps = []
- for i in range(100):
- msg = consumer.receive(TM)
- self.assertEqual(msg.data(), b"hello-%d" % i)
- ids.append(msg.message_id())
- timestamps.append(msg.publish_timestamp())
- consumer.acknowledge(msg)
-
- # seek, and after reconnect, expected receive first message.
- consumer.seek(MessageId.earliest)
- time.sleep(0.5)
- msg = consumer.receive(TM)
- self.assertEqual(msg.data(), b"hello-0")
-
- # seek on messageId
- consumer.seek(ids[50])
- time.sleep(0.5)
- msg = consumer.receive(TM)
- self.assertEqual(msg.data(), b"hello-51")
-
- # ditto, but seek on timestamp
- consumer.seek(timestamps[42])
- time.sleep(0.5)
- msg = consumer.receive(TM)
- self.assertEqual(msg.data(), b"hello-42")
-
- # repeat with reader
- reader = client.create_reader(topic, MessageId.latest)
- with self.assertRaises(pulsar.Timeout):
- reader.read_next(100)
-
- # earliest
- reader.seek(MessageId.earliest)
- time.sleep(0.5)
- msg = reader.read_next(TM)
- self.assertEqual(msg.data(), b"hello-0")
- msg = reader.read_next(TM)
- self.assertEqual(msg.data(), b"hello-1")
-
- # seek on messageId
- reader.seek(ids[33])
- time.sleep(0.5)
- msg = reader.read_next(TM)
- self.assertEqual(msg.data(), b"hello-34")
- msg = reader.read_next(TM)
- self.assertEqual(msg.data(), b"hello-35")
-
- # seek on timestamp
- reader.seek(timestamps[79])
- time.sleep(0.5)
- msg = reader.read_next(TM)
- self.assertEqual(msg.data(), b"hello-79")
- msg = reader.read_next(TM)
- self.assertEqual(msg.data(), b"hello-80")
-
- reader.close()
- client.close()
-
- def test_v2_topics(self):
- self._v2_topics(self.serviceUrl)
-
- def test_v2_topics_http(self):
- self._v2_topics(self.adminUrl)
-
- def _v2_topics(self, url):
- client = Client(url)
- consumer = client.subscribe("my-v2-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer("my-v2-topic-producer-consumer")
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
- consumer.acknowledge(msg)
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- client.close()
-
- def test_topics_consumer(self):
- client = Client(self.serviceUrl)
- topic1 = "persistent://public/default/my-python-topics-consumer-1"
- topic2 = "persistent://public/default/my-python-topics-consumer-2"
- topic3 = "persistent://public/default-2/my-python-topics-consumer-3" # topic from different namespace
- topics = [topic1, topic2, topic3]
-
- url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions"
- url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions"
- url3 = self.adminUrl + "/admin/v2/persistent/public/default-2/my-python-topics-consumer-3/partitions"
-
- doHttpPut(url1, "2")
- doHttpPut(url2, "3")
- doHttpPut(url3, "4")
-
- producer1 = client.create_producer(topic1)
- producer2 = client.create_producer(topic2)
- producer3 = client.create_producer(topic3)
-
- consumer = client.subscribe(
- topics, "my-topics-consumer-sub", consumer_type=ConsumerType.Shared, receiver_queue_size=10
- )
-
- for i in range(100):
- producer1.send(b"hello-1-%d" % i)
-
- for i in range(100):
- producer2.send(b"hello-2-%d" % i)
-
- for i in range(100):
- producer3.send(b"hello-3-%d" % i)
-
- for i in range(300):
- msg = consumer.receive(TM)
- consumer.acknowledge(msg)
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
- client.close()
-
- def test_topics_pattern_consumer(self):
- import re
-
- client = Client(self.serviceUrl)
-
- topics_pattern = "persistent://public/default/my-python-pattern-consumer.*"
-
- topic1 = "persistent://public/default/my-python-pattern-consumer-1"
- topic2 = "persistent://public/default/my-python-pattern-consumer-2"
- topic3 = "persistent://public/default/my-python-pattern-consumer-3"
-
- url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-1/partitions"
- url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-2/partitions"
- url3 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-3/partitions"
-
- doHttpPut(url1, "2")
- doHttpPut(url2, "3")
- doHttpPut(url3, "4")
-
- producer1 = client.create_producer(topic1)
- producer2 = client.create_producer(topic2)
- producer3 = client.create_producer(topic3)
-
- consumer = client.subscribe(
- re.compile(topics_pattern),
- "my-pattern-consumer-sub",
- consumer_type=ConsumerType.Shared,
- receiver_queue_size=10,
- pattern_auto_discovery_period=1,
- )
-
- # wait enough time to trigger auto discovery
- time.sleep(2)
-
- for i in range(100):
- producer1.send(b"hello-1-%d" % i)
-
- for i in range(100):
- producer2.send(b"hello-2-%d" % i)
-
- for i in range(100):
- producer3.send(b"hello-3-%d" % i)
-
- for i in range(300):
- msg = consumer.receive(TM)
- consumer.acknowledge(msg)
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
- client.close()
-
- def test_message_id(self):
- s = MessageId.earliest.serialize()
- self.assertEqual(MessageId.deserialize(s), MessageId.earliest)
-
- s = MessageId.latest.serialize()
- self.assertEqual(MessageId.deserialize(s), MessageId.latest)
-
- def test_get_topics_partitions(self):
- client = Client(self.serviceUrl)
- topic_partitioned = "persistent://public/default/test_get_topics_partitions"
- topic_non_partitioned = "persistent://public/default/test_get_topics_not-partitioned"
-
- url1 = self.adminUrl + "/admin/v2/persistent/public/default/test_get_topics_partitions/partitions"
- doHttpPut(url1, "3")
-
- self.assertEqual(
- client.get_topic_partitions(topic_partitioned),
- [
- "persistent://public/default/test_get_topics_partitions-partition-0",
- "persistent://public/default/test_get_topics_partitions-partition-1",
- "persistent://public/default/test_get_topics_partitions-partition-2",
- ],
- )
-
- self.assertEqual(client.get_topic_partitions(topic_non_partitioned), [topic_non_partitioned])
- client.close()
-
- def test_token_auth(self):
- with open("/tmp/pulsar-test-data/tokens/token.txt") as tf:
- token = tf.read().strip()
-
- # Use adminUrl to test both HTTP request and binary protocol
- client = Client(self.adminUrl, authentication=AuthenticationToken(token))
-
- consumer = client.subscribe(
- "persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
- )
- producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
- client.close()
-
- def test_token_auth_supplier(self):
- def read_token():
- with open("/tmp/pulsar-test-data/tokens/token.txt") as tf:
- return tf.read().strip()
-
- client = Client(self.serviceUrl, authentication=AuthenticationToken(read_token))
- consumer = client.subscribe(
- "persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
- )
- producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
- client.close()
-
- def test_producer_consumer_zstd(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe(
- "my-python-topic-producer-consumer-zstd", "my-sub", consumer_type=ConsumerType.Shared
- )
- producer = client.create_producer(
- "my-python-topic-producer-consumer-zstd", compression_type=CompressionType.ZSTD
- )
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
-
- consumer.unsubscribe()
- client.close()
-
- def test_client_reference_deleted(self):
- def get_producer():
- cl = Client(self.serviceUrl)
- return cl.create_producer(topic="foobar")
-
- producer = get_producer()
- producer.send(b"test_payload")
-
- #####
-
- def test_get_topic_name(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe(
- "persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
- )
- producer = client.create_producer("persistent://public/default/topic_name_test")
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertEqual(msg.topic_name(), "persistent://public/default/topic_name_test")
- client.close()
-
- def test_get_partitioned_topic_name(self):
- client = Client(self.serviceUrl)
- url1 = self.adminUrl + "/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions"
- doHttpPut(url1, "3")
-
- partitions = [
- "persistent://public/default/partitioned_topic_name_test-partition-0",
- "persistent://public/default/partitioned_topic_name_test-partition-1",
- "persistent://public/default/partitioned_topic_name_test-partition-2",
- ]
- self.assertEqual(
- client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"), partitions
- )
-
- consumer = client.subscribe(
- "persistent://public/default/partitioned_topic_name_test",
- "partitioned_topic_name_test_sub",
- consumer_type=ConsumerType.Shared,
- )
- producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg.topic_name() in partitions)
- client.close()
-
- def test_shutdown_client(self):
- client = Client(self.serviceUrl)
- producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
- producer.send(b"hello")
- client.shutdown()
-
- try:
- producer.send(b"hello")
- self.assertTrue(False)
- except pulsar.PulsarException:
- # Expected
- pass
-
- def test_listener_name_client(self):
- client = Client(self.serviceUrl, listener_name='test')
- try:
- producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
- self.fail()
- except pulsar.PulsarException:
- # Expected
- pass
- client.close()
-
- def test_negative_acks(self):
- client = Client(self.serviceUrl)
- consumer = client.subscribe(
- "test_negative_acks", "test", schema=pulsar.schema.StringSchema(), negative_ack_redelivery_delay_ms=1000
- )
- producer = client.create_producer("test_negative_acks", schema=pulsar.schema.StringSchema())
- for i in range(10):
- producer.send_async("hello-%d" % i, callback=None)
-
- producer.flush()
-
- for i in range(10):
- msg = consumer.receive()
- self.assertEqual(msg.value(), "hello-%d" % i)
- consumer.negative_acknowledge(msg)
-
- for i in range(10):
- msg = consumer.receive()
- self.assertEqual(msg.value(), "hello-%d" % i)
- consumer.acknowledge(msg)
-
- with self.assertRaises(pulsar.Timeout):
- consumer.receive(100)
- client.close()
-
- def test_connect_timeout(self):
- client = pulsar.Client(
- service_url="pulsar://192.0.2.1:1234",
- connection_timeout_ms=1000, # 1 second
- )
- t1 = time.time()
- try:
- producer = client.create_producer("test_connect_timeout")
- self.fail("create_producer should not succeed")
- except pulsar.ConnectError as expected:
- print("expected error: {} when create producer".format(expected))
- t2 = time.time()
- self.assertGreater(t2 - t1, 1.0)
- self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough
- client.close()
-
- def test_json_schema_encode(self):
- schema = JsonSchema(TestRecord)
- record = TestRecord(a=1, b=2)
- # Ensure that encoding a JsonSchema more than once works and produces the same result
- first_encode = schema.encode(record)
- second_encode = schema.encode(record)
- self.assertEqual(first_encode, second_encode)
-
- def test_logger_thread_leaks(self):
- def _do_connect(close):
- logger = logging.getLogger(str(threading.current_thread().ident))
- logger.setLevel(logging.INFO)
- client = pulsar.Client(
- service_url="pulsar://localhost:6650",
- io_threads=4,
- message_listener_threads=4,
- operation_timeout_seconds=1,
- log_conf_file_path=None,
- authentication=None,
- logger=logger,
- )
- client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test")
- if close:
- client.close()
-
- for should_close in (True, False):
- self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close))
- _do_connect(should_close)
- self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close))
- threads = []
- for _ in range(10):
- threads.append(threading.Thread(target=_do_connect, args=(should_close)))
- threads[-1].start()
- for thread in threads:
- thread.join()
- assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close)
-
- def test_chunking(self):
- client = Client(self.serviceUrl)
- data_size = 10 * 1024 * 1024
- producer = client.create_producer(
- 'test_chunking',
- chunking_enabled=True
- )
-
- consumer = client.subscribe('test_chunking', "my-subscription",
- max_pending_chunked_message=10,
- auto_ack_oldest_chunked_message_on_queue_full=False
- )
-
- producer.send(bytes(bytearray(os.urandom(data_size))), None)
- msg = consumer.receive(TM)
- self.assertEqual(len(msg.data()), data_size)
-
- def test_invalid_chunking_config(self):
- client = Client(self.serviceUrl)
-
- self._check_value_error(lambda: client.create_producer(
- 'test_invalid_chunking_config',
- chunking_enabled=True,
- batching_enabled=True
- ))
-
- def _check_value_error(self, fun):
- with self.assertRaises(ValueError):
- fun()
-
- def _check_type_error(self, fun):
- with self.assertRaises(TypeError):
- fun()
-
- def test_basic_auth(self):
- username = "admin"
- password = "123456"
- client = Client(self.adminUrl, authentication=AuthenticationBasic(username, password))
-
- topic = "persistent://private/auth/my-python-topic-basic-auth"
- consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
- producer = client.create_producer(topic)
- producer.send(b"hello")
-
- msg = consumer.receive(TM)
- self.assertTrue(msg)
- self.assertEqual(msg.data(), b"hello")
- client.close()
-
- def test_invalid_basic_auth(self):
- username = "invalid"
- password = "123456"
- client = Client(self.adminUrl, authentication=AuthenticationBasic(username, password))
- topic = "persistent://private/auth/my-python-topic-invalid-basic-auth"
- with self.assertRaises(pulsar.ConnectError):
- client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
-
-if __name__ == "__main__":
- main()
diff --git a/python/schema_test.py b/python/schema_test.py
deleted file mode 100755
index 47acc30..0000000
--- a/python/schema_test.py
+++ /dev/null
@@ -1,1291 +0,0 @@
-#!/usr/bin/env python3
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from unittest import TestCase, main
-
-import fastavro
-import pulsar
-from pulsar.schema import *
-from enum import Enum
-import json
-from fastavro.schema import load_schema
-
-
-class SchemaTest(TestCase):
-
- serviceUrl = 'pulsar://localhost:6650'
-
- def test_simple(self):
- class Color(Enum):
- red = 1
- green = 2
- blue = 3
-
- class Example(Record):
- _sorted_fields = True
- a = String()
- b = Integer()
- c = Array(String())
- d = Color
- e = Boolean()
- f = Float()
- g = Double()
- h = Bytes()
- i = Map(String())
- j = CustomEnum(Color)
-
- fastavro.parse_schema(Example.schema())
- self.assertEqual(Example.schema(), {
- "name": "Example",
- "type": "record",
- "fields": [
- {"name": "a", "type": ["null", "string"]},
- {"name": "b", "type": ["null", "int"]},
- {"name": "c", "type": ["null", {
- "type": "array",
- "items": "string"}]
- },
- {"name": "d",
- "type": ["null", {
- "type": "enum",
- "name": "Color",
- "symbols": ["red", "green", "blue"]}]
- },
- {"name": "e", "type": ["null", "boolean"]},
- {"name": "f", "type": ["null", "float"]},
- {"name": "g", "type": ["null", "double"]},
- {"name": "h", "type": ["null", "bytes"]},
- {"name": "i", "type": ["null", {
- "type": "map",
- "values": "string"}]
- },
- {"name": "j", "type": ["null", "Color"]}
- ]
- })
-
- def test_complex(self):
- class Color(Enum):
- red = 1
- green = 2
- blue = 3
-
- class MySubRecord(Record):
- _sorted_fields = True
- x = Integer()
- y = Long()
- z = String()
- color = CustomEnum(Color)
-
- class Example(Record):
- _sorted_fields = True
- a = String()
- sub = MySubRecord # Test with class
- sub2 = MySubRecord() # Test with instance
-
- fastavro.parse_schema(Example.schema())
- self.assertEqual(Example.schema(), {
- "name": "Example",
- "type": "record",
- "fields": [
- {"name": "a", "type": ["null", "string"]},
- {"name": "sub",
- "type": ["null", {
- "name": "MySubRecord",
- "type": "record",
- "fields": [
- {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols':
- ['red', 'green', 'blue']}]},
- {"name": "x", "type": ["null", "int"]},
- {"name": "y", "type": ["null", "long"]},
- {"name": "z", "type": ["null", "string"]}]
- }]
- },
- {"name": "sub2",
- "type": ["null", 'MySubRecord']
- }
- ]
- })
-
- def test_complex_with_required_fields(self):
- class MySubRecord(Record):
- x = Integer(required=True)
- y = Long(required=True)
- z = String()
-
- class Example(Record):
- a = String(required=True)
- sub = MySubRecord(required=True)
-
- self.assertEqual(Example.schema(), {
- "name": "Example",
- "type": "record",
- "fields": [
- {"name": "a", "type": "string"},
- {"name": "sub",
- "type": {
- "name": "MySubRecord",
- "type": "record",
- "fields": [{"name": "x", "type": "int"},
- {"name": "y", "type": "long"},
- {"name": "z", "type": ["null", "string"]}]
- }
- },
- ]
- })
-
- def test_invalid_enum(self):
- class Color:
- red = 1
- green = 2
- blue = 3
-
- class InvalidEnum(Record):
- a = Integer()
- b = Color
-
- # Enum will be ignored
- self.assertEqual(InvalidEnum.schema(),
- {'name': 'InvalidEnum', 'type': 'record',
- 'fields': [{'name': 'a', 'type': ["null", 'int']}]})
-
- def test_initialization(self):
- class Example(Record):
- a = Integer()
- b = Integer()
-
- r = Example(a=1, b=2)
- self.assertEqual(r.a, 1)
- self.assertEqual(r.b, 2)
-
- r.b = 5
-
- self.assertEqual(r.b, 5)
-
- # Setting non-declared field should fail
- try:
- r.c = 3
- self.fail('Should have failed')
- except AttributeError:
- # Expected
- pass
-
- try:
- Record(a=1, c=8)
- self.fail('Should have failed')
- except AttributeError:
- # Expected
- pass
-
- except TypeError:
- # Expected
- pass
-
- def _expectTypeError(self, func):
- try:
- func()
- self.fail('Should have failed')
- except TypeError:
- # Expected
- pass
-
- def test_field_type_check(self):
- class Example(Record):
- a = Integer()
- b = String(required=False)
-
- self._expectTypeError(lambda: Example(a=1, b=2))
-
- class E2(Record):
- a = Boolean()
-
- E2(a=False) # ok
- self._expectTypeError(lambda: E2(a=1))
-
- class E3(Record):
- a = Float()
-
- E3(a=1.0) # Ok
- self._expectTypeError(lambda: E3(a=1))
-
- class E4(Record):
- a = Null()
-
- E4(a=None) # Ok
- self._expectTypeError(lambda: E4(a=1))
-
- class E5(Record):
- a = Long()
-
- E5(a=1234) # Ok
- self._expectTypeError(lambda: E5(a=1.12))
-
- class E6(Record):
- a = String()
-
- E6(a="hello") # Ok
- self._expectTypeError(lambda: E5(a=1.12))
-
- class E6(Record):
- a = Bytes()
-
- E6(a="hello".encode('utf-8')) # Ok
- self._expectTypeError(lambda: E5(a=1.12))
-
- class E7(Record):
- a = Double()
-
- E7(a=1.0) # Ok
- self._expectTypeError(lambda: E3(a=1))
-
- class Color(Enum):
- red = 1
- green = 2
- blue = 3
-
- class OtherEnum(Enum):
- red = 1
- green = 2
- blue = 3
-
- class E8(Record):
- a = Color
-
- e = E8(a=Color.red) # Ok
- self.assertEqual(e.a, Color.red)
-
- e = E8(a='red') # Ok
- self.assertEqual(e.a, Color.red)
-
- e = E8(a=1) # Ok
- self.assertEqual(e.a, Color.red)
-
- self._expectTypeError(lambda: E8(a='redx'))
- self._expectTypeError(lambda: E8(a=OtherEnum.red))
- self._expectTypeError(lambda: E8(a=5))
-
- class E9(Record):
- a = Array(String())
-
- E9(a=['a', 'b', 'c']) # Ok
- self._expectTypeError(lambda: E9(a=1))
- self._expectTypeError(lambda: E9(a=[1, 2, 3]))
- self._expectTypeError(lambda: E9(a=['1', '2', 3]))
-
- class E10(Record):
- a = Map(Integer())
-
- E10(a={'a': 1, 'b': 2}) # Ok
- self._expectTypeError(lambda: E10(a=1))
- self._expectTypeError(lambda: E10(a={'a': '1', 'b': 2}))
- self._expectTypeError(lambda: E10(a={1: 1, 'b': 2}))
-
- class SubRecord1(Record):
- s = Integer()
-
- class SubRecord2(Record):
- s = String()
-
- class E11(Record):
- a = SubRecord1
-
- E11(a=SubRecord1(s=1)) # Ok
- self._expectTypeError(lambda: E11(a=1))
- self._expectTypeError(lambda: E11(a=SubRecord2(s='hello')))
-
- def test_field_type_check_defaults(self):
- try:
- class Example(Record):
- a = Integer(default="xyz")
-
- self.fail("Class declaration should have failed")
- except TypeError:
- pass # Expected
-
- def test_serialize_json(self):
- class Example(Record):
- a = Integer()
- b = Integer()
-
- self.assertEqual(Example.schema(), {
- "name": "Example",
- "type": "record",
- "fields": [
- {"name": "a", "type": ["null", "int"]},
- {"name": "b", "type": ["null", "int"]},
- ]
- })
-
- s = JsonSchema(Example)
- r = Example(a=1, b=2)
- data = s.encode(r)
- self.assertEqual(json.loads(data), {'a': 1, 'b': 2})
-
- r2 = s.decode(data)
- self.assertEqual(r2.__class__.__name__, 'Example')
- self.assertEqual(r2, r)
-
- def test_serialize_avro(self):
- class Example(Record):
- a = Integer()
- b = Integer()
-
- self.assertEqual(Example.schema(), {
- "name": "Example",
- "type": "record",
- "fields": [
- {"name": "a", "type": ["null", "int"]},
- {"name": "b", "type": ["null", "int"]},
- ]
- })
-
- s = AvroSchema(Example)
- r = Example(a=1, b=2)
- data = s.encode(r)
-
- r2 = s.decode(data)
- self.assertEqual(r2.__class__.__name__, 'Example')
- self.assertEqual(r2, r)
-
- def test_non_sorted_fields(self):
- class T1(Record):
- a = Integer()
- b = Integer()
- c = Double()
- d = String()
-
- class T2(Record):
- b = Integer()
- a = Integer()
- d = String()
- c = Double()
-
- self.assertNotEqual(T1.schema()['fields'], T2.schema()['fields'])
-
- def test_sorted_fields(self):
- class T1(Record):
- _sorted_fields = True
- a = Integer()
- b = Integer()
-
- class T2(Record):
- _sorted_fields = True
- b = Integer()
- a = Integer()
-
- self.assertEqual(T1.schema()['fields'], T2.schema()['fields'])
-
- def test_schema_version(self):
- class Example(Record):
- a = Integer()
- b = Integer()
-
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- 'my-avro-python-schema-version-topic',
- schema=AvroSchema(Example))
-
- consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1',
- schema=AvroSchema(Example))
-
- r = Example(a=1, b=2)
- producer.send(r)
-
- msg = consumer.receive()
-
- self.assertIsNotNone(msg.schema_version())
-
- self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode())
-
- self.assertEqual(r, msg.value())
-
- client.close()
-
- def test_serialize_wrong_types(self):
- class Example(Record):
- a = Integer()
- b = Integer()
-
- class Foo(Record):
- x = Integer()
- y = Integer()
-
- s = JsonSchema(Example)
- try:
- data = s.encode(Foo(x=1, y=2))
- self.fail('Should have failed')
- except TypeError:
- pass # expected
-
- try:
- data = s.encode('hello')
- self.fail('Should have failed')
- except TypeError:
- pass # expected
-
- def test_defaults(self):
- class Example(Record):
- a = Integer(default=5)
- b = Integer()
- c = String(default='hello')
-
- r = Example()
- self.assertEqual(r.a, 5)
- self.assertEqual(r.b, None)
- self.assertEqual(r.c, 'hello')
-
- def test_none_value(self):
- """
- The objective of the test is to check that if no value is assigned to the attribute, the validation is returning
- the expect default value as defined in the Field class
- """
- class Example(Record):
- a = Null()
- b = Boolean()
- c = Integer()
- d = Long()
- e = Float()
- f = Double()
- g = Bytes()
- h = String()
-
- r = Example()
-
- self.assertIsNone(r.a)
- self.assertFalse(r.b)
- self.assertIsNone(r.c)
- self.assertIsNone(r.d)
- self.assertIsNone(r.e)
- self.assertIsNone(r.f)
- self.assertIsNone(r.g)
- self.assertIsNone(r.h)
- ####
-
- def test_json_schema(self):
-
- class Example(Record):
- a = Integer()
- b = Integer()
-
- # Incompatible variation of the class
- class BadExample(Record):
- a = String()
- b = Integer()
-
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- 'my-json-python-topic',
- schema=JsonSchema(Example))
-
- # Validate that incompatible schema is rejected
- try:
- client.subscribe('my-json-python-topic', 'sub-1',
- schema=JsonSchema(BadExample))
- self.fail('Should have failed')
- except Exception as e:
- pass # Expected
-
- try:
- client.subscribe('my-json-python-topic', 'sub-1',
- schema=StringSchema(BadExample))
- self.fail('Should have failed')
- except Exception as e:
- pass # Expected
-
- try:
- client.subscribe('my-json-python-topic', 'sub-1',
- schema=AvroSchema(BadExample))
- self.fail('Should have failed')
- except Exception as e:
- pass # Expected
-
- consumer = client.subscribe('my-json-python-topic', 'sub-1',
- schema=JsonSchema(Example))
-
- r = Example(a=1, b=2)
- producer.send(r)
-
- msg = consumer.receive()
-
- self.assertEqual(r, msg.value())
-
- producer.close()
- consumer.close()
- client.close()
-
- def test_string_schema(self):
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- 'my-string-python-topic',
- schema=StringSchema())
-
-
- # Validate that incompatible schema is rejected
- try:
- class Example(Record):
- a = Integer()
- b = Integer()
-
- client.create_producer('my-string-python-topic',
- schema=JsonSchema(Example))
- self.fail('Should have failed')
- except Exception as e:
- pass # Expected
-
- consumer = client.subscribe('my-string-python-topic', 'sub-1',
- schema=StringSchema())
-
- producer.send("Hello")
-
- msg = consumer.receive()
-
- self.assertEqual("Hello", msg.value())
- self.assertEqual(b"Hello", msg.data())
- client.close()
-
- def test_bytes_schema(self):
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- 'my-bytes-python-topic',
- schema=BytesSchema())
-
- # Validate that incompatible schema is rejected
- try:
- class Example(Record):
- a = Integer()
- b = Integer()
-
- client.create_producer('my-bytes-python-topic',
- schema=JsonSchema(Example))
- self.fail('Should have failed')
- except Exception as e:
- pass # Expected
-
- consumer = client.subscribe('my-bytes-python-topic', 'sub-1',
- schema=BytesSchema())
-
- producer.send(b"Hello")
-
- msg = consumer.receive()
-
- self.assertEqual(b"Hello", msg.value())
- client.close()
-
- def test_avro_schema(self):
-
- class Example(Record):
- a = Integer()
- b = Integer()
-
- # Incompatible variation of the class
- class BadExample(Record):
- a = String()
- b = Integer()
-
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- 'my-avro-python-topic',
- schema=AvroSchema(Example))
-
- # Validate that incompatible schema is rejected
- try:
- client.subscribe('my-avro-python-topic', 'sub-1',
- schema=AvroSchema(BadExample))
- self.fail('Should have failed')
- except Exception as e:
- pass # Expected
-
- try:
- client.subscribe('my-avro-python-topic', 'sub-2',
- schema=JsonSchema(Example))
- self.fail('Should have failed')
- except Exception as e:
- pass # Expected
-
- consumer = client.subscribe('my-avro-python-topic', 'sub-3',
- schema=AvroSchema(Example))
-
- r = Example(a=1, b=2)
- producer.send(r)
-
- msg = consumer.receive()
-
- self.assertEqual(r, msg.value())
-
- producer.close()
- consumer.close()
- client.close()
-
- def test_json_enum(self):
- class MyEnum(Enum):
- A = 1
- B = 2
- C = 3
-
- class Example(Record):
- name = String()
- v = MyEnum
- w = CustomEnum(MyEnum)
- x = CustomEnum(MyEnum, required=True, default=MyEnum.A, required_default=True)
-
- topic = 'my-json-enum-topic'
-
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- topic=topic,
- schema=JsonSchema(Example))
-
- consumer = client.subscribe(topic, 'test',
- schema=JsonSchema(Example))
-
- r = Example(name='test', v=MyEnum.C, w=MyEnum.B)
- producer.send(r)
-
- msg = consumer.receive()
-
- self.assertEqual('test', msg.value().name)
- self.assertEqual(MyEnum.C, MyEnum(msg.value().v))
- self.assertEqual(MyEnum.B, MyEnum(msg.value().w))
- self.assertEqual(MyEnum.A, MyEnum(msg.value().x))
- client.close()
-
- def test_avro_enum(self):
- class MyEnum(Enum):
- A = 1
- B = 2
- C = 3
-
- class Example(Record):
- name = String()
- v = MyEnum
- w = CustomEnum(MyEnum)
- x = CustomEnum(MyEnum, required=True, default=MyEnum.B, required_default=True)
-
- topic = 'my-avro-enum-topic'
-
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- topic=topic,
- schema=AvroSchema(Example))
-
- consumer = client.subscribe(topic, 'test',
- schema=AvroSchema(Example))
-
- r = Example(name='test', v=MyEnum.C, w=MyEnum.A)
- producer.send(r)
-
- msg = consumer.receive()
- msg.value()
- self.assertEqual(MyEnum.C, msg.value().v)
- self.assertEqual(MyEnum.A, MyEnum(msg.value().w))
- self.assertEqual(MyEnum.B, MyEnum(msg.value().x))
- client.close()
-
- def test_avro_map_array(self):
- class MapArray(Record):
- values = Map(Array(Integer()))
-
- class MapMap(Record):
- values = Map(Map(Integer()))
-
- class ArrayMap(Record):
- values = Array(Map(Integer()))
-
- class ArrayArray(Record):
- values = Array(Array(Integer()))
-
- topic_prefix = "my-avro-map-array-topic-"
- data_list = (
- (topic_prefix + "0", AvroSchema(MapArray),
- MapArray(values={"A": [1, 2], "B": [3]})),
- (topic_prefix + "1", AvroSchema(MapMap),
- MapMap(values={"A": {"B": 2},})),
- (topic_prefix + "2", AvroSchema(ArrayMap),
- ArrayMap(values=[{"A": 1}, {"B": 2}, {"C": 3}])),
- (topic_prefix + "3", AvroSchema(ArrayArray),
- ArrayArray(values=[[1, 2, 3], [4]])),
- )
-
- client = pulsar.Client(self.serviceUrl)
- for data in data_list:
- topic = data[0]
- schema = data[1]
- record = data[2]
-
- producer = client.create_producer(topic, schema=schema)
- consumer = client.subscribe(topic, 'sub', schema=schema)
-
- producer.send(record)
- msg = consumer.receive()
- self.assertEqual(msg.value().values, record.values)
- consumer.acknowledge(msg)
- consumer.close()
- producer.close()
-
- client.close()
-
- def test_avro_required_default(self):
- class MySubRecord(Record):
- _sorted_fields = True
- x = Integer()
- y = Long()
- z = String()
-
- class Example(Record):
- a = Integer()
- b = Boolean(required=True)
- c = Long()
- d = Float()
- e = Double()
- f = String()
- g = Bytes()
- h = Array(String())
- i = Map(String())
- j = MySubRecord()
-
-
- class ExampleRequiredDefault(Record):
- _sorted_fields = True
- a = Integer(required_default=True)
- b = Boolean(required=True, required_default=True)
- c = Long(required_default=True)
- d = Float(required_default=True)
- e = Double(required_default=True)
- f = String(required_default=True)
- g = Bytes(required_default=True)
- h = Array(String(), required_default=True)
- i = Map(String(), required_default=True)
- j = MySubRecord(required_default=True)
- self.assertEqual(ExampleRequiredDefault.schema(), {
- "name": "ExampleRequiredDefault",
- "type": "record",
- "fields": [
- {
- "name": "a",
- "type": [
- "null",
- "int"
- ],
- "default": None
- },
- {
- "name": "b",
- "type": "boolean",
- "default": False
- },
- {
- "name": "c",
- "type": [
- "null",
- "long"
- ],
- "default": None
- },
- {
- "name": "d",
- "type": [
- "null",
- "float"
- ],
- "default": None
- },
- {
- "name": "e",
- "type": [
- "null",
- "double"
- ],
- "default": None
- },
- {
- "name": "f",
- "type": [
- "null",
- "string"
- ],
- "default": None
- },
- {
- "name": "g",
- "type": [
- "null",
- "bytes"
- ],
- "default": None
- },
- {
- "name": "h",
- "type": [
- "null",
- {
- "type": "array",
- "items": "string"
- }
- ],
- "default": None
- },
- {
- "name": "i",
- "type": [
- "null",
- {
- "type": "map",
- "values": "string"
- }
- ],
- "default": None
- },
- {
- "name": "j",
- "type": [
- "null",
- {
- "name": "MySubRecord",
- "type": "record",
- "fields": [
- {
- "name": "x",
- "type": [
- "null",
- "int"
- ]
- },
- {
- "name": "y",
- "type": [
- "null",
- "long"
- ],
- },
- {
- "name": "z",
- "type": [
- "null",
- "string"
- ]
- }
- ]
- }
- ],
- "default": None
- }
- ]
- })
-
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- 'my-avro-python-default-topic',
- schema=AvroSchema(Example))
-
- producer_default = client.create_producer(
- 'my-avro-python-default-topic',
- schema=AvroSchema(ExampleRequiredDefault))
-
- producer.close()
- producer_default.close()
-
- client.close()
-
- def test_default_value(self):
- class MyRecord(Record):
- A = Integer()
- B = String()
- C = Boolean(default=True, required=True)
- D = Double(default=6.4)
-
- topic = "my-default-value-topic"
-
- client = pulsar.Client(self.serviceUrl)
- producer = client.create_producer(
- topic=topic,
- schema=JsonSchema(MyRecord))
-
- consumer = client.subscribe(topic, 'test', schema=JsonSchema(MyRecord))
-
- r = MyRecord(A=5, B="text")
- producer.send(r)
-
- msg = consumer.receive()
- self.assertEqual(msg.value().A, 5)
- self.assertEqual(msg.value().B, u'text')
- self.assertEqual(msg.value().C, True)
- self.assertEqual(msg.value().D, 6.4)
-
- producer.close()
- consumer.close()
- client.close()
-
- def test_serialize_schema_complex(self):
- class Color(Enum):
- red = 1
- green = 2
- blue = 3
-
- class NestedObj1(Record):
- _sorted_fields = True
- na1 = String()
- nb1 = Double()
-
- class NestedObj2(Record):
- _sorted_fields = True
- na2 = Integer()
- nb2 = Boolean()
- nc2 = NestedObj1()
-
- class NestedObj3(Record):
- _sorted_fields = True
- color = CustomEnum(Color)
- na3 = Integer()
-
- class NestedObj4(Record):
- _avro_namespace = 'xxx4'
- _sorted_fields = True
- na4 = String()
- nb4 = Integer()
-
- class ComplexRecord(Record):
- _avro_namespace = 'xxx.xxx'
- _sorted_fields = True
- a = Integer()
- b = Integer()
- color = Color
- color2 = Color
- color3 = CustomEnum(Color, required=True, default=Color.red, required_default=True)
- nested = NestedObj2()
- nested2 = NestedObj2()
- mapNested = Map(NestedObj3())
- mapNested2 = Map(NestedObj3())
- arrayNested = Array(NestedObj4())
- arrayNested2 = Array(NestedObj4())
-
- print('complex schema: ', ComplexRecord.schema())
- self.assertEqual(ComplexRecord.schema(), {
- "name": "ComplexRecord",
- "namespace": "xxx.xxx",
- "type": "record",
- "fields": [
- {"name": "a", "type": ["null", "int"]},
- {'name': 'arrayNested', 'type': ['null', {'type': 'array', 'items':
- {'name': 'NestedObj4', 'namespace': 'xxx4', 'type': 'record', 'fields': [
- {'name': 'na4', 'type': ['null', 'string']},
- {'name': 'nb4', 'type': ['null', 'int']}
- ]}}
- ]},
- {'name': 'arrayNested2', 'type': ['null', {'type': 'array', 'items': 'xxx4.NestedObj4'}]},
- {"name": "b", "type": ["null", "int"]},
- {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': [
- 'red', 'green', 'blue']}]},
- {'name': 'color2', 'type': ['null', 'Color']},
- {'name': 'color3', 'default': 'red', 'type': 'Color'},
- {'name': 'mapNested', 'type': ['null', {'type': 'map', 'values':
- {'name': 'NestedObj3', 'type': 'record', 'fields': [
- {'name': 'color', 'type': ['null', 'Color']},
- {'name': 'na3', 'type': ['null', 'int']}
- ]}}
- ]},
- {'name': 'mapNested2', 'type': ['null', {'type': 'map', 'values': 'NestedObj3'}]},
- {"name": "nested", "type": ['null', {'name': 'NestedObj2', 'type': 'record', 'fields': [
- {'name': 'na2', 'type': ['null', 'int']},
- {'name': 'nb2', 'type': ['null', 'boolean']},
- {'name': 'nc2', 'type': ['null', {'name': 'NestedObj1', 'type': 'record', 'fields': [
- {'name': 'na1', 'type': ['null', 'string']},
- {'name': 'nb1', 'type': ['null', 'double']}
- ]}]}
- ]}]},
- {"name": "nested2", "type": ['null', 'NestedObj2']}
- ]
- })
-
- def encode_and_decode(schema_type):
- data_schema = AvroSchema(ComplexRecord)
- if schema_type == 'json':
- data_schema = JsonSchema(ComplexRecord)
-
- nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
- nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
- r = ComplexRecord(a=1, b=2, color=Color.red, color2=Color.blue,
- nested=nested_obj2, nested2=nested_obj2,
- mapNested={
- 'a': NestedObj3(na3=1, color=Color.green),
- 'b': NestedObj3(na3=2),
- 'c': NestedObj3(na3=3, color=Color.red)
- }, mapNested2={
- 'd': NestedObj3(na3=4, color=Color.red),
- 'e': NestedObj3(na3=5, color=Color.blue),
- 'f': NestedObj3(na3=6)
- }, arrayNested=[
- NestedObj4(na4='value na4 1', nb4=100),
- NestedObj4(na4='value na4 2', nb4=200)
- ], arrayNested2=[
- NestedObj4(na4='value na4 3', nb4=300),
- NestedObj4(na4='value na4 4', nb4=400)
- ])
- data_encode = data_schema.encode(r)
-
- data_decode = data_schema.decode(data_encode)
- self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord')
- self.assertEqual(data_decode, r)
- self.assertEqual(r.color3, Color.red)
- self.assertEqual(r.mapNested['a'].color, Color.green)
- self.assertEqual(r.mapNested['b'].color, None)
- print('Encode and decode complex schema finish. schema_type: ', schema_type)
-
- encode_and_decode('avro')
- encode_and_decode('json')
-
- def test_sub_record_set_to_none(self):
- class NestedObj1(Record):
- na1 = String()
- nb1 = Double()
-
- class NestedObj2(Record):
- na2 = Integer()
- nb2 = Boolean()
- nc2 = NestedObj1()
-
- data_schema = AvroSchema(NestedObj2)
- r = NestedObj2(na2=1, nb2=True)
-
- data_encode = data_schema.encode(r)
- data_decode = data_schema.decode(data_encode)
-
- self.assertEqual(data_decode.__class__.__name__, 'NestedObj2')
- self.assertEqual(data_decode, r)
- self.assertEqual(data_decode.na2, 1)
- self.assertTrue(data_decode.nb2)
-
- def test_produce_and_consume_complex_schema_data(self):
- class Color(Enum):
- red = 1
- green = 2
- blue = 3
-
- class NestedObj1(Record):
- na1 = String()
- nb1 = Double()
-
- class NestedObj2(Record):
- na2 = Integer()
- nb2 = Boolean()
- nc2 = NestedObj1()
-
- class NestedObj3(Record):
- na3 = Integer()
- color = CustomEnum(Color, required=True, required_default=True, default=Color.blue)
-
- class NestedObj4(Record):
- na4 = String()
- nb4 = Integer()
-
- class ComplexRecord(Record):
- a = Integer()
- b = Integer()
- color = CustomEnum(Color)
- nested = NestedObj2()
- mapNested = Map(NestedObj3())
- arrayNested = Array(NestedObj4())
-
- client = pulsar.Client(self.serviceUrl)
-
- def produce_consume_test(schema_type):
- topic = "my-complex-schema-topic-" + schema_type
-
- data_schema = AvroSchema(ComplexRecord)
- if schema_type == 'json':
- data_schema= JsonSchema(ComplexRecord)
-
- producer = client.create_producer(
- topic=topic,
- schema=data_schema)
-
- consumer = client.subscribe(topic, 'test', schema=data_schema)
-
- nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
- nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
- r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
- 'a': NestedObj3(na3=1, color=Color.red),
- 'b': NestedObj3(na3=2, color=Color.green),
- 'c': NestedObj3(na3=3)
- }, arrayNested=[
- NestedObj4(na4='value na4 1', nb4=100),
- NestedObj4(na4='value na4 2', nb4=200)
- ])
- producer.send(r)
-
- msg = consumer.receive()
- value = msg.value()
- self.assertEqual(value.__class__.__name__, 'ComplexRecord')
- self.assertEqual(value, r)
-
- print('Produce and consume complex schema data finish. schema_type', schema_type)
-
- produce_consume_test('avro')
- produce_consume_test('json')
-
- client.close()
-
- def custom_schema_test(self):
-
- def encode_and_decode(schema_definition):
- avro_schema = AvroSchema(None, schema_definition=schema_definition)
-
- company = {
- "name": "company-name",
- "address": 'xxx road xxx street',
- "employees": [
- {"name": "user1", "age": 25},
- {"name": "user2", "age": 30},
- {"name": "user3", "age": 35},
- ],
- "labels": {
- "industry": "software",
- "scale": ">100",
- "funds": "1000000.0"
- },
- "companyType": "companyType1"
- }
- data = avro_schema.encode(company)
- company_decode = avro_schema.decode(data)
- self.assertEqual(company, company_decode)
-
- schema_definition = {
- 'doc': 'this is doc',
- 'namespace': 'example.avro',
- 'type': 'record',
- 'name': 'Company',
- 'fields': [
- {'name': 'name', 'type': ['null', 'string']},
- {'name': 'address', 'type': ['null', 'string']},
- {'name': 'employees', 'type': ['null', {'type': 'array', 'items': {
- 'type': 'record',
- 'name': 'Employee',
- 'fields': [
- {'name': 'name', 'type': ['null', 'string']},
- {'name': 'age', 'type': ['null', 'int']}
- ]
- }}]},
- {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]},
- {'name': 'companyType', 'type': ['null', {'type': 'enum', 'name': 'CompanyType', 'symbols':
- ['companyType1', 'companyType2', 'companyType3']}]}
- ]
- }
- encode_and_decode(schema_definition)
- # Users could load schema from file by `fastavro.schema`
- # Or use `avro.schema` like this `avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()`
- encode_and_decode(load_schema("examples/company.avsc"))
-
- def custom_schema_produce_and_consume_test(self):
- client = pulsar.Client(self.serviceUrl)
-
- def produce_and_consume(topic, schema_definition):
- print('custom schema produce and consume test topic - ', topic)
- example_avro_schema = AvroSchema(None, schema_definition=schema_definition)
-
- producer = client.create_producer(
- topic=topic,
- schema=example_avro_schema)
- consumer = client.subscribe(topic, 'test', schema=example_avro_schema)
-
- for i in range(0, 10):
- company = {
- "name": "company-name" + str(i),
- "address": 'xxx road xxx street ' + str(i),
- "employees": [
- {"name": "user" + str(i), "age": 20 + i},
- {"name": "user" + str(i), "age": 30 + i},
- {"name": "user" + str(i), "age": 35 + i},
- ],
- "labels": {
- "industry": "software" + str(i),
- "scale": ">100",
- "funds": "1000000.0"
- },
- "companyType": "companyType" + str((i % 3) + 1)
- }
- producer.send(company)
-
- for i in range(0, 10):
- msg = consumer.receive()
- company = {
- "name": "company-name" + str(i),
- "address": 'xxx road xxx street ' + str(i),
- "employees": [
- {"name": "user" + str(i), "age": 20 + i},
- {"name": "user" + str(i), "age": 30 + i},
- {"name": "user" + str(i), "age": 35 + i},
- ],
- "labels": {
- "industry": "software" + str(i),
- "scale": ">100",
- "funds": "1000000.0"
- }
- }
- self.assertEqual(msg.value(), company)
- consumer.acknowledge(msg)
-
- consumer.close()
- producer.close()
-
- schema_definition = {
- 'doc': 'this is doc',
- 'namespace': 'example.avro',
- 'type': 'record',
- 'name': 'Company',
- 'fields': [
- {'name': 'name', 'type': ['null', 'string']},
- {'name': 'address', 'type': ['null', 'string']},
- {'name': 'employees', 'type': ['null', {'type': 'array', 'items': {
- 'type': 'record',
- 'name': 'Employee',
- 'fields': [
- {'name': 'name', 'type': ['null', 'string']},
- {'name': 'age', 'type': ['null', 'int']}
- ]
- }}]},
- {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]}
- ]
- }
- produce_and_consume('custom-schema-test-1', schema_definition=schema_definition)
- produce_and_consume('custom-schema-test-2', schema_definition=load_schema("examples/company.avsc"))
-
- client.close()
-
- def test_json_schema_encode_remove_reserved_key(self):
- class SchemaB(Record):
- field = String(required=True)
-
- class SchemaA(Record):
- field = SchemaB()
-
- a = SchemaA(field=SchemaB(field="something"))
- b = JsonSchema(SchemaA).encode(a)
- # reserved field should not be in the encoded json
- self.assertTrue(b'_default' not in b)
- self.assertTrue(b'_required' not in b)
- self.assertTrue(b'_required_default' not in b)
-
- def test_schema_array_wrong_type(self):
- class SomeSchema(Record):
- some_field = Array(Integer(), required=False, default=[])
- # descriptive error message
- with self.assertRaises(TypeError) as e:
- SomeSchema(some_field=["not", "integer"])
- self.assertEqual(str(e.exception), "Array field some_field items should all be of type int")
-if __name__ == '__main__':
- main()
diff --git a/python/setup.py b/python/setup.py
deleted file mode 100644
index 684d809..0000000
--- a/python/setup.py
+++ /dev/null
@@ -1,117 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from setuptools import setup
-from distutils.core import Extension
-from distutils.util import strtobool
-from os import environ
-
-from distutils.command import build_ext
-
-import xml.etree.ElementTree as ET
-from os.path import dirname, realpath, join
-
-def get_version():
- use_full_pom_name = strtobool(environ.get('USE_FULL_POM_NAME', 'False'))
-
- # Get the pulsar version from pom.xml
- TOP_LEVEL_PATH = dirname(dirname(dirname(realpath(__file__))))
- POM_PATH = join(TOP_LEVEL_PATH, 'pom.xml')
- root = ET.XML(open(POM_PATH).read())
- version = root.find('{http://maven.apache.org/POM/4.0.0}version').text.strip()
-
- if use_full_pom_name:
- return version
- else:
- # Strip the '-incubating' suffix, since it prevents the packages
- # from being uploaded into PyPI
- return version.split('-')[0]
-
-
-def get_name():
- postfix = environ.get('NAME_POSTFIX', '')
- base = 'pulsar-client'
- return base + postfix
-
-
-VERSION = get_version()
-NAME = get_name()
-
-print(VERSION)
-print(NAME)
-
-
-# This is a workaround to have setuptools to include
-# the already compiled _pulsar.so library
-class my_build_ext(build_ext.build_ext):
- def build_extension(self, ext):
- import shutil
- import os.path
-
- try:
- os.makedirs(os.path.dirname(self.get_ext_fullpath(ext.name)))
- except OSError as e:
- if e.errno != 17: # already exists
- raise
- shutil.copyfile('_pulsar.so', self.get_ext_fullpath(ext.name))
-
-
-# Core Client dependencies
-dependencies = [
- 'certifi',
-]
-
-extras_require = {}
-
-# functions dependencies
-extras_require["functions"] = sorted(
- {
- "protobuf>=3.6.1,<=3.20.*",
- "grpcio<1.28,>=1.8.2",
- "apache-bookkeeper-client>=4.9.2",
- "prometheus_client",
- "ratelimit"
- }
-)
-
-# avro dependencies
-extras_require["avro"] = sorted(
- {
- "fastavro==0.24.0"
- }
-)
-
-# all dependencies
-extras_require["all"] = sorted(set(sum(extras_require.values(), [])))
-
-setup(
- name=NAME,
- version=VERSION,
- packages=['pulsar', 'pulsar.schema', 'pulsar.functions'],
- cmdclass={'build_ext': my_build_ext},
- ext_modules=[Extension('_pulsar', [])],
-
- author="Pulsar Devs",
- author_email="dev@pulsar.apache.org",
- description="Apache Pulsar Python client library",
- license="Apache License v2.0",
- url="https://pulsar.apache.org/",
- install_requires=dependencies,
- extras_require=extras_require,
-)
diff --git a/python/src/authentication.cc b/python/src/authentication.cc
deleted file mode 100644
index 7917498..0000000
--- a/python/src/authentication.cc
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-AuthenticationWrapper::AuthenticationWrapper() {}
-
-AuthenticationWrapper::AuthenticationWrapper(const std::string& dynamicLibPath,
- const std::string& authParamsString) {
- this->auth = AuthFactory::create(dynamicLibPath, authParamsString);
-}
-
-struct AuthenticationTlsWrapper : public AuthenticationWrapper {
- AuthenticationTlsWrapper(const std::string& certificatePath, const std::string& privateKeyPath)
- : AuthenticationWrapper() {
- this->auth = AuthTls::create(certificatePath, privateKeyPath);
- }
-};
-
-struct TokenSupplierWrapper {
- PyObject* _pySupplier;
-
- TokenSupplierWrapper(py::object pySupplier) : _pySupplier(pySupplier.ptr()) { Py_XINCREF(_pySupplier); }
-
- TokenSupplierWrapper(const TokenSupplierWrapper& other) {
- _pySupplier = other._pySupplier;
- Py_XINCREF(_pySupplier);
- }
-
- TokenSupplierWrapper& operator=(const TokenSupplierWrapper& other) {
- _pySupplier = other._pySupplier;
- Py_XINCREF(_pySupplier);
- return *this;
- }
-
- virtual ~TokenSupplierWrapper() { Py_XDECREF(_pySupplier); }
-
- std::string operator()() {
- PyGILState_STATE state = PyGILState_Ensure();
-
- std::string token;
- try {
- token = py::call<std::string>(_pySupplier);
- } catch (const py::error_already_set& e) {
- PyErr_Print();
- }
-
- PyGILState_Release(state);
- return token;
- }
-};
-
-struct AuthenticationTokenWrapper : public AuthenticationWrapper {
- AuthenticationTokenWrapper(py::object token) : AuthenticationWrapper() {
- if (py::extract<std::string>(token).check()) {
- // It's a string
- std::string tokenStr = py::extract<std::string>(token);
- this->auth = AuthToken::createWithToken(tokenStr);
- } else {
- // It's a function object
- this->auth = AuthToken::create(TokenSupplierWrapper(token));
- }
- }
-};
-
-struct AuthenticationAthenzWrapper : public AuthenticationWrapper {
- AuthenticationAthenzWrapper(const std::string& authParamsString) : AuthenticationWrapper() {
- this->auth = AuthAthenz::create(authParamsString);
- }
-};
-
-struct AuthenticationOauth2Wrapper : public AuthenticationWrapper {
- AuthenticationOauth2Wrapper(const std::string& authParamsString) : AuthenticationWrapper() {
- this->auth = AuthOauth2::create(authParamsString);
- }
-};
-
-struct AuthenticationBasicWrapper : public AuthenticationWrapper {
- AuthenticationBasicWrapper(const std::string& username, const std::string& password)
- : AuthenticationWrapper() {
- this->auth = AuthBasic::create(username, password);
- }
-};
-
-void export_authentication() {
- using namespace boost::python;
-
- class_<AuthenticationWrapper>("Authentication", init<const std::string&, const std::string&>());
-
- class_<AuthenticationTlsWrapper, bases<AuthenticationWrapper> >(
- "AuthenticationTLS", init<const std::string&, const std::string&>());
-
- class_<AuthenticationTokenWrapper, bases<AuthenticationWrapper> >("AuthenticationToken",
- init<py::object>());
-
- class_<AuthenticationAthenzWrapper, bases<AuthenticationWrapper> >("AuthenticationAthenz",
- init<const std::string&>());
-
- class_<AuthenticationOauth2Wrapper, bases<AuthenticationWrapper> >("AuthenticationOauth2",
- init<const std::string&>());
-
- class_<AuthenticationBasicWrapper, bases<AuthenticationWrapper> >(
- "AuthenticationBasic", init<const std::string&, const std::string&>());
-}
diff --git a/python/src/client.cc b/python/src/client.cc
deleted file mode 100644
index 701578d..0000000
--- a/python/src/client.cc
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
- Producer producer;
-
- waitForAsyncValue(std::function<void(CreateProducerCallback)>([&](CreateProducerCallback callback) {
- client.createProducerAsync(topic, conf, callback);
- }),
- producer);
-
- return producer;
-}
-
-Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
- const ConsumerConfiguration& conf) {
- Consumer consumer;
-
- waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
- client.subscribeAsync(topic, subscriptionName, conf, callback);
- }),
- consumer);
-
- return consumer;
-}
-
-Consumer Client_subscribe_topics(Client& client, boost::python::list& topics,
- const std::string& subscriptionName, const ConsumerConfiguration& conf) {
- std::vector<std::string> topics_vector;
- for (int i = 0; i < len(topics); i++) {
- std::string content = boost::python::extract<std::string>(topics[i]);
- topics_vector.push_back(content);
- }
-
- Consumer consumer;
-
- waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
- client.subscribeAsync(topics_vector, subscriptionName, conf, callback);
- }),
- consumer);
-
- return consumer;
-}
-
-Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
- const std::string& subscriptionName, const ConsumerConfiguration& conf) {
- Consumer consumer;
-
- waitForAsyncValue(std::function<void(SubscribeCallback)>([&](SubscribeCallback callback) {
- client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
- }),
- consumer);
-
- return consumer;
-}
-
-Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
- const ReaderConfiguration& conf) {
- Reader reader;
-
- waitForAsyncValue(std::function<void(ReaderCallback)>([&](ReaderCallback callback) {
- client.createReaderAsync(topic, startMessageId, conf, callback);
- }),
- reader);
-
- return reader;
-}
-
-boost::python::list Client_getTopicPartitions(Client& client, const std::string& topic) {
- std::vector<std::string> partitions;
-
- waitForAsyncValue(std::function<void(GetPartitionsCallback)>([&](GetPartitionsCallback callback) {
- client.getPartitionsForTopicAsync(topic, callback);
- }),
- partitions);
-
- boost::python::list pyList;
- for (int i = 0; i < partitions.size(); i++) {
- pyList.append(boost::python::object(partitions[i]));
- }
-
- return pyList;
-}
-
-void Client_close(Client& client) {
- waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); });
-}
-
-void export_client() {
- using namespace boost::python;
-
- class_<Client>("Client", init<const std::string&, const ClientConfiguration&>())
- .def("create_producer", &Client_createProducer)
- .def("subscribe", &Client_subscribe)
- .def("subscribe_topics", &Client_subscribe_topics)
- .def("subscribe_pattern", &Client_subscribe_pattern)
- .def("create_reader", &Client_createReader)
- .def("get_topic_partitions", &Client_getTopicPartitions)
- .def("close", &Client_close)
- .def("shutdown", &Client::shutdown);
-}
diff --git a/python/src/config.cc b/python/src/config.cc
deleted file mode 100644
index fed9c28..0000000
--- a/python/src/config.cc
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-#include <pulsar/ConsoleLoggerFactory.h>
-#include "lib/Utils.h"
-#include <memory>
-
-template <typename T>
-struct ListenerWrapper {
- PyObject* _pyListener;
-
- ListenerWrapper(py::object pyListener) : _pyListener(pyListener.ptr()) { Py_XINCREF(_pyListener); }
-
- ListenerWrapper(const ListenerWrapper& other) {
- _pyListener = other._pyListener;
- Py_XINCREF(_pyListener);
- }
-
- ListenerWrapper& operator=(const ListenerWrapper& other) {
- _pyListener = other._pyListener;
- Py_XINCREF(_pyListener);
- return *this;
- }
-
- virtual ~ListenerWrapper() { Py_XDECREF(_pyListener); }
-
- void operator()(T consumer, const Message& msg) {
- PyGILState_STATE state = PyGILState_Ensure();
-
- try {
- py::call<void>(_pyListener, py::object(&consumer), py::object(&msg));
- } catch (const py::error_already_set& e) {
- PyErr_Print();
- }
-
- PyGILState_Release(state);
- }
-};
-
-static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerConfiguration& conf,
- py::object pyListener) {
- conf.setMessageListener(ListenerWrapper<Consumer>(pyListener));
- return conf;
-}
-
-static ReaderConfiguration& ReaderConfiguration_setReaderListener(ReaderConfiguration& conf,
- py::object pyListener) {
- conf.setReaderListener(ListenerWrapper<Reader>(pyListener));
- return conf;
-}
-
-static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfiguration& conf,
- py::object authentication) {
- AuthenticationWrapper wrapper = py::extract<AuthenticationWrapper>(authentication);
- conf.setAuth(wrapper.auth);
- return conf;
-}
-
-static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf,
- py::object cryptoKeyReader) {
- CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
- conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
- return conf;
-}
-
-static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf,
- py::object cryptoKeyReader) {
- CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
- conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
- return conf;
-}
-
-static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf,
- py::object cryptoKeyReader) {
- CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract<CryptoKeyReaderWrapper>(cryptoKeyReader);
- conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader);
- return conf;
-}
-
-class LoggerWrapper : public Logger, public CaptivePythonObjectMixin {
- const std::unique_ptr<Logger> _fallbackLogger;
-
- public:
- LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger)
- : CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {}
-
- LoggerWrapper(const LoggerWrapper&) = delete;
- LoggerWrapper(LoggerWrapper&&) noexcept = delete;
- LoggerWrapper& operator=(const LoggerWrapper&) = delete;
- LoggerWrapper& operator=(LoggerWrapper&&) = delete;
-
- bool isEnabled(Level level) {
- return true; // Python loggers are always enabled; they decide internally whether or not to log.
- }
-
- void log(Level level, int line, const std::string& message) {
- if (!Py_IsInitialized()) {
- // Python logger is unavailable - fallback to console logger
- _fallbackLogger->log(level, line, message);
- } else {
- PyGILState_STATE state = PyGILState_Ensure();
- PyObject *type, *value, *traceback;
- PyErr_Fetch(&type, &value, &traceback);
- try {
- switch (level) {
- case Logger::LEVEL_DEBUG:
- py::call<void>(_captive, "DEBUG", message.c_str());
- break;
- case Logger::LEVEL_INFO:
- py::call<void>(_captive, "INFO", message.c_str());
- break;
- case Logger::LEVEL_WARN:
- py::call<void>(_captive, "WARNING", message.c_str());
- break;
- case Logger::LEVEL_ERROR:
- py::call<void>(_captive, "ERROR", message.c_str());
- break;
- }
- } catch (const py::error_already_set& e) {
- PyErr_Print();
- _fallbackLogger->log(level, line, message);
- }
- PyErr_Restore(type, value, traceback);
- PyGILState_Release(state);
- }
- }
-};
-
-class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin {
- std::unique_ptr<LoggerFactory> _fallbackLoggerFactory{new ConsoleLoggerFactory};
-
- public:
- LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {}
-
- Logger* getLogger(const std::string& fileName) {
- const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName);
- if (_captive == py::object().ptr()) {
- return fallbackLogger;
- } else {
- return new LoggerWrapper(_captive, fallbackLogger);
- }
- }
-};
-
-static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf, py::object logger) {
- conf.setLogger(new LoggerWrapperFactory(logger));
- return conf;
-}
-
-// Registers the ClientConfiguration, ProducerConfiguration,
-// ConsumerConfiguration and ReaderConfiguration classes with Boost.Python.
-//
-// Most settings are exposed as two overloads that share one Python name: the
-// C++ getter and the C++ setter. Setters registered with return_self<>()
-// return the configuration object to Python; setters registered without a
-// call policy return whatever the C++ member function returns.
-void export_config() {
- using namespace boost::python;
-
- // Client-wide settings (timeouts, threads, TLS, logging).
- class_<ClientConfiguration>("ClientConfiguration")
- .def("authentication", &ClientConfiguration_setAuthentication, return_self<>())
- .def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds)
- .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>())
- .def("connection_timeout", &ClientConfiguration::getConnectionTimeout)
- .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>())
- .def("io_threads", &ClientConfiguration::getIOThreads)
- .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>())
- .def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads)
- .def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, return_self<>())
- .def("concurrent_lookup_requests", &ClientConfiguration::getConcurrentLookupRequest)
- .def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>())
- .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath,
- return_value_policy<copy_const_reference>())
- .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>())
- .def("use_tls", &ClientConfiguration::isUseTls)
- .def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
- .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath,
- return_value_policy<copy_const_reference>())
- .def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, return_self<>())
- .def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection)
- .def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection,
- return_self<>())
- // The next three names are registered as setters only.
- .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>())
- .def("listener_name", &ClientConfiguration::setListenerName, return_self<>())
- .def("set_logger", &ClientConfiguration_setLogger, return_self<>());
-
- // Producer settings (naming, schema, batching, chunking, encryption).
- class_<ProducerConfiguration>("ProducerConfiguration")
- .def("producer_name", &ProducerConfiguration::getProducerName,
- return_value_policy<copy_const_reference>())
- .def("producer_name", &ProducerConfiguration::setProducerName, return_self<>())
- .def("schema", &ProducerConfiguration::getSchema, return_value_policy<copy_const_reference>())
- .def("schema", &ProducerConfiguration::setSchema, return_self<>())
- .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
- .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>())
- .def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId)
- .def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, return_self<>())
- .def("compression_type", &ProducerConfiguration::getCompressionType)
- .def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>())
- .def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages)
- .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>())
- .def("max_pending_messages_across_partitions",
- &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions)
- .def("max_pending_messages_across_partitions",
- &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>())
- .def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull)
- .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
- .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
- .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
- .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers)
- .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers,
- return_self<>())
- .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled,
- return_value_policy<copy_const_reference>())
- .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
- .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages,
- return_value_policy<copy_const_reference>())
- .def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, return_self<>())
- .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes,
- return_value_policy<copy_const_reference>())
- .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes,
- return_self<>())
- .def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs,
- return_value_policy<copy_const_reference>())
- .def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs,
- return_self<>())
- .def("chunking_enabled", &ProducerConfiguration::isChunkingEnabled)
- .def("chunking_enabled", &ProducerConfiguration::setChunkingEnabled, return_self<>())
- .def("property", &ProducerConfiguration::setProperty, return_self<>())
- .def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>())
- .def("batching_type", &ProducerConfiguration::getBatchingType)
- .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>())
- .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>());
-
- // Consumer settings. Unlike the producer and client classes above, most
- // setters here are registered without return_self<>().
- class_<ConsumerConfiguration>("ConsumerConfiguration")
- .def("consumer_type", &ConsumerConfiguration::getConsumerType)
- .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>())
- .def("schema", &ConsumerConfiguration::getSchema, return_value_policy<copy_const_reference>())
- .def("schema", &ConsumerConfiguration::setSchema, return_self<>())
- .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>())
- .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize)
- .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize)
- .def("max_total_receiver_queue_size_across_partitions",
- &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions)
- .def("max_total_receiver_queue_size_across_partitions",
- &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
- .def("consumer_name", &ConsumerConfiguration::getConsumerName,
- return_value_policy<copy_const_reference>())
- .def("consumer_name", &ConsumerConfiguration::setConsumerName)
- .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
- .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
- .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::getNegativeAckRedeliveryDelayMs)
- .def("negative_ack_redelivery_delay_ms", &ConsumerConfiguration::setNegativeAckRedeliveryDelayMs)
- .def("broker_consumer_stats_cache_time_ms",
- &ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs)
- .def("broker_consumer_stats_cache_time_ms",
- &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
- .def("pattern_auto_discovery_period", &ConsumerConfiguration::getPatternAutoDiscoveryPeriod)
- .def("pattern_auto_discovery_period", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod)
- .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
- .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
- .def("property", &ConsumerConfiguration::setProperty, return_self<>())
- .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
- .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
- .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>())
- .def("replicate_subscription_state_enabled",
- &ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
- .def("replicate_subscription_state_enabled",
- &ConsumerConfiguration::isReplicateSubscriptionStateEnabled)
- .def("max_pending_chunked_message", &ConsumerConfiguration::getMaxPendingChunkedMessage)
- .def("max_pending_chunked_message", &ConsumerConfiguration::setMaxPendingChunkedMessage,
- return_self<>())
- .def("auto_ack_oldest_chunked_message_on_queue_full",
- &ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull)
- .def("auto_ack_oldest_chunked_message_on_queue_full",
- &ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, return_self<>());
-
- // Reader settings; only the listener, schema and crypto-key-reader setters
- // are registered with return_self<>().
- class_<ReaderConfiguration>("ReaderConfiguration")
- .def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>())
- .def("schema", &ReaderConfiguration::getSchema, return_value_policy<copy_const_reference>())
- .def("schema", &ReaderConfiguration::setSchema, return_self<>())
- .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize)
- .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize)
- .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy<copy_const_reference>())
- .def("reader_name", &ReaderConfiguration::setReaderName)
- .def("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix,
- return_value_policy<copy_const_reference>())
- .def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
- .def("read_compacted", &ReaderConfiguration::isReadCompacted)
- .def("read_compacted", &ReaderConfiguration::setReadCompacted)
- .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>());
-}
diff --git a/python/src/consumer.cc b/python/src/consumer.cc
deleted file mode 100644
index 811ceb3..0000000
--- a/python/src/consumer.cc
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-// Unsubscribes the consumer by handing its asynchronous unsubscribe call to
-// waitForAsyncResult.
-void Consumer_unsubscribe(Consumer& consumer) {
-    auto startUnsubscribe = [&consumer](ResultCallback done) { consumer.unsubscribeAsync(done); };
-    waitForAsyncResult(startUnsubscribe);
-}
-
-// Receives one message: the asynchronous receive is started through
-// waitForAsyncValue, which fills in the message that is then returned.
-Message Consumer_receive(Consumer& consumer) {
-    std::function<void(ReceiveCallback)> startReceive = [&consumer](ReceiveCallback done) {
-        consumer.receiveAsync(done);
-    };
-
-    Message received;
-    waitForAsyncValue(startReceive, received);
-    return received;
-}
-
-// Receives one message using the consumer's blocking receive with a timeout
-// in milliseconds. CHECK_RESULT is applied to the outcome before returning.
-Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
-    Result status;
-    Message received;
-
-    // The blocking call runs between the CPython allow-threads macros, i.e.
-    // with the GIL released.
-    Py_BEGIN_ALLOW_THREADS
-    status = consumer.receive(received, timeoutMs);
-    Py_END_ALLOW_THREADS
-
-    CHECK_RESULT(status);
-    return received;
-}
-
-// Acknowledges a single message. The acknowledgement is issued asynchronously
-// and no completion callback is supplied.
-void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
-    consumer.acknowledgeAsync(msg, nullptr);
-}
-
-// Same as Consumer_acknowledge, but identified by message id.
-void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
-    consumer.acknowledgeAsync(msgId, nullptr);
-}
-
-// Negatively acknowledges a single message.
-void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg)
-{
-    consumer.negativeAcknowledge(msg);
-}
-
-// Negatively acknowledges the message with the given id.
-void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId)
-{
-    consumer.negativeAcknowledge(msgId);
-}
-
-// Issues a cumulative acknowledgement for `msg`. The call is asynchronous and
-// no completion callback is supplied.
-void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg)
-{
-    consumer.acknowledgeCumulativeAsync(msg, nullptr);
-}
-
-// Same as Consumer_acknowledge_cumulative, but identified by message id.
-void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId)
-{
-    consumer.acknowledgeCumulativeAsync(msgId, nullptr);
-}
-
-// Closes the consumer by handing its asynchronous close call to
-// waitForAsyncResult.
-void Consumer_close(Consumer& consumer) {
-    auto startClose = [&consumer](ResultCallback done) { consumer.closeAsync(done); };
-    waitForAsyncResult(startClose);
-}
-
-// Pauses the consumer's message listener; the outcome goes through CHECK_RESULT.
-void Consumer_pauseMessageListener(Consumer& consumer) {
-    CHECK_RESULT(consumer.pauseMessageListener());
-}
-
-// Resumes the consumer's message listener; the outcome goes through CHECK_RESULT.
-void Consumer_resumeMessageListener(Consumer& consumer) {
-    CHECK_RESULT(consumer.resumeMessageListener());
-}
-
-// Seeks the consumer to the given message id. The id is copied into the
-// closure; the consumer is captured by reference.
-void Consumer_seek(Consumer& consumer, const MessageId& msgId) {
-    auto startSeek = [msgId, &consumer](ResultCallback done) { consumer.seekAsync(msgId, done); };
-    waitForAsyncResult(startSeek);
-}
-
-// Seeks the consumer using a timestamp instead of a message id.
-void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) {
-    auto startSeek = [timestamp, &consumer](ResultCallback done) {
-        consumer.seekAsync(timestamp, done);
-    };
-    waitForAsyncResult(startSeek);
-}
-
-// Reports the consumer's isConnected() state.
-bool Consumer_is_connected(Consumer& consumer) {
-    const bool connected = consumer.isConnected();
-    return connected;
-}
-
-// Returns the id produced by the consumer's blocking getLastMessageId call.
-// CHECK_RESULT is applied to the outcome before returning.
-MessageId Consumer_get_last_message_id(Consumer& consumer) {
-    Result status;
-    MessageId lastId;
-
-    // The blocking call runs between the CPython allow-threads macros, i.e.
-    // with the GIL released.
-    Py_BEGIN_ALLOW_THREADS
-    status = consumer.getLastMessageId(lastId);
-    Py_END_ALLOW_THREADS
-
-    CHECK_RESULT(status);
-    return lastId;
-}
-
-// Registers the Consumer class with Boost.Python. The class is registered
-// with no_init, so no constructor is exposed to Python.
-//
-// Several Python names ("receive", "acknowledge", "acknowledge_cumulative",
-// "negative_acknowledge", "seek") are registered twice, once per C++ wrapper
-// overload (Message vs. MessageId argument, with/without timeout, id vs.
-// timestamp).
-void export_consumer() {
- using namespace boost::python;
-
- class_<Consumer>("Consumer", no_init)
- .def("topic", &Consumer::getTopic, "return the topic this consumer is subscribed to",
- return_value_policy<copy_const_reference>())
- .def("subscription_name", &Consumer::getSubscriptionName, return_value_policy<copy_const_reference>())
- .def("unsubscribe", &Consumer_unsubscribe)
- .def("receive", &Consumer_receive)
- .def("receive", &Consumer_receive_timeout)
- .def("acknowledge", &Consumer_acknowledge)
- .def("acknowledge", &Consumer_acknowledge_message_id)
- .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)
- .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative_message_id)
- .def("negative_acknowledge", &Consumer_negative_acknowledge)
- .def("negative_acknowledge", &Consumer_negative_acknowledge_message_id)
- .def("close", &Consumer_close)
- .def("pause_message_listener", &Consumer_pauseMessageListener)
- .def("resume_message_listener", &Consumer_resumeMessageListener)
- .def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages)
- .def("seek", &Consumer_seek)
- .def("seek", &Consumer_seek_timestamp)
- .def("is_connected", &Consumer_is_connected)
- .def("get_last_message_id", &Consumer_get_last_message_id);
-}
diff --git a/python/src/cryptoKeyReader.cc b/python/src/cryptoKeyReader.cc
deleted file mode 100644
index 2c46b6f..0000000
--- a/python/src/cryptoKeyReader.cc
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-// Default constructor: performs no work of its own.
-CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() = default;
-
-// Builds the wrapped key reader from a public and a private key path via
-// DefaultCryptoKeyReader::create.
-CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& publicKeyPath,
-                                               const std::string& privateKeyPath) {
-    cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, privateKeyPath);
-}
-
-// Registers CryptoKeyReaderWrapper with Boost.Python under the Python name
-// "CryptoKeyReader", constructible from two strings.
-void export_cryptoKeyReader() {
-    namespace bp = boost::python;
-
-    bp::class_<CryptoKeyReaderWrapper>("CryptoKeyReader",
-                                       bp::init<const std::string&, const std::string&>());
-}
\ No newline at end of file
diff --git a/python/src/enums.cc b/python/src/enums.cc
deleted file mode 100644
index 92f08a1..0000000
--- a/python/src/enums.cc
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-// Registers the client's C++ enumerations with Boost.Python. Each enum_<>
-// statement maps a Python-visible value name to the corresponding C++
-// enumerator.
-void export_enums() {
- using namespace boost::python;
-
- // Routing modes accepted by ProducerConfiguration.
- enum_<ProducerConfiguration::PartitionsRoutingMode>("PartitionsRoutingMode")
- .value("UseSinglePartition", ProducerConfiguration::UseSinglePartition)
- .value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution)
- .value("CustomPartition", ProducerConfiguration::CustomPartition);
-
- // Compression codecs.
- enum_<CompressionType>("CompressionType")
- .value("NONE", CompressionNone) // Don't use 'None' since it's a keyword in py3
- .value("LZ4", CompressionLZ4)
- .value("ZLib", CompressionZLib)
- .value("ZSTD", CompressionZSTD)
- .value("SNAPPY", CompressionSNAPPY);
-
- // Subscription types.
- enum_<ConsumerType>("ConsumerType")
- .value("Exclusive", ConsumerExclusive)
- .value("Shared", ConsumerShared)
- .value("Failover", ConsumerFailover)
- .value("KeyShared", ConsumerKeyShared);
-
- // Result codes; the Python value name is the C++ enumerator without its
- // "Result" prefix.
- enum_<Result>("Result", "Collection of return codes")
- .value("Ok", ResultOk)
- .value("UnknownError", ResultUnknownError)
- .value("InvalidConfiguration", ResultInvalidConfiguration)
- .value("Timeout", ResultTimeout)
- .value("LookupError", ResultLookupError)
- .value("ConnectError", ResultConnectError)
- .value("ReadError", ResultReadError)
- .value("AuthenticationError", ResultAuthenticationError)
- .value("AuthorizationError", ResultAuthorizationError)
- .value("ErrorGettingAuthenticationData", ResultErrorGettingAuthenticationData)
- .value("BrokerMetadataError", ResultBrokerMetadataError)
- .value("BrokerPersistenceError", ResultBrokerPersistenceError)
- .value("ChecksumError", ResultChecksumError)
- .value("ConsumerBusy", ResultConsumerBusy)
- .value("NotConnected", ResultNotConnected)
- .value("AlreadyClosed", ResultAlreadyClosed)
- .value("InvalidMessage", ResultInvalidMessage)
- .value("ConsumerNotInitialized", ResultConsumerNotInitialized)
- .value("ProducerNotInitialized", ResultProducerNotInitialized)
- .value("ProducerBusy", ResultProducerBusy)
- .value("TooManyLookupRequestException", ResultTooManyLookupRequestException)
- .value("InvalidTopicName", ResultInvalidTopicName)
- .value("InvalidUrl", ResultInvalidUrl)
- .value("ServiceUnitNotReady", ResultServiceUnitNotReady)
- .value("OperationNotSupported", ResultOperationNotSupported)
- .value("ProducerBlockedQuotaExceededError", ResultProducerBlockedQuotaExceededError)
- .value("ProducerBlockedQuotaExceededException", ResultProducerBlockedQuotaExceededException)
- .value("ProducerQueueIsFull", ResultProducerQueueIsFull)
- .value("MessageTooBig", ResultMessageTooBig)
- .value("TopicNotFound", ResultTopicNotFound)
- .value("SubscriptionNotFound", ResultSubscriptionNotFound)
- .value("ConsumerNotFound", ResultConsumerNotFound)
- .value("UnsupportedVersionError", ResultUnsupportedVersionError)
- .value("TopicTerminated", ResultTopicTerminated)
- .value("CryptoError", ResultCryptoError)
- .value("IncompatibleSchema", ResultIncompatibleSchema)
- .value("ConsumerAssignError", ResultConsumerAssignError)
- .value("CumulativeAcknowledgementNotAllowedError", ResultCumulativeAcknowledgementNotAllowedError)
- .value("TransactionCoordinatorNotFoundError", ResultTransactionCoordinatorNotFoundError)
- .value("InvalidTxnStatusError", ResultInvalidTxnStatusError)
- .value("NotAllowedError", ResultNotAllowedError)
- .value("TransactionConflict", ResultTransactionConflict)
- .value("TransactionNotFound", ResultTransactionNotFound)
- .value("ProducerFenced", ResultProducerFenced)
- .value("MemoryBufferIsFull", ResultMemoryBufferIsFull)
- .value("Interrupted", pulsar::ResultInterrupted);
-
- // Schema types; enumerators are qualified with pulsar:: here.
- enum_<SchemaType>("SchemaType", "Supported schema types")
- .value("NONE", pulsar::NONE)
- .value("STRING", pulsar::STRING)
- .value("INT8", pulsar::INT8)
- .value("INT16", pulsar::INT16)
- .value("INT32", pulsar::INT32)
- .value("INT64", pulsar::INT64)
- .value("FLOAT", pulsar::FLOAT)
- .value("DOUBLE", pulsar::DOUBLE)
- .value("BYTES", pulsar::BYTES)
- .value("JSON", pulsar::JSON)
- .value("PROTOBUF", pulsar::PROTOBUF)
- .value("AVRO", pulsar::AVRO)
- .value("AUTO_CONSUME", pulsar::AUTO_CONSUME)
- .value("AUTO_PUBLISH", pulsar::AUTO_PUBLISH)
- .value("KEY_VALUE", pulsar::KEY_VALUE);
-
- // Initial subscription positions.
- enum_<InitialPosition>("InitialPosition", "Supported initial position")
- .value("Latest", InitialPositionLatest)
- .value("Earliest", InitialPositionEarliest);
-
- // Producer batching strategies.
- enum_<ProducerConfiguration::BatchingType>("BatchingType", "Supported batching types")
- .value("Default", ProducerConfiguration::DefaultBatching)
- .value("KeyBased", ProducerConfiguration::KeyBasedBatching);
-}
diff --git a/python/src/exceptions.cc b/python/src/exceptions.cc
deleted file mode 100644
index efca661..0000000
--- a/python/src/exceptions.cc
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include <map>
-
-#include "utils.h"
-
-// Exception class created first by export_exceptions() and passed as the base
-// type of every per-Result exception class.
-static PyObject* basePulsarException = nullptr;
-// Lookup table from a Result code to the Python exception class created for
-// it; filled by export_exceptions() and read by get_exception_class().
-std::map<Result, PyObject*> exceptions;
-
-// Creates a new Python exception type named "_pulsar.<name>" deriving from
-// `baseTypeObj`, publishes it as attribute `name` of the current Boost.Python
-// scope, and returns the type object. If CPython fails to create the type,
-// the pending Python error is propagated through throw_error_already_set().
-PyObject* createExceptionClass(const char* name, PyObject* baseTypeObj = PyExc_Exception) {
-    namespace bp = boost::python;
-
-    const std::string qualifiedName = std::string("_pulsar.") + name;
-
-    PyObject* const exceptionType =
-        PyErr_NewException(const_cast<char*>(qualifiedName.c_str()), baseTypeObj, nullptr);
-    if (exceptionType == nullptr) {
-        bp::throw_error_already_set();
-    }
-
-    bp::scope().attr(name) = bp::handle<>(bp::borrowed(exceptionType));
-    return exceptionType;
-}
-
-// Looks up the Python exception class registered for `result`. A Result with
-// no registered class is treated as fatal: a message is written to stderr and
-// the process is aborted.
-PyObject* get_exception_class(Result result) {
-    const auto found = exceptions.find(result);
-    if (found == exceptions.end()) {
-        std::cerr << "Error result exception not found: " << result << std::endl;
-        abort();
-    }
-    return found->second;
-}
-
-// Creates the module's Python exception hierarchy: a "PulsarException" base
-// class, then one subclass per error Result, each recorded in `exceptions`
-// under its Result code.
-void export_exceptions() {
-    basePulsarException = createExceptionClass("PulsarException");
-
-    // One row per Result code that gets its own exception class. The Python
-    // class name is the enumerator without its "Result" prefix. Rows are in
-    // creation order.
-    struct ExceptionEntry {
-        Result result;
-        const char* pythonName;
-    };
-    static const ExceptionEntry kEntries[] = {
-        {ResultUnknownError, "UnknownError"},
-        {ResultInvalidConfiguration, "InvalidConfiguration"},
-        {ResultTimeout, "Timeout"},
-        {ResultLookupError, "LookupError"},
-        {ResultConnectError, "ConnectError"},
-        {ResultReadError, "ReadError"},
-        {ResultAuthenticationError, "AuthenticationError"},
-        {ResultAuthorizationError, "AuthorizationError"},
-        {ResultErrorGettingAuthenticationData, "ErrorGettingAuthenticationData"},
-        {ResultBrokerMetadataError, "BrokerMetadataError"},
-        {ResultBrokerPersistenceError, "BrokerPersistenceError"},
-        {ResultChecksumError, "ChecksumError"},
-        {ResultConsumerBusy, "ConsumerBusy"},
-        {ResultNotConnected, "NotConnected"},
-        {ResultAlreadyClosed, "AlreadyClosed"},
-        {ResultInvalidMessage, "InvalidMessage"},
-        {ResultConsumerNotInitialized, "ConsumerNotInitialized"},
-        {ResultProducerNotInitialized, "ProducerNotInitialized"},
-        {ResultProducerBusy, "ProducerBusy"},
-        {ResultTooManyLookupRequestException, "TooManyLookupRequestException"},
-        {ResultInvalidTopicName, "InvalidTopicName"},
-        {ResultInvalidUrl, "InvalidUrl"},
-        {ResultServiceUnitNotReady, "ServiceUnitNotReady"},
-        {ResultOperationNotSupported, "OperationNotSupported"},
-        {ResultProducerBlockedQuotaExceededError, "ProducerBlockedQuotaExceededError"},
-        {ResultProducerBlockedQuotaExceededException, "ProducerBlockedQuotaExceededException"},
-        {ResultProducerQueueIsFull, "ProducerQueueIsFull"},
-        {ResultMessageTooBig, "MessageTooBig"},
-        {ResultTopicNotFound, "TopicNotFound"},
-        {ResultSubscriptionNotFound, "SubscriptionNotFound"},
-        {ResultConsumerNotFound, "ConsumerNotFound"},
-        {ResultUnsupportedVersionError, "UnsupportedVersionError"},
-        {ResultTopicTerminated, "TopicTerminated"},
-        {ResultCryptoError, "CryptoError"},
-        {ResultIncompatibleSchema, "IncompatibleSchema"},
-        {ResultConsumerAssignError, "ConsumerAssignError"},
-        {ResultCumulativeAcknowledgementNotAllowedError, "CumulativeAcknowledgementNotAllowedError"},
-        {ResultTransactionCoordinatorNotFoundError, "TransactionCoordinatorNotFoundError"},
-        {ResultInvalidTxnStatusError, "InvalidTxnStatusError"},
-        {ResultNotAllowedError, "NotAllowedError"},
-        {ResultTransactionConflict, "TransactionConflict"},
-        {ResultTransactionNotFound, "TransactionNotFound"},
-        {ResultProducerFenced, "ProducerFenced"},
-        {ResultMemoryBufferIsFull, "MemoryBufferIsFull"},
-        {ResultInterrupted, "Interrupted"},
-    };
-
-    for (const ExceptionEntry& entry : kEntries) {
-        exceptions[entry.result] = createExceptionClass(entry.pythonName, basePulsarException);
-    }
-}
diff --git a/python/src/message.cc b/python/src/message.cc
deleted file mode 100644
index b93380b..0000000
--- a/python/src/message.cc
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-#include <datetime.h>
-#include <boost/python/suite/indexing/map_indexing_suite.hpp>
-#include <boost/python/suite/indexing/vector_indexing_suite.hpp>
-
-// Returns the text produced by streaming `msgId` with operator<<.
-std::string MessageId_str(const MessageId& msgId) {
-    std::ostringstream out;
-    out << msgId;
-    return out.str();
-}
-
-// Free-function forms of MessageId's comparison operators, one per operator.
-
-bool MessageId_eq(const MessageId& a, const MessageId& b) {
-    return a == b;
-}
-
-bool MessageId_ne(const MessageId& a, const MessageId& b) {
-    return a != b;
-}
-
-bool MessageId_lt(const MessageId& a, const MessageId& b) {
-    return a < b;
-}
-
-bool MessageId_le(const MessageId& a, const MessageId& b) {
-    return a <= b;
-}
-
-bool MessageId_gt(const MessageId& a, const MessageId& b) {
-    return a > b;
-}
-
-bool MessageId_ge(const MessageId& a, const MessageId& b) {
-    return a >= b;
-}
-
-// Serializes `msgId` into a string buffer and returns its contents as a
-// Python bytes object.
-boost::python::object MessageId_serialize(const MessageId& msgId) {
-    namespace bp = boost::python;
-
-    std::string buffer;
-    msgId.serialize(buffer);
-
-    PyObject* const pyBytes = PyBytes_FromStringAndSize(buffer.c_str(), buffer.length());
-    return bp::object(bp::handle<>(pyBytes));
-}
-
-// Returns the text produced by streaming `msg` with operator<<.
-std::string Message_str(const Message& msg) {
-    std::ostringstream out;
-    out << msg;
-    return out.str();
-}
-
-// Returns the message payload (getData() / getLength()) as a Python bytes
-// object.
-boost::python::object Message_data(const Message& msg) {
-    namespace bp = boost::python;
-
-    const char* const payload = (const char*)msg.getData();
-    PyObject* const pyBytes = PyBytes_FromStringAndSize(payload, msg.getLength());
-    return bp::object(bp::handle<>(pyBytes));
-}
-
-// Copies the message's properties into a new Python dict, key by key, and
-// returns it.
-boost::python::object Message_properties(const Message& msg) {
-    boost::python::dict result;
-
-    const auto& properties = msg.getProperties();
-    for (auto it = properties.begin(); it != properties.end(); ++it) {
-        result[it->first] = it->second;
-    }
-
-    return boost::python::object(std::move(result));
-}
-
-// Returns the message's topic name, rendered through a string stream.
-std::string Topic_name_str(const Message& msg) {
-    std::ostringstream out;
-    out << msg.getTopicName();
-    return out.str();
-}
-
-// Returns the message's schema version, rendered through a string stream.
-std::string schema_version_str(const Message& msg) {
-    std::ostringstream out;
-    out << msg.getSchemaVersion();
-    return out.str();
-}
-
-// Free-function form of Message::getMessageId(); returns the same reference.
-const MessageId& Message_getMessageId(const Message& msg) {
-    const MessageId& id = msg.getMessageId();
-    return id;
-}
-
-// Sets the builder's deliver-after delay from a Python datetime.timedelta.
-//
-// `obj_delta` is reinterpreted as a PyDateTime_Delta without a type check, so
-// the caller must pass a timedelta instance.
-//
-// CPython stores a timedelta normalized: only `days` carries the sign, while
-// `seconds` and `microseconds` are always non-negative. The total is therefore
-// the plain signed sum of the three fields. The previous code negated `days`
-// and then negated the whole sum, which is wrong for negative deltas: for
-// timedelta(seconds=-1), stored as days=-1, seconds=86399, it produced
-// -172799 s instead of -1 s.
-void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) {
-    PyDateTime_Delta const* pydelta = reinterpret_cast<PyDateTime_Delta*>(obj_delta);
-
-    const long days = pydelta->days;  // may be negative
-
-    // Signed sum in microseconds, then truncated to milliseconds.
-    const std::chrono::milliseconds duration = std::chrono::duration_cast<std::chrono::milliseconds>(
-        std::chrono::hours(24) * days + std::chrono::seconds(pydelta->seconds) +
-        std::chrono::microseconds(pydelta->microseconds));
-
-    builder->setDeliverAfter(duration);
-}
-
-// Registers the message-related classes with Boost.Python: MessageBuilder,
-// MessageStringMap, MessageId, Message, MessageBatch and Messages.
-void export_message() {
- using namespace boost::python;
-
- // Initializes the CPython datetime C API for this translation unit.
- PyDateTime_IMPORT;
-
- // Pointer-to-member that selects the std::string overload of setContent.
- MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const std::string&) =
- &MessageBuilder::setContent;
-
- // Every builder method except "build" is registered with return_self<>().
- class_<MessageBuilder, boost::noncopyable>("MessageBuilder")
- .def("content", MessageBuilderSetContentString, return_self<>())
- .def("property", &MessageBuilder::setProperty, return_self<>())
- .def("properties", &MessageBuilder::setProperties, return_self<>())
- .def("sequence_id", &MessageBuilder::setSequenceId, return_self<>())
- .def("deliver_after", &deliverAfter, return_self<>())
- .def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>())
- .def("partition_key", &MessageBuilder::setPartitionKey, return_self<>())
- .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>())
- .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>())
- .def("disable_replication", &MessageBuilder::disableReplication, return_self<>())
- .def("build", &MessageBuilder::build);
-
- class_<Message::StringMap>("MessageStringMap").def(map_indexing_suite<Message::StringMap>());
-
- // Function-local statics bound to the earliest/latest ids; exposed below
- // as static properties of MessageId.
- static const MessageId& _MessageId_earliest = MessageId::earliest();
- static const MessageId& _MessageId_latest = MessageId::latest();
-
- // Rich comparisons and __str__ are provided by the free helper functions.
- class_<MessageId>("MessageId")
- .def(init<int32_t, int64_t, int64_t, int32_t>())
- .def("__str__", &MessageId_str)
- .def("__eq__", &MessageId_eq)
- .def("__ne__", &MessageId_ne)
- .def("__le__", &MessageId_le)
- .def("__lt__", &MessageId_lt)
- .def("__ge__", &MessageId_ge)
- .def("__gt__", &MessageId_gt)
- .def("ledger_id", &MessageId::ledgerId)
- .def("entry_id", &MessageId::entryId)
- .def("batch_index", &MessageId::batchIndex)
- .def("partition", &MessageId::partition)
- .add_static_property("earliest", make_getter(&_MessageId_earliest))
- .add_static_property("latest", make_getter(&_MessageId_latest))
- .def("serialize", &MessageId_serialize)
- .def("deserialize", &MessageId::deserialize)
- .staticmethod("deserialize");
-
- // Read-only view of a message.
- class_<Message>("Message")
- .def("properties", &Message_properties)
- .def("data", &Message_data)
- .def("length", &Message::getLength)
- .def("partition_key", &Message::getPartitionKey, return_value_policy<copy_const_reference>())
- .def("publish_timestamp", &Message::getPublishTimestamp)
- .def("event_timestamp", &Message::getEventTimestamp)
- .def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>())
- .def("__str__", &Message_str)
- .def("topic_name", &Topic_name_str)
- .def("redelivery_count", &Message::getRedeliveryCount)
- .def("schema_version", &schema_version_str);
-
- // Pointer-to-member that selects the (std::string, uint32_t) overload of
- // parseFrom.
- MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,
- uint32_t batchSize) = &MessageBatch::parseFrom;
-
- class_<MessageBatch>("MessageBatch")
- .def("with_message_id", &MessageBatch::withMessageId, return_self<>())
- .def("parse_from", MessageBatchParseFromString, return_self<>())
- .def("messages", &MessageBatch::messages, return_value_policy<copy_const_reference>());
-
- // Python-visible container type for std::vector<Message>.
- class_<std::vector<Message> >("Messages").def(vector_indexing_suite<std::vector<Message> >());
-}
diff --git a/python/src/producer.cc b/python/src/producer.cc
deleted file mode 100644
index d1a11cf..0000000
--- a/python/src/producer.cc
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-#include <functional>
-
-extern boost::python::object MessageId_serialize(const MessageId& msgId);
-
-/**
- * Publish `message` through `producer.sendAsync()` and wait (via
- * waitForAsyncValue) until the MessageId of the published message is
- * available.
- *
- * @return the result of MessageId_serialize() applied to that MessageId
- */
-boost::python::object Producer_send(Producer& producer, const Message& message) {
-    // Operation handed to waitForAsyncValue: it starts the send and forwards the completion callback
-    std::function<void(SendCallback)> startSend = [&producer, &message](SendCallback onSent) {
-        producer.sendAsync(message, onSent);
-    };
-
-    MessageId sentId;
-    waitForAsyncValue(startSend, sentId);
-    return MessageId_serialize(sentId);
-}
-
-/**
- * Completion handler bound by Producer_sendAsync.
- *
- * Calls the Python `callback` with the send result and the message id, prints
- * any Python exception raised by it, and then drops one reference to
- * `callback`. A `Py_None` callback returns immediately without doing anything.
- */
-void Producer_sendAsyncCallback(PyObject* callback, Result res, const MessageId& msgId) {
-    const bool hasCallback = (callback != Py_None);
-    if (!hasCallback) {
-        return;
-    }
-
-    // Hold the GIL while calling into Python and while touching the reference count
-    const PyGILState_STATE gil = PyGILState_Ensure();
-    try {
-        py::call<void>(callback, res, py::object(&msgId));
-    } catch (const py::error_already_set&) {
-        // Print the pending Python exception here rather than propagating the C++ exception
-        PyErr_Print();
-    }
-    Py_XDECREF(callback);
-    PyGILState_Release(gil);
-}
-
-/**
- * Start an asynchronous send of `message` and arrange for `callback` to be
- * invoked through Producer_sendAsyncCallback once the send completes.
- *
- * @param producer producer used to publish the message
- * @param message  message to publish
- * @param callback Python callable, or None when no notification is wanted
- */
-void Producer_sendAsync(Producer& producer, const Message& message, py::object callback) {
-    PyObject* pyCallback = callback.ptr();
-
-    // Keep the callable alive until the completion handler has run.
-    // Producer_sendAsyncCallback returns early for Py_None without releasing
-    // a reference, so a reference is only taken for real callbacks; taking
-    // one for None would leak it on every call.
-    if (pyCallback != Py_None) {
-        Py_XINCREF(pyCallback);
-    }
-
-    // Release the GIL while handing the message over to the C++ client
-    Py_BEGIN_ALLOW_THREADS
-    producer.sendAsync(message, std::bind(Producer_sendAsyncCallback, pyCallback, std::placeholders::_1,
-                                          std::placeholders::_2));
-    Py_END_ALLOW_THREADS
-}
-
-/** Call `producer.flushAsync()` and wait for its result through waitForAsyncResult. */
-void Producer_flush(Producer& producer) {
-    auto startFlush = [&producer](ResultCallback onDone) { producer.flushAsync(onDone); };
-    waitForAsyncResult(startFlush);
-}
-
-/** Call `producer.closeAsync()` and wait for its result through waitForAsyncResult. */
-void Producer_close(Producer& producer) {
-    auto startClose = [&producer](ResultCallback onDone) { producer.closeAsync(onDone); };
-    waitForAsyncResult(startClose);
-}
-
-/** Return the value of `producer.isConnected()`. */
-bool Producer_is_connected(Producer& producer) {
-    const bool connected = producer.isConnected();
-    return connected;
-}
-
-/**
- * Register the `Producer` class with Boost.Python.
- *
- * The class is exposed without a Python-side constructor (`no_init`); the
- * blocking operations are bound to the wrapper functions defined in this file.
- */
-void export_producer() {
-    // Return policy for getters that hand back a const reference: copy the value
-    typedef py::return_value_policy<py::copy_const_reference> CopyConstRef;
-
-    py::class_<Producer> producerClass("Producer", py::no_init);
-
-    // Plain accessors
-    producerClass.def("topic", &Producer::getTopic, "return the topic to which producer is publishing to",
-                      CopyConstRef());
-    producerClass.def("producer_name", &Producer::getProducerName,
-                      "return the producer name which could have been assigned by the system or specified by the "
-                      "client",
-                      CopyConstRef());
-    producerClass.def("last_sequence_id", &Producer::getLastSequenceId);
-
-    // Publishing
-    producerClass.def("send", &Producer_send,
-                      "Publish a message on the topic associated with this Producer.\n"
-                      "\n"
-                      "This method will block until the message will be accepted and persisted\n"
-                      "by the broker. In case of errors, the client library will try to\n"
-                      "automatically recover and use a different broker.\n"
-                      "\n"
-                      "If it wasn't possible to successfully publish the message within the sendTimeout,\n"
-                      "an error will be returned.\n"
-                      "\n"
-                      "This method is equivalent to asyncSend() and wait until the callback is triggered.\n"
-                      "\n"
-                      "@param msg message to publish\n");
-    producerClass.def("send_async", &Producer_sendAsync);
-    producerClass.def("flush", &Producer_flush,
-                      "Flush all the messages buffered in the client and wait until all messages have been\n"
-                      "successfully persisted\n");
-
-    // Lifecycle
-    producerClass.def("close", &Producer_close);
-    producerClass.def("is_connected", &Producer_is_connected);
-}
diff --git a/python/src/pulsar.cc b/python/src/pulsar.cc
deleted file mode 100644
index a82a533..0000000
--- a/python/src/pulsar.cc
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-void export_client();
-void export_message();
-void export_producer();
-void export_consumer();
-void export_reader();
-void export_config();
-void export_enums();
-void export_authentication();
-void export_schema();
-void export_cryptoKeyReader();
-void export_exceptions();
-
-PyObject* get_exception_class(Result result);
-
-/**
- * Exception translator: turns a PulsarException into a Python exception of
- * the class returned by get_exception_class(), with the message
- * "Pulsar error: " followed by strResult() of the result code.
- */
-static void translateException(const PulsarException& ex) {
-    const std::string message = std::string("Pulsar error: ") + strResult(ex._result);
-    PyErr_SetString(get_exception_class(ex._result), message.c_str());
-}
-
-BOOST_PYTHON_MODULE(_pulsar) {
- py::register_exception_translator<PulsarException>(translateException);
-
- // Initialize thread support so that we can grab the GIL mutex
- // from pulsar library threads
- PyEval_InitThreads();
-
- export_client();
- export_message();
- export_producer();
- export_consumer();
- export_reader();
- export_config();
- export_enums();
- export_authentication();
- export_schema();
- export_cryptoKeyReader();
- export_exceptions();
-}
diff --git a/python/src/reader.cc b/python/src/reader.cc
deleted file mode 100644
index 70873c8..0000000
--- a/python/src/reader.cc
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-/**
- * Blocking read of the next message.
- *
- * Polls `reader.readNext()` with a 100ms timeout (GIL released) until it
- * returns something other than ResultTimeout, checking for pending Python
- * signals between attempts.
- *
- * @return the message that was read
- * @throws PulsarException (through CHECK_RESULT) when the read fails
- * @throws boost::python::error_already_set when a Python signal handler
- *         raised (e.g. KeyboardInterrupt), so that exception reaches the caller
- */
-Message Reader_readNext(Reader& reader) {
-    Message msg;
-    Result res;
-
-    // TODO: There is currently no readNextAsync() version for the Reader.
-    // Once that's available, we should also convert these ad-hoc loops.
-    while (true) {
-        Py_BEGIN_ALLOW_THREADS
-        // Use 100ms timeout to periodically check whether the
-        // interpreter was interrupted
-        res = reader.readNext(msg, 100);
-        Py_END_ALLOW_THREADS
-
-        if (res != ResultTimeout) {
-            break;
-        }
-
-        // In case of timeout we keep calling readNext() to simulate a
-        // blocking call until a message is available, while breaking
-        // every once in a while to check the Python signal status
-        if (PyErr_CheckSignals() == -1) {
-            // A signal handler already set a Python exception: propagate it
-            // instead of returning a value with the error indicator set
-            py::throw_error_already_set();
-        }
-    }
-
-    CHECK_RESULT(res);
-    return msg;
-}
-
-/**
- * Read the next message with a single `reader.readNext()` call bounded by
- * `timeoutMs`, releasing the GIL for the duration of the call.
- *
- * @throws PulsarException (through CHECK_RESULT) when the result is not ResultOk
- */
-Message Reader_readNextTimeout(Reader& reader, int timeoutMs) {
-    Message received;
-    Result status;
-
-    Py_BEGIN_ALLOW_THREADS
-    status = reader.readNext(received, timeoutMs);
-    Py_END_ALLOW_THREADS
-
-    CHECK_RESULT(status);
-    return received;
-}
-
-/**
- * Call `reader.hasMessageAvailableAsync()` and wait for its answer through
- * waitForAsyncValue.
- *
- * @return the value delivered to the callback (false if none was stored)
- */
-bool Reader_hasMessageAvailable(Reader& reader) {
-    std::function<void(HasMessageAvailableCallback)> startQuery =
-        [&reader](HasMessageAvailableCallback onAnswer) { reader.hasMessageAvailableAsync(onAnswer); };
-
-    bool hasMessage = false;
-    waitForAsyncValue(startQuery, hasMessage);
-    return hasMessage;
-}
-
-/** Call `reader.closeAsync()` and wait for its result through waitForAsyncResult. */
-void Reader_close(Reader& reader) {
-    auto startClose = [&reader](ResultCallback onDone) { reader.closeAsync(onDone); };
-    waitForAsyncResult(startClose);
-}
-
-/** Call `reader.seekAsync()` with a message id and wait for its result through waitForAsyncResult. */
-void Reader_seek(Reader& reader, const MessageId& msgId) {
-    auto startSeek = [&reader, &msgId](ResultCallback onDone) { reader.seekAsync(msgId, onDone); };
-    waitForAsyncResult(startSeek);
-}
-
-/** Call `reader.seekAsync()` with a timestamp and wait for its result through waitForAsyncResult. */
-void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) {
-    auto startSeek = [&reader, &timestamp](ResultCallback onDone) { reader.seekAsync(timestamp, onDone); };
-    waitForAsyncResult(startSeek);
-}
-
-/** Return the value of `reader.isConnected()`. */
-bool Reader_is_connected(Reader& reader) {
-    const bool connected = reader.isConnected();
-    return connected;
-}
-
-/**
- * Register the `Reader` class with Boost.Python.
- *
- * The class is exposed without a Python-side constructor (`no_init`).
- * `read_next` and `seek` are each registered twice, once per wrapper overload.
- */
-void export_reader() {
-    py::class_<Reader> readerClass("Reader", py::no_init);
-
-    readerClass.def("topic", &Reader::getTopic, py::return_value_policy<py::copy_const_reference>());
-
-    // Reading: without and with an explicit timeout
-    readerClass.def("read_next", &Reader_readNext);
-    readerClass.def("read_next", &Reader_readNextTimeout);
-    readerClass.def("has_message_available", &Reader_hasMessageAvailable);
-
-    readerClass.def("close", &Reader_close);
-
-    // Seeking: by message id and by timestamp
-    readerClass.def("seek", &Reader_seek);
-    readerClass.def("seek", &Reader_seek_timestamp);
-
-    readerClass.def("is_connected", &Reader_is_connected);
-}
diff --git a/python/src/schema.cc b/python/src/schema.cc
deleted file mode 100644
index cdfcda6..0000000
--- a/python/src/schema.cc
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "utils.h"
-
-/**
- * Register the `SchemaInfo` class with Boost.Python, constructible from a
- * SchemaType and two strings, with accessors for its type, name and schema.
- */
-void export_schema() {
-    typedef py::return_value_policy<py::copy_const_reference> CopyConstRef;
-
-    py::class_<SchemaInfo> schemaInfoClass("SchemaInfo",
-                                           py::init<SchemaType, const std::string&, const std::string&>());
-    schemaInfoClass.def("schema_type", &SchemaInfo::getSchemaType);
-    schemaInfoClass.def("name", &SchemaInfo::getName, CopyConstRef());
-    schemaInfoClass.def("schema", &SchemaInfo::getSchema, CopyConstRef());
-}
diff --git a/python/src/utils.cc b/python/src/utils.cc
deleted file mode 100644
index cf8f6f4..0000000
--- a/python/src/utils.cc
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "utils.h"
-
-/**
- * Run an asynchronous operation and block until it reports its Result.
- *
- * `func` is invoked once with a callback that completes an internal promise.
- * The future is then polled in 100ms slices with the GIL released, checking
- * for pending Python signals between slices.
- *
- * @param func operation to start; receives the completion callback
- * @throws PulsarException (through CHECK_RESULT) when the result is not ResultOk
- * @throws boost::python::error_already_set when a Python signal handler
- *         raised (e.g. KeyboardInterrupt), so that exception reaches the caller
- */
-void waitForAsyncResult(std::function<void(ResultCallback)> func) {
-    Result res = ResultOk;
-    bool b;
-    Promise<bool, Result> promise;
-    Future<bool, Result> future = promise.getFuture();
-
-    Py_BEGIN_ALLOW_THREADS
-    func(WaitForCallback(promise));
-    Py_END_ALLOW_THREADS
-
-    bool isComplete;
-    while (true) {
-        // Check periodically for Python signals
-        Py_BEGIN_ALLOW_THREADS
-        isComplete = future.get(b, std::ref(res), std::chrono::milliseconds(100));
-        Py_END_ALLOW_THREADS
-
-        if (isComplete) {
-            CHECK_RESULT(res);
-            return;
-        }
-
-        if (PyErr_CheckSignals() == -1) {
-            // A signal handler already set a Python exception: propagate it
-            // instead of returning normally with the error indicator set
-            py::throw_error_already_set();
-        }
-    }
-}
diff --git a/python/src/utils.h b/python/src/utils.h
deleted file mode 100644
index 4b69ff8..0000000
--- a/python/src/utils.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include <boost/python.hpp>
-
-#include <pulsar/Client.h>
-#include <pulsar/MessageBatch.h>
-#include <lib/Utils.h>
-
-using namespace pulsar;
-
-namespace py = boost::python;
-
-/** Exception type thrown by CHECK_RESULT; carries the failing Result code. */
-struct PulsarException {
-    // Result code this exception was created with
-    Result _result;
-
-    PulsarException(Result res) : _result{res} {}
-};
-
-/** Throw a PulsarException carrying `res` unless `res` is ResultOk. */
-inline void CHECK_RESULT(Result res) {
-    if (res == ResultOk) {
-        return;
-    }
-    throw PulsarException(res);
-}
-
-void waitForAsyncResult(std::function<void(ResultCallback)> func);
-
-/**
- * Run an asynchronous operation and block until it delivers a value.
- *
- * `func` is invoked once with a callback that completes an internal promise.
- * The future is then polled in 100ms slices with the GIL released, checking
- * for pending Python signals between slices.
- *
- * @param func  operation to start; receives the completion callback
- * @param value output location the future writes the delivered value into
- * @throws PulsarException (through CHECK_RESULT) when the result is not ResultOk
- * @throws boost::python::error_already_set when a Python signal handler
- *         raised (e.g. KeyboardInterrupt), so that exception reaches the caller
- */
-template <typename T, typename Callback>
-inline void waitForAsyncValue(std::function<void(Callback)> func, T& value) {
-    Result res = ResultOk;
-    Promise<Result, T> promise;
-    Future<Result, T> future = promise.getFuture();
-
-    Py_BEGIN_ALLOW_THREADS
-    func(WaitForCallbackValue<T>(promise));
-    Py_END_ALLOW_THREADS
-
-    bool isComplete;
-    while (true) {
-        // Check periodically for Python signals
-        Py_BEGIN_ALLOW_THREADS
-        isComplete = future.get(res, std::ref(value), std::chrono::milliseconds(100));
-        Py_END_ALLOW_THREADS
-
-        if (isComplete) {
-            CHECK_RESULT(res);
-            return;
-        }
-
-        if (PyErr_CheckSignals() == -1) {
-            // A signal handler already set a Python exception: propagate it
-            // instead of returning normally with the error indicator set
-            py::throw_error_already_set();
-        }
-    }
-}
-
-// Wrapper struct holding an AuthenticationPtr.
-// Only the constructors are declared here; their definitions are not in this header.
-struct AuthenticationWrapper {
- // The wrapped authentication object
- AuthenticationPtr auth;
-
- // Default constructor (defined elsewhere)
- AuthenticationWrapper();
- // Constructor taking a dynamic library path and an authentication parameter string (defined elsewhere)
- AuthenticationWrapper(const std::string& dynamicLibPath, const std::string& authParamsString);
-};
-
-// Wrapper struct holding a CryptoKeyReaderPtr.
-// Only the constructors are declared here; their definitions are not in this header.
-struct CryptoKeyReaderWrapper {
- // The wrapped crypto key reader
- CryptoKeyReaderPtr cryptoKeyReader;
-
- // Default constructor (defined elsewhere)
- CryptoKeyReaderWrapper();
- // Constructor taking the paths of a public key and a private key (defined elsewhere)
- CryptoKeyReaderWrapper(const std::string& publicKeyPath, const std::string& privateKeyPath);
-};
-
-/**
- * Mixin that keeps a Python object alive for the lifetime of a C++ object.
- *
- * The constructor takes a reference to the object and the destructor drops
- * it again, both while holding the GIL. The destructor does nothing when the
- * interpreter is no longer initialized.
- *
- * NOTE(review): no copy constructor/assignment is declared, so a copy would
- * share `_captive` without taking its own reference - confirm that derived
- * classes are never copied.
- */
-class CaptivePythonObjectMixin {
-   protected:
-    PyObject* _captive;
-
-    CaptivePythonObjectMixin(PyObject* captive) : _captive(captive) {
-        const PyGILState_STATE gil = PyGILState_Ensure();
-        Py_XINCREF(_captive);
-        PyGILState_Release(gil);
-    }
-
-    ~CaptivePythonObjectMixin() {
-        if (!Py_IsInitialized()) {
-            // Interpreter is gone: there is no GIL to take and nothing to release
-            return;
-        }
-
-        const PyGILState_STATE gil = PyGILState_Ensure();
-        Py_XDECREF(_captive);
-        PyGILState_Release(gil);
-    }
-};
diff --git a/python/test_consumer.py b/python/test_consumer.py
deleted file mode 100755
index 8c2985e..0000000
--- a/python/test_consumer.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#!/usr/bin/env python3
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-
-import pulsar
-
-client = pulsar.Client('pulsar://localhost:6650')
-consumer = client.subscribe('my-topic', "my-subscription",
- properties={
- "consumer-name": "test-consumer-name",
- "consumer-id": "test-consumer-id"
- })
-
-while True:
- msg = consumer.receive()
- print("Received message '{0}' id='{1}'".format(msg.data().decode('utf-8'), msg.message_id()))
- consumer.acknowledge(msg)
-
-client.close()
diff --git a/python/test_producer.py b/python/test_producer.py
deleted file mode 100755
index c9c8ca1..0000000
--- a/python/test_producer.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/usr/bin/env python3
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from pulsar import BatchingType
-import pulsar
-
-
-client = pulsar.Client('pulsar://localhost:6650')
-
-producer = client.create_producer(
- 'my-topic',
- block_if_queue_full=True,
- batching_enabled=True,
- batching_max_publish_delay_ms=10,
- properties={
- "producer-name": "test-producer-name",
- "producer-id": "test-producer-id"
- },
- batching_type=BatchingType.KeyBased
- )
-
-for i in range(10):
- try:
- producer.send('hello'.encode('utf-8'), None)
- except Exception as e:
- print("Failed to send message: %s", e)
-
-producer.flush()
-producer.close()