You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by li...@apache.org on 2021/04/15 07:24:47 UTC

[pulsar] branch asf-site updated: [Doc] show multiple versions for various API docs

This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new cc84f7e  [Doc] show multiple versions for various API docs
cc84f7e is described below

commit cc84f7e6042c41e2757db3ae42400fb1f058bae4
Author: Anonymitaet <anonymitaet_hotmail.com>
AuthorDate: Thu Apr 15 15:23:30 2021 +0800

    [Doc] show multiple versions for various API docs
---
 content/api/admin/index.html            |   75 -
 content/api/client/index.html           |   72 -
 content/api/cpp/index.html              |  124 -
 content/api/pulsar-functions/index.html |   75 -
 content/api/python/index.html           | 6459 -------------------------------
 5 files changed, 6805 deletions(-)

diff --git a/content/api/admin/index.html b/content/api/admin/index.html
deleted file mode 100644
index 3b19c5d..0000000
--- a/content/api/admin/index.html
+++ /dev/null
@@ -1,75 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Frameset//EN" "http://www.w3.org/TR/html4/frameset.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc -->
-<title>Pulsar Admin Java API</title>
-<script type="text/javascript">
-    tmpTargetPage = "" + window.location.search;
-    if (tmpTargetPage != "" && tmpTargetPage != "undefined")
-        tmpTargetPage = tmpTargetPage.substring(1);
-    if (tmpTargetPage.indexOf(":") != -1 || (tmpTargetPage != "" && !validURL(tmpTargetPage)))
-        tmpTargetPage = "undefined";
-    targetPage = tmpTargetPage;
-    function validURL(url) {
-        try {
-            url = decodeURIComponent(url);
-        }
-        catch (error) {
-            return false;
-        }
-        var pos = url.indexOf(".html");
-        if (pos == -1 || pos != url.length - 5)
-            return false;
-        var allowNumber = false;
-        var allowSep = false;
-        var seenDot = false;
-        for (var i = 0; i < url.length - 5; i++) {
-            var ch = url.charAt(i);
-            if ('a' <= ch && ch <= 'z' ||
-                    'A' <= ch && ch <= 'Z' ||
-                    ch == '$' ||
-                    ch == '_' ||
-                    ch.charCodeAt(0) > 127) {
-                allowNumber = true;
-                allowSep = true;
-            } else if ('0' <= ch && ch <= '9'
-                    || ch == '-') {
-                if (!allowNumber)
-                     return false;
-            } else if (ch == '/' || ch == '.') {
-                if (!allowSep)
-                    return false;
-                allowNumber = false;
-                allowSep = false;
-                if (ch == '.')
-                     seenDot = true;
-                if (ch == '/' && seenDot)
-                     return false;
-            } else {
-                return false;
-            }
-        }
-        return true;
-    }
-    function loadFrames() {
-        if (targetPage != "" && targetPage != "undefined")
-             top.classFrame.location = top.targetPage;
-    }
-</script>
-</head>
-<frameset cols="20%,80%" title="Documentation frame" onload="top.loadFrames()">
-<frameset rows="30%,70%" title="Left frames" onload="top.loadFrames()">
-<frame src="overview-frame.html" name="packageListFrame" title="All Packages">
-<frame src="allclasses-frame.html" name="packageFrame" title="All classes and interfaces (except non-static nested types)">
-</frameset>
-<frame src="overview-summary.html" name="classFrame" title="Package, class and interface descriptions" scrolling="yes">
-<noframes>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<h2>Frame Alert</h2>
-<p>This document is designed to be viewed using the frames feature. If you see this message, you are using a non-frame-capable web client. Link to <a href="overview-summary.html">Non-frame version</a>.</p>
-</noframes>
-</frameset>
-</html>
diff --git a/content/api/client/index.html b/content/api/client/index.html
deleted file mode 100644
index 300f7e4..0000000
--- a/content/api/client/index.html
+++ /dev/null
@@ -1,72 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Frameset//EN" "http://www.w3.org/TR/html4/frameset.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc -->
-<title>Pulsar Client Java API</title>
-<script type="text/javascript">
-    tmpTargetPage = "" + window.location.search;
-    if (tmpTargetPage != "" && tmpTargetPage != "undefined")
-        tmpTargetPage = tmpTargetPage.substring(1);
-    if (tmpTargetPage.indexOf(":") != -1 || (tmpTargetPage != "" && !validURL(tmpTargetPage)))
-        tmpTargetPage = "undefined";
-    targetPage = tmpTargetPage;
-    function validURL(url) {
-        try {
-            url = decodeURIComponent(url);
-        }
-        catch (error) {
-            return false;
-        }
-        var pos = url.indexOf(".html");
-        if (pos == -1 || pos != url.length - 5)
-            return false;
-        var allowNumber = false;
-        var allowSep = false;
-        var seenDot = false;
-        for (var i = 0; i < url.length - 5; i++) {
-            var ch = url.charAt(i);
-            if ('a' <= ch && ch <= 'z' ||
-                    'A' <= ch && ch <= 'Z' ||
-                    ch == '$' ||
-                    ch == '_' ||
-                    ch.charCodeAt(0) > 127) {
-                allowNumber = true;
-                allowSep = true;
-            } else if ('0' <= ch && ch <= '9'
-                    || ch == '-') {
-                if (!allowNumber)
-                     return false;
-            } else if (ch == '/' || ch == '.') {
-                if (!allowSep)
-                    return false;
-                allowNumber = false;
-                allowSep = false;
-                if (ch == '.')
-                     seenDot = true;
-                if (ch == '/' && seenDot)
-                     return false;
-            } else {
-                return false;
-            }
-        }
-        return true;
-    }
-    function loadFrames() {
-        if (targetPage != "" && targetPage != "undefined")
-             top.classFrame.location = top.targetPage;
-    }
-</script>
-</head>
-<frameset cols="20%,80%" title="Documentation frame" onload="top.loadFrames()">
-<frame src="allclasses-frame.html" name="packageFrame" title="All classes and interfaces (except non-static nested types)">
-<frame src="overview-summary.html" name="classFrame" title="Package, class and interface descriptions" scrolling="yes">
-<noframes>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<h2>Frame Alert</h2>
-<p>This document is designed to be viewed using the frames feature. If you see this message, you are using a non-frame-capable web client. Link to <a href="overview-summary.html">Non-frame version</a>.</p>
-</noframes>
-</frameset>
-</html>
diff --git a/content/api/cpp/index.html b/content/api/cpp/index.html
deleted file mode 100644
index 93d8aa7..0000000
--- a/content/api/cpp/index.html
+++ /dev/null
@@ -1,124 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
-<head>
-<meta http-equiv="Content-Type" content="text/xhtml;charset=UTF-8"/>
-<meta http-equiv="X-UA-Compatible" content="IE=9"/>
-<meta name="generator" content="Doxygen 1.8.11"/>
-<title>pulsar-client-cpp: Main Page</title>
-<link href="tabs.css" rel="stylesheet" type="text/css"/>
-<script type="text/javascript" src="jquery.js"></script>
-<script type="text/javascript" src="dynsections.js"></script>
-<link href="search/search.css" rel="stylesheet" type="text/css"/>
-<script type="text/javascript" src="search/searchdata.js"></script>
-<script type="text/javascript" src="search/search.js"></script>
-<script type="text/javascript">
-  $(document).ready(function() { init_search(); });
-</script>
-<link href="doxygen.css" rel="stylesheet" type="text/css" />
-</head>
-<body>
-<div id="top"><!-- do not remove this div, it is closed by doxygen! -->
-<div id="titlearea">
-<table cellspacing="0" cellpadding="0">
- <tbody>
- <tr style="height: 56px;">
-  <td id="projectalign" style="padding-left: 0.5em;">
-   <div id="projectname">pulsar-client-cpp
-   </div>
-  </td>
- </tr>
- </tbody>
-</table>
-</div>
-<!-- end header part -->
-<!-- Generated by Doxygen 1.8.11 -->
-<script type="text/javascript">
-var searchBox = new SearchBox("searchBox", "search",false,'Search');
-</script>
-  <div id="navrow1" class="tabs">
-    <ul class="tablist">
-      <li class="current"><a href="index.html"><span>Main&#160;Page</span></a></li>
-      <li><a href="pages.html"><span>Related&#160;Pages</span></a></li>
-      <li><a href="namespaces.html"><span>Namespaces</span></a></li>
-      <li><a href="annotated.html"><span>Classes</span></a></li>
-      <li><a href="files.html"><span>Files</span></a></li>
-      <li>
-        <div id="MSearchBox" class="MSearchBoxInactive">
-        <span class="left">
-          <img id="MSearchSelect" src="search/mag_sel.png"
-               onmouseover="return searchBox.OnSearchSelectShow()"
-               onmouseout="return searchBox.OnSearchSelectHide()"
-               alt=""/>
-          <input type="text" id="MSearchField" value="Search" accesskey="S"
-               onfocus="searchBox.OnSearchFieldFocus(true)" 
-               onblur="searchBox.OnSearchFieldFocus(false)" 
-               onkeyup="searchBox.OnSearchFieldChange(event)"/>
-          </span><span class="right">
-            <a id="MSearchClose" href="javascript:searchBox.CloseResultsWindow()"><img id="MSearchCloseImg" border="0" src="search/close.png" alt=""/></a>
-          </span>
-        </div>
-      </li>
-    </ul>
-  </div>
-</div><!-- top -->
-<!-- window showing the filter options -->
-<div id="MSearchSelectWindow"
-     onmouseover="return searchBox.OnSearchSelectShow()"
-     onmouseout="return searchBox.OnSearchSelectHide()"
-     onkeydown="return searchBox.OnSearchSelectKey(event)">
-</div>
-
-<!-- iframe showing the search results (closed by default) -->
-<div id="MSearchResultsWindow">
-<iframe src="javascript:void(0)" frameborder="0" 
-        name="MSearchResults" id="MSearchResults">
-</iframe>
-</div>
-
-<div class="header">
-  <div class="headertitle">
-<div class="title">pulsar-client-cpp Documentation</div>  </div>
-</div><!--header-->
-<div class="contents">
-<div class="textblock"><h1>The Pulsar C++ client</h1>
-<p>Welcome to the Doxygen documentation for <a href="https://pulsar.apache.org/">Pulsar</a>.</p>
-<h2>Supported platforms</h2>
-<p>The Pulsar C++ client has been successfully tested on <b>MacOS</b> and <b>Linux</b>.</p>
-<h2>System requirements</h2>
-<p>You need to have the following installed to use the C++ client:</p>
-<ul>
-<li><a href="https://cmake.org/">CMake</a></li>
-<li><a href="http://www.boost.org/">Boost</a></li>
-<li><a href="https://developers.google.com/protocol-buffers/">Protocol Buffers</a> 2.6</li>
-<li><a href="https://logging.apache.org/log4cxx">Log4CXX</a></li>
-<li><a href="https://curl.haxx.se/libcurl/">libcurl</a></li>
-<li><a href="https://github.com/google/googletest">Google Test</a></li>
-<li><a href="https://github.com/open-source-parsers/jsoncpp">JsonCpp</a></li>
-</ul>
-<h2>Compilation</h2>
-<p>There are separate compilation instructions for <a href="#macos">MacOS</a> and <a href="#linux">Linux</a>. For both systems, start by cloning the Pulsar repository:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno">    1</span>&#160;$ git clone https://github.com/apache/pulsar</div></div><!-- fragment --><h3>Linux</h3>
-<p>First, install all of the necessary dependencies:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno">    1</span>&#160;$ apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \</div><div class="line"><a name="l00002"></a><span class="lineno">    2</span>&#160;  libprotobuf-dev libboost-all-dev libgtest-dev libjsoncpp-dev</div></div><!-- fragment --><p>Then compile and install <a href="https://github.com/google/googletest">Google Test</a>:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno">    1</span>&#160;$ git clone https://github.com/google/googletest.git &amp;&amp; cd googletest</div><div class="line"><a name="l00002"></a><span class="lineno">    2</span>&#160;$ sudo cmake .</div><div class="line"><a name="l00003"></a><span class="lineno">    3</span>&#160;$ sudo make</div><div class="line"><a name="l00004"></a><span class="lineno">    4</span>&#160;$ sudo cp *.a /usr/lib</div></div><!-- [...]
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno">    1</span>&#160;$ cd pulsar-client-cpp</div><div class="line"><a name="l00002"></a><span class="lineno">    2</span>&#160;$ cmake .</div><div class="line"><a name="l00003"></a><span class="lineno">    3</span>&#160;$ make</div></div><!-- fragment --><p>The resulting files, <code>libpulsar.so</code> and <code>libpulsar.a</code>, will be placed in the <code>lib</code> folder of the repo while two tools, <co [...]
-<h3>MacOS</h3>
-<p>First, install all of the necessary dependencies:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno">    1</span>&#160;# OpenSSL installation</div><div class="line"><a name="l00002"></a><span class="lineno">    2</span>&#160;$ brew install openssl</div><div class="line"><a name="l00003"></a><span class="lineno">    3</span>&#160;$ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/</div><div class="line"><a name="l00004"></a><span class="lineno">    4</span>&#160;$ export OPENSSL_ROOT_DIR=/usr/local [...]
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno">    1</span>&#160;$ cd pulsar-client-cpp</div><div class="line"><a name="l00002"></a><span class="lineno">    2</span>&#160;$ cmake .</div><div class="line"><a name="l00003"></a><span class="lineno">    3</span>&#160;$ make</div></div><!-- fragment --><h2>Consumer</h2>
-<div class="fragment"><div class="line">Client client(<span class="stringliteral">&quot;pulsar://localhost:6650&quot;</span>);</div><div class="line"></div><div class="line">Consumer consumer;</div><div class="line"><a class="code" href="namespacepulsar.html#ae85314d6b9e8afd831cf8c66705f2dbb">Result</a> result = client.subscribe(<span class="stringliteral">&quot;persistent://sample/standalone/ns1/my-topic&quot;</span>, <span class="stringliteral">&quot;my-subscribtion-name&quot;</span>,  [...]
-<div class="fragment"><div class="line">Client client(<span class="stringliteral">&quot;pulsar://localhost:6650&quot;</span>);</div><div class="line"></div><div class="line">Producer producer;</div><div class="line"><a class="code" href="namespacepulsar.html#ae85314d6b9e8afd831cf8c66705f2dbb">Result</a> result = client.createProducer(<span class="stringliteral">&quot;persistent://sample/standalone/ns1/my-topic&quot;</span>, producer);</div><div class="line"><span class="keywordflow">if</ [...]
-<div class="fragment"><div class="line">ClientConfiguration config = ClientConfiguration();</div><div class="line">config.setUseTls(<span class="keyword">true</span>);</div><div class="line">std::string certfile = <span class="stringliteral">&quot;/path/to/cacert.pem&quot;</span>;</div><div class="line"></div><div class="line">ParamMap params;</div><div class="line">params[<span class="stringliteral">&quot;tlsCertFile&quot;</span>] = <span class="stringliteral">&quot;/path/to/client-cert [...]
-<p>After you changed code, run auto-formatting by the following command.</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno">    1</span>&#160;make format</div></div><!-- fragment --><p> You need to have the following installed to use the auto-formatting.</p><ul>
-<li><a href="https://clang.llvm.org/">clang-format 5.0</a> </li>
-</ul>
-</div></div><!-- contents -->
-<!-- start footer part -->
-<hr class="footer"/><address class="footer"><small>
-Generated by &#160;<a href="http://www.doxygen.org/index.html">
-<img class="footer" src="doxygen.png" alt="doxygen"/>
-</a> 1.8.11
-</small></address>
-</body>
-</html>
diff --git a/content/api/pulsar-functions/index.html b/content/api/pulsar-functions/index.html
deleted file mode 100644
index ff28246..0000000
--- a/content/api/pulsar-functions/index.html
+++ /dev/null
@@ -1,75 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Frameset//EN" "http://www.w3.org/TR/html4/frameset.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc -->
-<title>Pulsar Functions Java SDK</title>
-<script type="text/javascript">
-    tmpTargetPage = "" + window.location.search;
-    if (tmpTargetPage != "" && tmpTargetPage != "undefined")
-        tmpTargetPage = tmpTargetPage.substring(1);
-    if (tmpTargetPage.indexOf(":") != -1 || (tmpTargetPage != "" && !validURL(tmpTargetPage)))
-        tmpTargetPage = "undefined";
-    targetPage = tmpTargetPage;
-    function validURL(url) {
-        try {
-            url = decodeURIComponent(url);
-        }
-        catch (error) {
-            return false;
-        }
-        var pos = url.indexOf(".html");
-        if (pos == -1 || pos != url.length - 5)
-            return false;
-        var allowNumber = false;
-        var allowSep = false;
-        var seenDot = false;
-        for (var i = 0; i < url.length - 5; i++) {
-            var ch = url.charAt(i);
-            if ('a' <= ch && ch <= 'z' ||
-                    'A' <= ch && ch <= 'Z' ||
-                    ch == '$' ||
-                    ch == '_' ||
-                    ch.charCodeAt(0) > 127) {
-                allowNumber = true;
-                allowSep = true;
-            } else if ('0' <= ch && ch <= '9'
-                    || ch == '-') {
-                if (!allowNumber)
-                     return false;
-            } else if (ch == '/' || ch == '.') {
-                if (!allowSep)
-                    return false;
-                allowNumber = false;
-                allowSep = false;
-                if (ch == '.')
-                     seenDot = true;
-                if (ch == '/' && seenDot)
-                     return false;
-            } else {
-                return false;
-            }
-        }
-        return true;
-    }
-    function loadFrames() {
-        if (targetPage != "" && targetPage != "undefined")
-             top.classFrame.location = top.targetPage;
-    }
-</script>
-</head>
-<frameset cols="20%,80%" title="Documentation frame" onload="top.loadFrames()">
-<frameset rows="30%,70%" title="Left frames" onload="top.loadFrames()">
-<frame src="overview-frame.html" name="packageListFrame" title="All Packages">
-<frame src="allclasses-frame.html" name="packageFrame" title="All classes and interfaces (except non-static nested types)">
-</frameset>
-<frame src="overview-summary.html" name="classFrame" title="Package, class and interface descriptions" scrolling="yes">
-<noframes>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<h2>Frame Alert</h2>
-<p>This document is designed to be viewed using the frames feature. If you see this message, you are using a non-frame-capable web client. Link to <a href="overview-summary.html">Non-frame version</a>.</p>
-</noframes>
-</frameset>
-</html>
diff --git a/content/api/python/index.html b/content/api/python/index.html
deleted file mode 100644
index 0cd0cf1..0000000
--- a/content/api/python/index.html
+++ /dev/null
@@ -1,6459 +0,0 @@
-<!doctype html>
-<head>
-  <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
-  <meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
-
-    <title>pulsar API documentation</title>
-    <meta name="description" content="The Pulsar Python client library is based on the existing C++ client library.
-All the same features ..." />
-
-  <link href='http://fonts.googleapis.com/css?family=Source+Sans+Pro:400,300' rel='stylesheet' type='text/css'>
-  
-  <style type="text/css">
-  
-* {
-  box-sizing: border-box;
-}
-/*! normalize.css v1.1.1 | MIT License | git.io/normalize */
-
-/* ==========================================================================
-   HTML5 display definitions
-   ========================================================================== */
-
-/**
- * Correct `block` display not defined in IE 6/7/8/9 and Firefox 3.
- */
-
-article,
-aside,
-details,
-figcaption,
-figure,
-footer,
-header,
-hgroup,
-main,
-nav,
-section,
-summary {
-    display: block;
-}
-
-/**
- * Correct `inline-block` display not defined in IE 6/7/8/9 and Firefox 3.
- */
-
-audio,
-canvas,
-video {
-    display: inline-block;
-    *display: inline;
-    *zoom: 1;
-}
-
-/**
- * Prevent modern browsers from displaying `audio` without controls.
- * Remove excess height in iOS 5 devices.
- */
-
-audio:not([controls]) {
-    display: none;
-    height: 0;
-}
-
-/**
- * Address styling not present in IE 7/8/9, Firefox 3, and Safari 4.
- * Known issue: no IE 6 support.
- */
-
-[hidden] {
-    display: none;
-}
-
-/* ==========================================================================
-   Base
-   ========================================================================== */
-
-/**
- * 1. Prevent system color scheme's background color being used in Firefox, IE,
- *    and Opera.
- * 2. Prevent system color scheme's text color being used in Firefox, IE, and
- *    Opera.
- * 3. Correct text resizing oddly in IE 6/7 when body `font-size` is set using
- *    `em` units.
- * 4. Prevent iOS text size adjust after orientation change, without disabling
- *    user zoom.
- */
-
-html {
-    background: #fff; /* 1 */
-    color: #000; /* 2 */
-    font-size: 100%; /* 3 */
-    -webkit-text-size-adjust: 100%; /* 4 */
-    -ms-text-size-adjust: 100%; /* 4 */
-}
-
-/**
- * Address `font-family` inconsistency between `textarea` and other form
- * elements.
- */
-
-html,
-button,
-input,
-select,
-textarea {
-    font-family: sans-serif;
-}
-
-/**
- * Address margins handled incorrectly in IE 6/7.
- */
-
-body {
-    margin: 0;
-}
-
-/* ==========================================================================
-   Links
-   ========================================================================== */
-
-/**
- * Address `outline` inconsistency between Chrome and other browsers.
- */
-
-a:focus {
-    outline: thin dotted;
-}
-
-/**
- * Improve readability when focused and also mouse hovered in all browsers.
- */
-
-a:active,
-a:hover {
-    outline: 0;
-}
-
-/* ==========================================================================
-   Typography
-   ========================================================================== */
-
-/**
- * Address font sizes and margins set differently in IE 6/7.
- * Address font sizes within `section` and `article` in Firefox 4+, Safari 5,
- * and Chrome.
- */
-
-h1 {
-    font-size: 2em;
-    margin: 0.67em 0;
-}
-
-h2 {
-    font-size: 1.5em;
-    margin: 0.83em 0;
-}
-
-h3 {
-    font-size: 1.17em;
-    margin: 1em 0;
-}
-
-h4 {
-    font-size: 1em;
-    margin: 1.33em 0;
-}
-
-h5 {
-    font-size: 0.83em;
-    margin: 1.67em 0;
-}
-
-h6 {
-    font-size: 0.67em;
-    margin: 2.33em 0;
-}
-
-/**
- * Address styling not present in IE 7/8/9, Safari 5, and Chrome.
- */
-
-abbr[title] {
-    border-bottom: 1px dotted;
-}
-
-/**
- * Address style set to `bolder` in Firefox 3+, Safari 4/5, and Chrome.
- */
-
-b,
-strong {
-    font-weight: bold;
-}
-
-blockquote {
-    margin: 1em 40px;
-}
-
-/**
- * Address styling not present in Safari 5 and Chrome.
- */
-
-dfn {
-    font-style: italic;
-}
-
-/**
- * Address differences between Firefox and other browsers.
- * Known issue: no IE 6/7 normalization.
- */
-
-hr {
-    -moz-box-sizing: content-box;
-    box-sizing: content-box;
-    height: 0;
-}
-
-/**
- * Address styling not present in IE 6/7/8/9.
- */
-
-mark {
-    background: #ff0;
-    color: #000;
-}
-
-/**
- * Address margins set differently in IE 6/7.
- */
-
-p,
-pre {
-    margin: 1em 0;
-}
-
-/**
- * Correct font family set oddly in IE 6, Safari 4/5, and Chrome.
- */
-
-code,
-kbd,
-pre,
-samp {
-    font-family: monospace, serif;
-    _font-family: 'courier new', monospace;
-    font-size: 1em;
-}
-
-/**
- * Improve readability of pre-formatted text in all browsers.
- */
-
-pre {
-    white-space: pre;
-    white-space: pre-wrap;
-    word-wrap: break-word;
-}
-
-/**
- * Address CSS quotes not supported in IE 6/7.
- */
-
-q {
-    quotes: none;
-}
-
-/**
- * Address `quotes` property not supported in Safari 4.
- */
-
-q:before,
-q:after {
-    content: '';
-    content: none;
-}
-
-/**
- * Address inconsistent and variable font size in all browsers.
- */
-
-small {
-    font-size: 80%;
-}
-
-/**
- * Prevent `sub` and `sup` affecting `line-height` in all browsers.
- */
-
-sub,
-sup {
-    font-size: 75%;
-    line-height: 0;
-    position: relative;
-    vertical-align: baseline;
-}
-
-sup {
-    top: -0.5em;
-}
-
-sub {
-    bottom: -0.25em;
-}
-
-/* ==========================================================================
-   Lists
-   ========================================================================== */
-
-/**
- * Address margins set differently in IE 6/7.
- */
-
-dl,
-menu,
-ol,
-ul {
-    margin: 1em 0;
-}
-
-dd {
-    margin: 0 0 0 40px;
-}
-
-/**
- * Address paddings set differently in IE 6/7.
- */
-
-menu,
-ol,
-ul {
-    padding: 0 0 0 40px;
-}
-
-/**
- * Correct list images handled incorrectly in IE 7.
- */
-
-nav ul,
-nav ol {
-    list-style: none;
-    list-style-image: none;
-}
-
-/* ==========================================================================
-   Embedded content
-   ========================================================================== */
-
-/**
- * 1. Remove border when inside `a` element in IE 6/7/8/9 and Firefox 3.
- * 2. Improve image quality when scaled in IE 7.
- */
-
-img {
-    border: 0; /* 1 */
-    -ms-interpolation-mode: bicubic; /* 2 */
-}
-
-/**
- * Correct overflow displayed oddly in IE 9.
- */
-
-svg:not(:root) {
-    overflow: hidden;
-}
-
-/* ==========================================================================
-   Figures
-   ========================================================================== */
-
-/**
- * Address margin not present in IE 6/7/8/9, Safari 5, and Opera 11.
- */
-
-figure {
-    margin: 0;
-}
-
-/* ==========================================================================
-   Forms
-   ========================================================================== */
-
-/**
- * Correct margin displayed oddly in IE 6/7.
- */
-
-form {
-    margin: 0;
-}
-
-/**
- * Define consistent border, margin, and padding.
- */
-
-fieldset {
-    border: 1px solid #c0c0c0;
-    margin: 0 2px;
-    padding: 0.35em 0.625em 0.75em;
-}
-
-/**
- * 1. Correct color not being inherited in IE 6/7/8/9.
- * 2. Correct text not wrapping in Firefox 3.
- * 3. Correct alignment displayed oddly in IE 6/7.
- */
-
-legend {
-    border: 0; /* 1 */
-    padding: 0;
-    white-space: normal; /* 2 */
-    *margin-left: -7px; /* 3 */
-}
-
-/**
- * 1. Correct font size not being inherited in all browsers.
- * 2. Address margins set differently in IE 6/7, Firefox 3+, Safari 5,
- *    and Chrome.
- * 3. Improve appearance and consistency in all browsers.
- */
-
-button,
-input,
-select,
-textarea {
-    font-size: 100%; /* 1 */
-    margin: 0; /* 2 */
-    vertical-align: baseline; /* 3 */
-    *vertical-align: middle; /* 3 */
-}
-
-/**
- * Address Firefox 3+ setting `line-height` on `input` using `!important` in
- * the UA stylesheet.
- */
-
-button,
-input {
-    line-height: normal;
-}
-
-/**
- * Address inconsistent `text-transform` inheritance for `button` and `select`.
- * All other form control elements do not inherit `text-transform` values.
- * Correct `button` style inheritance in Chrome, Safari 5+, and IE 6+.
- * Correct `select` style inheritance in Firefox 4+ and Opera.
- */
-
-button,
-select {
-    text-transform: none;
-}
-
-/**
- * 1. Avoid the WebKit bug in Android 4.0.* where (2) destroys native `audio`
- *    and `video` controls.
- * 2. Correct inability to style clickable `input` types in iOS.
- * 3. Improve usability and consistency of cursor style between image-type
- *    `input` and others.
- * 4. Remove inner spacing in IE 7 without affecting normal text inputs.
- *    Known issue: inner spacing remains in IE 6.
- */
-
-button,
-html input[type="button"], /* 1 */
-input[type="reset"],
-input[type="submit"] {
-    -webkit-appearance: button; /* 2 */
-    cursor: pointer; /* 3 */
-    *overflow: visible;  /* 4 */
-}
-
-/**
- * Re-set default cursor for disabled elements.
- */
-
-button[disabled],
-html input[disabled] {
-    cursor: default;
-}
-
-/**
- * 1. Address box sizing set to content-box in IE 8/9.
- * 2. Remove excess padding in IE 8/9.
- * 3. Remove excess padding in IE 7.
- *    Known issue: excess padding remains in IE 6.
- */
-
-input[type="checkbox"],
-input[type="radio"] {
-    box-sizing: border-box; /* 1 */
-    padding: 0; /* 2 */
-    *height: 13px; /* 3 */
-    *width: 13px; /* 3 */
-}
-
-/**
- * 1. Address `appearance` set to `searchfield` in Safari 5 and Chrome.
- * 2. Address `box-sizing` set to `border-box` in Safari 5 and Chrome
- *    (include `-moz` to future-proof).
- */
-
-input[type="search"] {
-    -webkit-appearance: textfield; /* 1 */
-    -moz-box-sizing: content-box;
-    -webkit-box-sizing: content-box; /* 2 */
-    box-sizing: content-box;
-}
-
-/**
- * Remove inner padding and search cancel button in Safari 5 and Chrome
- * on OS X.
- */
-
-input[type="search"]::-webkit-search-cancel-button,
-input[type="search"]::-webkit-search-decoration {
-    -webkit-appearance: none;
-}
-
-/**
- * Remove inner padding and border in Firefox 3+.
- */
-
-button::-moz-focus-inner,
-input::-moz-focus-inner {
-    border: 0;
-    padding: 0;
-}
-
-/**
- * 1. Remove default vertical scrollbar in IE 6/7/8/9.
- * 2. Improve readability and alignment in all browsers.
- */
-
-textarea {
-    overflow: auto; /* 1 */
-    vertical-align: top; /* 2 */
-}
-
-/* ==========================================================================
-   Tables
-   ========================================================================== */
-
-/**
- * Remove most spacing between table cells.
- */
-
-table {
-    border-collapse: collapse;
-    border-spacing: 0;
-}
-
-  </style>
-
-  <style type="text/css">
-  
-  html, body {
-    margin: 0;
-    padding: 0;
-    min-height: 100%;
-  }
-  body {
-    background: #fff;
-    font-family: "Source Sans Pro", "Helvetica Neueue", Helvetica, sans;
-    font-weight: 300;
-    font-size: 16px;
-    line-height: 1.6em;
-  }
-  #content {
-    width: 70%;
-    max-width: 850px;
-    float: left;
-    padding: 30px 60px;
-    border-left: 1px solid #ddd;
-  }
-  #sidebar {
-    width: 25%;
-    float: left;
-    padding: 30px;
-    overflow: hidden;
-  }
-  #nav {
-    font-size: 130%;
-    margin: 0 0 15px 0;
-  }
-
-  #top {
-    display: block;
-    position: fixed;
-    bottom: 5px;
-    left: 5px;
-    font-size: .85em;
-    text-transform: uppercase;
-  }
-
-  #footer {
-    font-size: .75em;
-    padding: 5px 30px;
-    border-top: 1px solid #ddd;
-    text-align: right;
-  }
-    #footer p {
-      margin: 0 0 0 30px;
-      display: inline-block;
-    }
-
-  h1, h2, h3, h4, h5 {
-    font-weight: 300;
-  }
-  h1 {
-    font-size: 2.5em;
-    line-height: 1.1em;
-    margin: 0 0 .50em 0;
-  }
-
-  h2 {
-    font-size: 1.75em;
-    margin: 1em 0 .50em 0;
-  }
-
-  h3 {
-    margin: 25px 0 10px 0;
-  }
-
-  h4 {
-    margin: 0;
-    font-size: 105%;
-  }
-
-  a {
-    color: #058;
-    text-decoration: none;
-    transition: color .3s ease-in-out;
-  }
-
-  a:hover {
-    color: #e08524;
-    transition: color .3s ease-in-out;
-  }
-
-  pre, code, .mono, .name {
-    font-family: "Ubuntu Mono", "Cousine", "DejaVu Sans Mono", monospace;
-  }
-
-  .title .name {
-    font-weight: bold;
-  }
-  .section-title {
-    margin-top: 2em;
-  }
-  .ident {
-    color: #900;
-  }
-
-  code {
-    background: #f9f9f9;
-  } 
-
-  pre {
-    background: #fefefe;
-    border: 1px solid #ddd;
-    box-shadow: 2px 2px 0 #f3f3f3;
-    margin: 0 30px;
-    padding: 15px 30px;
-  }
-
-  .codehilite {
-    margin: 0 30px 10px 30px;
-  }
-
-    .codehilite pre {
-      margin: 0;
-    }
-    .codehilite .err { background: #ff3300; color: #fff !important; } 
-
-  table#module-list {
-    font-size: 110%;
-  }
-
-    table#module-list tr td:first-child {
-      padding-right: 10px;
-      white-space: nowrap;
-    }
-
-    table#module-list td {
-      vertical-align: top;
-      padding-bottom: 8px;
-    }
-
-      table#module-list td p {
-        margin: 0 0 7px 0;
-      }
-
-  .def {
-    display: table;
-  }
-
-    .def p {
-      display: table-cell;
-      vertical-align: top;
-      text-align: left;
-    }
-
-    .def p:first-child {
-      white-space: nowrap;
-    }
-
-    .def p:last-child {
-      width: 100%;
-    }
-
-
-  #index {
-    list-style-type: none;
-    margin: 0;
-    padding: 0;
-  }
-    ul#index .class_name {
-      /* font-size: 110%; */
-      font-weight: bold;
-    }
-    #index ul {
-      margin: 0;
-    }
-
-  .item {
-    margin: 0 0 15px 0;
-  }
-
-    .item .class {
-      margin: 0 0 25px 30px;
-    }
-
-      .item .class ul.class_list {
-        margin: 0 0 20px 0;
-      }
-
-    .item .name {
-      background: #fafafa;
-      margin: 0;
-      font-weight: bold;
-      padding: 5px 10px;
-      border-radius: 3px;
-      display: inline-block;
-      min-width: 40%;
-    }
-      .item .name:hover {
-        background: #f6f6f6;
-      }
-
-    .item .empty_desc {
-      margin: 0 0 5px 0;
-      padding: 0;
-    }
-
-    .item .inheritance {
-      margin: 3px 0 0 30px;
-    }
-
-    .item .inherited {
-      color: #666;
-    }
-
-    .item .desc {
-      padding: 0 8px;
-      margin: 0;
-    }
-
-      .item .desc p {
-        margin: 0 0 10px 0;
-      }
-
-    .source_cont {
-      margin: 0;
-      padding: 0;
-    }
-
-    .source_link a {
-      background: #ffc300;
-      font-weight: 400;
-      font-size: .75em;
-      text-transform: uppercase;
-      color: #fff;
-      text-shadow: 1px 1px 0 #f4b700;
-      
-      padding: 3px 8px;
-      border-radius: 2px;
-      transition: background .3s ease-in-out;
-    }
-      .source_link a:hover {
-        background: #FF7200;
-        text-shadow: none;
-        transition: background .3s ease-in-out;
-      }
-
-    .source {
-      display: none;
-      max-height: 600px;
-      overflow-y: scroll;
-      margin-bottom: 15px;
-    }
-
-      .source .codehilite {
-        margin: 0;
-      }
-
-  .desc h1, .desc h2, .desc h3 {
-    font-size: 100% !important;
-  }
-  .clear {
-    clear: both;
-  }
-
-  @media all and (max-width: 950px) {
-    #sidebar {
-      width: 35%;
-    }
-    #content {
-      width: 65%;
-    }
-  }
-  @media all and (max-width: 650px) {
-    #top {
-      display: none;
-    }
-    #sidebar {
-      float: none;
-      width: auto;
-    }
-    #content {
-      float: none;
-      width: auto;
-      padding: 30px;
-    }
-
-    #index ul {
-      padding: 0;
-      margin-bottom: 15px;
-    }
-    #index ul li {
-      display: inline-block;
-      margin-right: 30px;
-    }
-    #footer {
-      text-align: left;
-    }
-    #footer p {
-      display: block;
-      margin: inherit;
-    }
-  }
-
-  /*****************************/
-
-  </style>
-
-
-  <style type="text/css">
-  
-/* ==========================================================================
-   EXAMPLE Media Queries for Responsive Design.
-   These examples override the primary ('mobile first') styles.
-   Modify as content requires.
-   ========================================================================== */
-
-@media only screen and (min-width: 35em) {
-    /* Style adjustments for viewports that meet the condition */
-}
-
-@media print,
-       (-o-min-device-pixel-ratio: 5/4),
-       (-webkit-min-device-pixel-ratio: 1.25),
-       (min-resolution: 120dpi) {
-    /* Style adjustments for high resolution devices */
-}
-
-/* ==========================================================================
-   Print styles.
-   Inlined to avoid required HTTP connection: h5bp.com/r
-   ========================================================================== */
-
-@media print {
-    * {
-        background: transparent !important;
-        color: #000 !important; /* Black prints faster: h5bp.com/s */
-        box-shadow: none !important;
-        text-shadow: none !important;
-    }
-
-    a,
-    a:visited {
-        text-decoration: underline;
-    }
-
-    a[href]:after {
-        content: " (" attr(href) ")";
-    }
-
-    abbr[title]:after {
-        content: " (" attr(title) ")";
-    }
-
-    /*
-     * Don't show links for images, or javascript/internal links
-     */
-
-    .ir a:after,
-    a[href^="javascript:"]:after,
-    a[href^="#"]:after {
-        content: "";
-    }
-
-    pre,
-    blockquote {
-        border: 1px solid #999;
-        page-break-inside: avoid;
-    }
-
-    thead {
-        display: table-header-group; /* h5bp.com/t */
-    }
-
-    tr,
-    img {
-        page-break-inside: avoid;
-    }
-
-    img {
-        max-width: 100% !important;
-    }
-
-    @page {
-        margin: 0.5cm;
-    }
-
-    p,
-    h2,
-    h3 {
-        orphans: 3;
-        widows: 3;
-    }
-
-    h2,
-    h3 {
-        page-break-after: avoid;
-    }
-}
-
-  </style>
-
-  <script type="text/javascript">
-  function toggle(id, $link) {
-    $node = document.getElementById(id);
-    if (!$node)
-    return;
-    if (!$node.style.display || $node.style.display == 'none') {
-    $node.style.display = 'block';
-    $link.innerHTML = 'Hide source &nequiv;';
-    } else {
-    $node.style.display = 'none';
-    $link.innerHTML = 'Show source &equiv;';
-    }
-  }
-  </script>
-</head>
-<body>
-<a href="#" id="top">Top</a>
-
-<div id="container">
-    
-  
-  <div id="sidebar">
-    <h1>Index</h1>
-    <ul id="index">
-
-
-    <li class="set"><h3><a href="#header-classes">Classes</a></h3>
-      <ul>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.Authentication">Authentication</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.Authentication.__init__">__init__</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.AuthenticationAthenz.__init__">__init__</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.AuthenticationOauth2.__init__">__init__</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.AuthenticationTLS.__init__">__init__</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.AuthenticationToken.__init__">__init__</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.Client">Client</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.Client.__init__">__init__</a></li>
-    <li class="mono"><a href="#pulsar.Client.close">close</a></li>
-    <li class="mono"><a href="#pulsar.Client.create_producer">create_producer</a></li>
-    <li class="mono"><a href="#pulsar.Client.create_reader">create_reader</a></li>
-    <li class="mono"><a href="#pulsar.Client.get_topic_partitions">get_topic_partitions</a></li>
-    <li class="mono"><a href="#pulsar.Client.subscribe">subscribe</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.Consumer">Consumer</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.Consumer.acknowledge">acknowledge</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.acknowledge_cumulative">acknowledge_cumulative</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.close">close</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.negative_acknowledge">negative_acknowledge</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.pause_message_listener">pause_message_listener</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.receive">receive</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.redeliver_unacknowledged_messages">redeliver_unacknowledged_messages</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.resume_message_listener">resume_message_listener</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.seek">seek</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.subscription_name">subscription_name</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.topic">topic</a></li>
-    <li class="mono"><a href="#pulsar.Consumer.unsubscribe">unsubscribe</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.CryptoKeyReader.__init__">__init__</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.Message">Message</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.Message.data">data</a></li>
-    <li class="mono"><a href="#pulsar.Message.event_timestamp">event_timestamp</a></li>
-    <li class="mono"><a href="#pulsar.Message.message_id">message_id</a></li>
-    <li class="mono"><a href="#pulsar.Message.partition_key">partition_key</a></li>
-    <li class="mono"><a href="#pulsar.Message.properties">properties</a></li>
-    <li class="mono"><a href="#pulsar.Message.publish_timestamp">publish_timestamp</a></li>
-    <li class="mono"><a href="#pulsar.Message.redelivery_count">redelivery_count</a></li>
-    <li class="mono"><a href="#pulsar.Message.schema_version">schema_version</a></li>
-    <li class="mono"><a href="#pulsar.Message.topic_name">topic_name</a></li>
-    <li class="mono"><a href="#pulsar.Message.value">value</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.MessageBatch">MessageBatch</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.MessageBatch.__init__">__init__</a></li>
-    <li class="mono"><a href="#pulsar.MessageBatch.parse_from">parse_from</a></li>
-    <li class="mono"><a href="#pulsar.MessageBatch.with_message_id">with_message_id</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.MessageId">MessageId</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.MessageId.deserialize">deserialize</a></li>
-    <li class="mono"><a href="#pulsar.MessageId.__init__">__init__</a></li>
-    <li class="mono"><a href="#pulsar.MessageId.batch_index">batch_index</a></li>
-    <li class="mono"><a href="#pulsar.MessageId.entry_id">entry_id</a></li>
-    <li class="mono"><a href="#pulsar.MessageId.ledger_id">ledger_id</a></li>
-    <li class="mono"><a href="#pulsar.MessageId.partition">partition</a></li>
-    <li class="mono"><a href="#pulsar.MessageId.serialize">serialize</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.Producer">Producer</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.Producer.close">close</a></li>
-    <li class="mono"><a href="#pulsar.Producer.flush">flush</a></li>
-    <li class="mono"><a href="#pulsar.Producer.last_sequence_id">last_sequence_id</a></li>
-    <li class="mono"><a href="#pulsar.Producer.producer_name">producer_name</a></li>
-    <li class="mono"><a href="#pulsar.Producer.send">send</a></li>
-    <li class="mono"><a href="#pulsar.Producer.send_async">send_async</a></li>
-    <li class="mono"><a href="#pulsar.Producer.topic">topic</a></li>
-  </ul>
-
-        </li>
-        <li class="mono">
-        <span class="class_name"><a href="#pulsar.Reader">Reader</a></span>
-        
-          
-  <ul>
-    <li class="mono"><a href="#pulsar.Reader.close">close</a></li>
-    <li class="mono"><a href="#pulsar.Reader.has_message_available">has_message_available</a></li>
-    <li class="mono"><a href="#pulsar.Reader.read_next">read_next</a></li>
-    <li class="mono"><a href="#pulsar.Reader.seek">seek</a></li>
-    <li class="mono"><a href="#pulsar.Reader.topic">topic</a></li>
-  </ul>
-
-        </li>
-      </ul>
-    </li>
-
-    <li class="set"><h3><a href="#header-submodules">Sub-modules</a></h3>
-      <ul>
-        <li class="mono"><a href="functions/index.html">pulsar.functions</a></li>
-        <li class="mono"><a href="schema/index.html">pulsar.schema</a></li>
-      </ul>
-    </li>
-    </ul>
-  </div>
-
-    <article id="content">
-      
-  
-
-  
-
-
-  <header id="section-intro">
-  <h1 class="title"><span class="name">pulsar</span> module</h1>
-  <p>The Pulsar Python client library is based on the existing C++ client library.
-All the same features are exposed through the Python interface.</p>
-<p>Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.</p>
-<h2>Install from PyPI</h2>
-<p>Download Python wheel binary files for MacOS and Linux
-directly from the PyPI archive.</p>
-<pre><code>#!shell
-$ sudo pip install pulsar-client
-</code></pre>
-<h2>Install from sources</h2>
-<p>Follow the instructions to compile the Pulsar C++ client library. This method
-will also build the Python binding for the library.</p>
-<p>To install the Python bindings:</p>
-<pre><code>#!shell
-$ cd pulsar-client-cpp/python
-$ sudo python setup.py install
-</code></pre>
-<h2>Examples</h2>
-<h3><a href="#pulsar.Producer">Producer</a> example</h3>
-<pre><code>#!python
-import pulsar
-
-client = pulsar.Client('pulsar://localhost:6650')
-
-producer = client.create_producer('my-topic')
-
-for i in range(10):
-    producer.send(('Hello-%d' % i).encode('utf-8'))
-
-client.close()
-</code></pre>
-<h4><a href="#pulsar.Consumer">Consumer</a> Example</h4>
-<pre><code>#!python
-import pulsar
-
-client = pulsar.Client('pulsar://localhost:6650')
-consumer = client.subscribe('my-topic', 'my-subscription')
-
-while True:
-    msg = consumer.receive()
-    try:
-        print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id()))
-        consumer.acknowledge(msg)
-    except:
-        consumer.negative_acknowledge(msg)
-
-client.close()
-</code></pre>
-<h3><a href="#pulsar.Producer.send_async">Async producer</a> example</h3>
-<pre><code>#!python
-import pulsar
-
-client = pulsar.Client('pulsar://localhost:6650')
-
-producer = client.create_producer(
-                'my-topic',
-                block_if_queue_full=True,
-                batching_enabled=True,
-                batching_max_publish_delay_ms=10
-            )
-
-def send_callback(res, msg_id):
-    print('Message published res=%s' % res)
-
-for i in range(10):
-    producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
-
-client.close()
-</code></pre>
-  
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar" class="source">
-    <pre><code>#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-The Pulsar Python client library is based on the existing C++ client library.
-All the same features are exposed through the Python interface.
-
-Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.
-
-## Install from PyPI
-
-Download Python wheel binary files for MacOS and Linux
-directly from the PyPI archive.
-
-    #!shell
-    $ sudo pip install pulsar-client
-
-## Install from sources
-
-Follow the instructions to compile the Pulsar C++ client library. This method
-will also build the Python binding for the library.
-
-To install the Python bindings:
-
-    #!shell
-    $ cd pulsar-client-cpp/python
-    $ sudo python setup.py install
-
-## Examples
-
-### [Producer](#pulsar.Producer) example
-
-    #!python
-    import pulsar
-
-    client = pulsar.Client('pulsar://localhost:6650')
-
-    producer = client.create_producer('my-topic')
-
-    for i in range(10):
-        producer.send(('Hello-%d' % i).encode('utf-8'))
-
-    client.close()
-
-#### [Consumer](#pulsar.Consumer) Example
-
-    #!python
-    import pulsar
-
-    client = pulsar.Client('pulsar://localhost:6650')
-    consumer = client.subscribe('my-topic', 'my-subscription')
-
-    while True:
-        msg = consumer.receive()
-        try:
-            print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id()))
-            consumer.acknowledge(msg)
-        except:
-            consumer.negative_acknowledge(msg)
-
-    client.close()
-
-### [Async producer](#pulsar.Producer.send_async) example
-
-    #!python
-    import pulsar
-
-    client = pulsar.Client('pulsar://localhost:6650')
-
-    producer = client.create_producer(
-                    'my-topic',
-                    block_if_queue_full=True,
-                    batching_enabled=True,
-                    batching_max_publish_delay_ms=10
-                )
-
-    def send_callback(res, msg_id):
-        print('Message published res=%s' % res)
-
-    for i in range(10):
-        producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
-
-    client.close()
-"""
-
-import _pulsar
-
-from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType  # noqa: F401
-
-from pulsar.functions.function import Function
-from pulsar.functions.context import Context
-from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
-from pulsar import schema
-_schema = schema
-
-import re
-_retype = type(re.compile('x'))
-
-import certifi
-from datetime import timedelta
-
-
-class MessageId:
-    """
-    Represents a message id
-    """
-
-    def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
-        self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
-
-    'Represents the earliest message stored in a topic'
-    earliest = _pulsar.MessageId.earliest
-
-    'Represents the latest message published on a topic'
-    latest = _pulsar.MessageId.latest
-
-    def ledger_id(self):
-        return self._msg_id.ledger_id()
-
-    def entry_id(self):
-        return self._msg_id.entry_id()
-
-    def batch_index(self):
-        return self._msg_id.batch_index()
-
-    def partition(self):
-        return self._msg_id.partition()
-
-    def serialize(self):
-        """
-        Returns a bytes representation of the message id.
-        This bytes sequence can be stored and later deserialized.
-        """
-        return self._msg_id.serialize()
-
-    @staticmethod
-    def deserialize(message_id_bytes):
-        """
-        Deserialize a message id object from a previously
-        serialized bytes sequence.
-        """
-        return _pulsar.MessageId.deserialize(message_id_bytes)
-
-
-class Message:
-    """
-    Message objects are returned by a consumer, either by calling `receive` or
-    through a listener.
-    """
-
-    def data(self):
-        """
-        Returns a bytes object with the payload of the message.
-        """
-        return self._message.data()
-
-    def value(self):
-        """
-        Returns object with the de-serialized version of the message content
-        """
-        return self._schema.decode(self._message.data())
-
-    def properties(self):
-        """
-        Return the properties attached to the message. Properties are
-        application-defined key/value pairs that will be attached to the
-        message.
-        """
-        return self._message.properties()
-
-    def partition_key(self):
-        """
-        Get the partitioning key for the message.
-        """
-        return self._message.partition_key()
-
-    def publish_timestamp(self):
-        """
-        Get the timestamp in milliseconds with the message publish time.
-        """
-        return self._message.publish_timestamp()
-
-    def event_timestamp(self):
-        """
-        Get the timestamp in milliseconds with the message event time.
-        """
-        return self._message.event_timestamp()
-
-    def message_id(self):
-        """
-        The message ID that can be used to refer to this particular message.
-        """
-        return self._message.message_id()
-
-    def topic_name(self):
-        """
-        Get the name of the topic from which this message originated.
-        """
-        return self._message.topic_name()
-
-    def redelivery_count(self):
-        """
-        Get the redelivery count for this message
-        """
-        return self._message.redelivery_count()
-
-    def schema_version(self):
-        """
-        Get the schema version for this message
-        """
-        return self._message.schema_version()
-
-    @staticmethod
-    def _wrap(_message):
-        self = Message()
-        self._message = _message
-        return self
-
-
-class MessageBatch:
-
-    def __init__(self):
-        self._msg_batch = _pulsar.MessageBatch()
-
-    def with_message_id(self, msg_id):
-        if not isinstance(msg_id, _pulsar.MessageId):
-            if isinstance(msg_id, MessageId):
-                msg_id = msg_id._msg_id
-            else:
-                raise TypeError("unknown message id type")
-        self._msg_batch.with_message_id(msg_id)
-        return self
-
-    def parse_from(self, data, size):
-        self._msg_batch.parse_from(data, size)
-        _msgs = self._msg_batch.messages()
-        return list(map(Message._wrap, _msgs))
-
-
-class Authentication:
-    """
-    Authentication provider object. Used to load authentication from an external
-    shared library.
-    """
-    def __init__(self, dynamicLibPath, authParamsString):
-        """
-        Create the authentication provider instance.
-
-        **Args**
-
-        * `dynamicLibPath`: Path to the authentication provider shared library
-          (such as `tls.so`)
-        * `authParamsString`: Comma-separated list of provider-specific
-          configuration params
-        """
-        _check_type(str, dynamicLibPath, 'dynamicLibPath')
-        _check_type(str, authParamsString, 'authParamsString')
-        self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
-
-
-class AuthenticationTLS(Authentication):
-    """
-    TLS Authentication implementation
-    """
-    def __init__(self, certificate_path, private_key_path):
-        """
-        Create the TLS authentication provider instance.
-
-        **Args**
-
-        * `certificate_path`: Path to the public certificate
-        * `private_key_path`: Path to private TLS key
-        """
-        _check_type(str, certificate_path, 'certificate_path')
-        _check_type(str, private_key_path, 'private_key_path')
-        self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
-
-
-class AuthenticationToken(Authentication):
-    """
-    Token based authentication implementation
-    """
-    def __init__(self, token):
-        """
-        Create the token authentication provider instance.
-
-        **Args**
-
-        * `token`: A string containing the token or a function that provides a
-                   string with the token
-        """
-        if not (isinstance(token, str) or callable(token)):
-            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
-        self.auth = _pulsar.AuthenticationToken(token)
-
-
-class AuthenticationAthenz(Authentication):
-    """
-    Athenz Authentication implementation
-    """
-    def __init__(self, auth_params_string):
-        """
-        Create the Athenz authentication provider instance.
-
-        **Args**
-
-        * `auth_params_string`: JSON encoded configuration for Athenz client
-        """
-        _check_type(str, auth_params_string, 'auth_params_string')
-        self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
-
-class AuthenticationOauth2(Authentication):
-    """
-    Oauth2 Authentication implementation
-    """
-    def __init__(self, auth_params_string):
-        """
-        Create the Oauth2 authentication provider instance.
-
-        **Args**
-
-        * `auth_params_string`: JSON encoded configuration for Oauth2 client
-        """
-        _check_type(str, auth_params_string, 'auth_params_string')
-        self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
-
-class Client:
-    """
-    The Pulsar client. A single client instance can be used to create producers
-    and consumers on multiple topics.
-
-    The client will share the same connection pool and threads across all
-    producers and consumers.
-    """
-
-    def __init__(self, service_url,
-                 authentication=None,
-                 operation_timeout_seconds=30,
-                 io_threads=1,
-                 message_listener_threads=1,
-                 concurrent_lookup_requests=50000,
-                 log_conf_file_path=None,
-                 use_tls=False,
-                 tls_trust_certs_file_path=None,
-                 tls_allow_insecure_connection=False,
-                 tls_validate_hostname=False,
-                 ):
-        """
-        Create a new Pulsar client instance.
-
-        **Args**
-
-        * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
-
-        **Options**
-
-        * `authentication`:
-          Set the authentication provider to be used with the broker. For example:
-          `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
-        * `operation_timeout_seconds`:
-          Set timeout on client operations (subscribe, create producer, close,
-          unsubscribe).
-        * `io_threads`:
-          Set the number of IO threads to be used by the Pulsar client.
-        * `message_listener_threads`:
-          Set the number of threads to be used by the Pulsar client when
-          delivering messages through message listener. The default is 1 thread
-          per Pulsar client. If using more than 1 thread, messages for distinct
-          `message_listener`s will be delivered in different threads, however a
-          single `MessageListener` will always be assigned to the same thread.
-        * `concurrent_lookup_requests`:
-          Number of concurrent lookup-requests allowed on each broker connection
-          to prevent overload on the broker.
-        * `log_conf_file_path`:
-          Initialize log4cxx from a configuration file.
-        * `use_tls`:
-          Configure whether to use TLS encryption on the connection. This setting
-          is deprecated. TLS will be automatically enabled if the `service_url` is
-          set to `pulsar+ssl://` or `https://`
-        * `tls_trust_certs_file_path`:
-          Set the path to the trusted TLS certificate file. If empty defaults to
-          certifi.
-        * `tls_allow_insecure_connection`:
-          Configure whether the Pulsar client accepts untrusted TLS certificates
-          from the broker.
-        * `tls_validate_hostname`:
-          Configure whether the Pulsar client validates that the hostname of the
-          endpoint matches the common name on the TLS certificate presented by
-          the endpoint.
-        """
-        _check_type(str, service_url, 'service_url')
-        _check_type_or_none(Authentication, authentication, 'authentication')
-        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
-        _check_type(int, io_threads, 'io_threads')
-        _check_type(int, message_listener_threads, 'message_listener_threads')
-        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
-        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
-        _check_type(bool, use_tls, 'use_tls')
-        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
-        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
-        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
-
-        conf = _pulsar.ClientConfiguration()
-        if authentication:
-            conf.authentication(authentication.auth)
-        conf.operation_timeout_seconds(operation_timeout_seconds)
-        conf.io_threads(io_threads)
-        conf.message_listener_threads(message_listener_threads)
-        conf.concurrent_lookup_requests(concurrent_lookup_requests)
-        if log_conf_file_path:
-            conf.log_conf_file_path(log_conf_file_path)
-        if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
-            conf.use_tls(True)
-        if tls_trust_certs_file_path:
-            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
-        else:
-            conf.tls_trust_certs_file_path(certifi.where())
-        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
-        conf.tls_validate_hostname(tls_validate_hostname)
-        self._client = _pulsar.Client(service_url, conf)
-        self._consumers = []
-
-    def create_producer(self, topic,
-                        producer_name=None,
-                        schema=schema.BytesSchema(),
-                        initial_sequence_id=None,
-                        send_timeout_millis=30000,
-                        compression_type=CompressionType.NONE,
-                        max_pending_messages=1000,
-                        max_pending_messages_across_partitions=50000,
-                        block_if_queue_full=False,
-                        batching_enabled=False,
-                        batching_max_messages=1000,
-                        batching_max_allowed_size_in_bytes=128*1024,
-                        batching_max_publish_delay_ms=10,
-                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
-                        properties=None,
-                        batching_type=BatchingType.Default,
-                        encryption_key=None,
-                        crypto_key_reader=None
-                        ):
-        """
-        Create a new producer on a given topic.
-
-        **Args**
-
-        * `topic`:
-          The topic name
-
-        **Options**
-
-        * `producer_name`:
-           Specify a name for the producer. If not assigned,
-           the system will generate a globally unique name which can be accessed
-           with `Producer.producer_name()`. When specifying a name, it is app to
-           the user to ensure that, for a given topic, the producer name is unique
-           across all Pulsar's clusters.
-        * `schema`:
-           Define the schema of the data that will be published by this producer.
-           The schema will be used for two purposes:
-             - Validate the data format against the topic defined schema
-             - Perform serialization/deserialization between data and objects
-           An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
-        * `initial_sequence_id`:
-           Set the baseline for the sequence ids for messages
-           published by the producer. First message will be using
-           `(initialSequenceId + 1)`` as its sequence id and subsequent messages will
-           be assigned incremental sequence ids, if not otherwise specified.
-        * `send_timeout_millis`:
-          If a message is not acknowledged by the server before the
-          `send_timeout` expires, an error will be reported.
-        * `compression_type`:
-          Set the compression type for the producer. By default, message
-          payloads are not compressed. Supported compression types are
-          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
-          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
-          release in order to be able to receive messages compressed with ZSTD.
-          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
-          release in order to be able to receive messages compressed with SNAPPY.
-        * `max_pending_messages`:
-          Set the max size of the queue holding the messages pending to receive
-          an acknowledgment from the broker.
-        * `max_pending_messages_across_partitions`:
-          Set the max size of the queue holding the messages pending to receive
-          an acknowledgment across partitions from the broker.
-        * `block_if_queue_full`: Set whether `send_async` operations should
-          block when the outgoing message queue is full.
-        * `message_routing_mode`:
-          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
-          other option is `PartitionsRoutingMode.UseSinglePartition`
-        * `properties`:
-          Sets the properties for the producer. The properties associated with a producer
-          can be used for identify a producer at broker side.
-        * `batching_type`:
-          Sets the batching type for the producer.
-          There are two batching type: DefaultBatching and KeyBasedBatching.
-            - Default batching
-            incoming single messages:
-            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-            batched into single batch message:
-            [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
-
-            - KeyBasedBatching
-            incoming single messages:
-            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-            batched into single batch message:
-            [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
-        * encryption_key:
-           The key used for symmetric encryption, configured on the producer side
-        * crypto_key_reader:
-           Symmetric encryption class implementation, configuring public key encryption messages for the producer
-           and private key decryption messages for the consumer
-        """
-        _check_type(str, topic, 'topic')
-        _check_type_or_none(str, producer_name, 'producer_name')
-        _check_type(_schema.Schema, schema, 'schema')
-        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
-        _check_type(int, send_timeout_millis, 'send_timeout_millis')
-        _check_type(CompressionType, compression_type, 'compression_type')
-        _check_type(int, max_pending_messages, 'max_pending_messages')
-        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
-        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
-        _check_type(bool, batching_enabled, 'batching_enabled')
-        _check_type(int, batching_max_messages, 'batching_max_messages')
-        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
-        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
-        _check_type_or_none(dict, properties, 'properties')
-        _check_type(BatchingType, batching_type, 'batching_type')
-        _check_type_or_none(str, encryption_key, 'encryption_key')
-        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
-        conf = _pulsar.ProducerConfiguration()
-        conf.send_timeout_millis(send_timeout_millis)
-        conf.compression_type(compression_type)
-        conf.max_pending_messages(max_pending_messages)
-        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
-        conf.block_if_queue_full(block_if_queue_full)
-        conf.batching_enabled(batching_enabled)
-        conf.batching_max_messages(batching_max_messages)
-        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
-        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
-        conf.partitions_routing_mode(message_routing_mode)
-        conf.batching_type(batching_type)
-        if producer_name:
-            conf.producer_name(producer_name)
-        if initial_sequence_id:
-            conf.initial_sequence_id(initial_sequence_id)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-
-        conf.schema(schema.schema_info())
-        if encryption_key:
-            conf.encryption_key(encryption_key)
-        if crypto_key_reader:
-            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
-        p = Producer()
-        p._producer = self._client.create_producer(topic, conf)
-        p._schema = schema
-        return p
-
-    def subscribe(self, topic, subscription_name,
-                  consumer_type=ConsumerType.Exclusive,
-                  schema=schema.BytesSchema(),
-                  message_listener=None,
-                  receiver_queue_size=1000,
-                  max_total_receiver_queue_size_across_partitions=50000,
-                  consumer_name=None,
-                  unacked_messages_timeout_ms=None,
-                  broker_consumer_stats_cache_time_ms=30000,
-                  negative_ack_redelivery_delay_ms=60000,
-                  is_read_compacted=False,
-                  properties=None,
-                  pattern_auto_discovery_period=60,
-                  initial_position=InitialPosition.Latest,
-                  crypto_key_reader=None
-                  ):
-        """
-        Subscribe to the given topic and subscription combination.
-
-        Returns a `Consumer`. The consumer is also recorded in this client's
-        internal list of consumers.
-
-        Raises `ValueError` if `topic` is not a `str`, a `list` or a compiled
-        regex pattern.
-
-        **Args**
-
-        * `topic`: The name of the topic, list of topics or regex pattern.
-                  This method will accept these forms:
-                    - `topic='my-topic'`
-                    - `topic=['topic-1', 'topic-2', 'topic-3']`
-                    - `topic=re.compile('persistent://public/default/topic-*')`
-        * `subscription_name`: The name of the subscription.
-
-        **Options**
-
-        * `consumer_type`:
-          Select the subscription type to be used when subscribing to the topic.
-        * `schema`:
-           Define the schema of the data that will be received by this consumer.
-        * `message_listener`:
-          Sets a message listener for the consumer. When the listener is set,
-          the application will receive messages through it. Calls to
-          `consumer.receive()` will not be allowed. The listener function needs
-          to accept (consumer, message), for example:
-
-                #!python
-                def my_listener(consumer, message):
-                    # process message
-                    consumer.acknowledge(message)
-
-        * `receiver_queue_size`:
-          Sets the size of the consumer receive queue. The consumer receive
-          queue controls how many messages can be accumulated by the consumer
-          before the application calls `receive()`. Using a higher value could
-          potentially increase the consumer throughput at the expense of higher
-          memory utilization. Setting the consumer queue size to zero decreases
-          the throughput of the consumer by disabling pre-fetching of messages.
-          This approach improves the message distribution on shared subscription
-          by pushing messages only to those consumers that are ready to process
-          them. Neither receive with timeout nor partitioned topics can be used
-          if the consumer queue size is zero. The `receive()` function call
-          should not be interrupted when the consumer queue size is zero. The
-          default value is 1000 messages and should work well for most use
-          cases.
-        * `max_total_receiver_queue_size_across_partitions`:
-          Set the max total receiver queue size across partitions.
-          This setting will be used to reduce the receiver queue size for individual partitions
-        * `consumer_name`:
-          Sets the consumer name.
-        * `unacked_messages_timeout_ms`:
-          Sets the timeout in milliseconds for unacknowledged messages. The
-          timeout needs to be greater than 10 seconds. An exception is thrown if
-          the given value is less than 10 seconds. If a successful
-          acknowledgement is not sent within the timeout, all the unacknowledged
-          messages are redelivered.
-        * `negative_ack_redelivery_delay_ms`:
-           The delay after which to redeliver the messages that failed to be
-           processed (with the `consumer.negative_acknowledge()`)
-        * `broker_consumer_stats_cache_time_ms`:
-          Sets the time duration for which the broker-side consumer stats will
-          be cached in the client.
-        * `is_read_compacted`:
-          Selects whether to read the compacted version of the topic
-        * `properties`:
-          Sets the properties for the consumer. The properties associated with a consumer
-          can be used to identify a consumer on the broker side.
-        * `pattern_auto_discovery_period`:
-          Period, in seconds, at which the consumer auto-discovers topics
-          matching the pattern.
-        * `initial_position`:
-          Set the initial position of a consumer when subscribing to the topic.
-          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
-          Default: `Latest`.
-        * `crypto_key_reader`:
-           Symmetric encryption class implementation, configuring public key encryption messages for the producer
-           and private key decryption messages for the consumer
-        """
-        # `topic` is deliberately not checked here: it may be a str, a list or a
-        # compiled pattern, and is dispatched on at the end of this method.
-        _check_type(str, subscription_name, 'subscription_name')
-        _check_type(ConsumerType, consumer_type, 'consumer_type')
-        _check_type(_schema.Schema, schema, 'schema')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type(int, max_total_receiver_queue_size_across_partitions,
-                    'max_total_receiver_queue_size_across_partitions')
-        _check_type_or_none(str, consumer_name, 'consumer_name')
-        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
-        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
-        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
-        # NOTE(review): pattern_auto_discovery_period is validated here but is
-        # never passed to `conf` below, so it appears to have no effect in this
-        # method - confirm against the native ConsumerConfiguration binding.
-        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type_or_none(dict, properties, 'properties')
-        _check_type(InitialPosition, initial_position, 'initial_position')
-        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
-        conf = _pulsar.ConsumerConfiguration()
-        conf.consumer_type(consumer_type)
-        conf.read_compacted(is_read_compacted)
-        if message_listener:
-            # The listener is wrapped together with the schema before being
-            # handed to the native configuration.
-            conf.message_listener(_listener_wrapper(message_listener, schema))
-        conf.receiver_queue_size(receiver_queue_size)
-        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
-        if consumer_name:
-            conf.consumer_name(consumer_name)
-        # Truthiness test: both None and 0 leave the timeout unconfigured.
-        if unacked_messages_timeout_ms:
-            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-
-        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
-        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-        conf.subscription_initial_position(initial_position)
-
-        conf.schema(schema.schema_info())
-
-        if crypto_key_reader:
-            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
-        c = Consumer()
-        # Pick the native subscribe call that matches the form of `topic`.
-        if isinstance(topic, str):
-            # Single topic
-            c._consumer = self._client.subscribe(topic, subscription_name, conf)
-        elif isinstance(topic, list):
-            # List of topics
-            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
-        elif isinstance(topic, _retype):
-            # Regex pattern
-            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
-        else:
-            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
-
-        # Back-reference used by Consumer.close() to remove itself from
-        # self._consumers.
-        c._client = self
-        c._schema = schema
-        self._consumers.append(c)
-        return c
-
-    def create_reader(self, topic, start_message_id,
-                      schema=schema.BytesSchema(),
-                      reader_listener=None,
-                      receiver_queue_size=1000,
-                      reader_name=None,
-                      subscription_role_prefix=None,
-                      is_read_compacted=False
-                      ):
-        """
-        Create a reader on a particular topic
-
-        **Args**
-
-        * `topic`: The name of the topic.
-        * `start_message_id`: The initial reader positioning is done by specifying a message id.
-           The options are:
-            * `MessageId.earliest`: Start reading from the earliest message available in the topic
-            * `MessageId.latest`: Start reading from the end topic, only getting messages published
-               after the reader was created
-            * `MessageId`: When passing a particular message id, the reader will position itself on
-               that specific position. The first message to be read will be the message next to the
-               specified messageId. Message id can be serialized into a string and deserialized
-               back into a `MessageId` object:
-
-                   # Serialize to string
-                   s = msg.message_id().serialize()
-
-                   # Deserialize from string
-                   msg_id = MessageId.deserialize(s)
-
-        **Options**
-
-        * `schema`:
-           Define the schema of the data that will be received by this reader.
-        * `reader_listener`:
-          Sets a message listener for the reader. When the listener is set,
-          the application will receive messages through it. Calls to
-          `reader.read_next()` will not be allowed. The listener function needs
-          to accept (reader, message), for example:
-
-                def my_listener(reader, message):
-                    # process message
-                    pass
-
-        * `receiver_queue_size`:
-          Sets the size of the reader receive queue. The reader receive
-          queue controls how many messages can be accumulated by the reader
-          before the application calls `read_next()`. Using a higher value could
-          potentially increase the reader throughput at the expense of higher
-          memory utilization.
-        * `reader_name`:
-          Sets the reader name.
-        * `subscription_role_prefix`:
-          Sets the subscription role prefix.
-        * `is_read_compacted`:
-          Selects whether to read the compacted version of the topic
-        """
-        _check_type(str, topic, 'topic')
-        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
-        _check_type(_schema.Schema, schema, 'schema')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type_or_none(str, reader_name, 'reader_name')
-        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-
-        conf = _pulsar.ReaderConfiguration()
-        if reader_listener:
-            conf.reader_listener(_listener_wrapper(reader_listener, schema))
-        conf.receiver_queue_size(receiver_queue_size)
-        if reader_name:
-            conf.reader_name(reader_name)
-        if subscription_role_prefix:
-            conf.subscription_role_prefix(subscription_role_prefix)
-        conf.schema(schema.schema_info())
-        conf.read_compacted(is_read_compacted)
-
-        c = Reader()
-        c._reader = self._client.create_reader(topic, start_message_id, conf)
-        c._client = self
-        c._schema = schema
-        self._consumers.append(c)
-        return c
-
-    def get_topic_partitions(self, topic):
-        """
-        Get the list of partitions for a given topic.
-
-        If the topic is partitioned, this will return a list of partition names. If the topic is not
-        partitioned, the returned list will contain the topic name itself.
-
-        This can be used to discover the partitions and create Reader, Consumer or Producer
-        instances directly on a particular partition.
-        :param topic: the topic name to lookup
-        :return: a list of partition name
-        """
-        _check_type(str, topic, 'topic')
-        return self._client.get_topic_partitions(topic)
-
-    def close(self):
-        """
-        Close the client and all the associated producers and consumers
-        """
-        self._client.close()
-
-
-class Producer:
-    """
-    The Pulsar message producer, used to publish messages on a topic.
-    """
-
-    def topic(self):
-        """
-        Return the topic which producer is publishing to
-        """
-        return self._producer.topic()
-
-    def producer_name(self):
-        """
-        Return the producer name which could have been assigned by the
-        system or specified by the client
-        """
-        return self._producer.producer_name()
-
-    def last_sequence_id(self):
-        """
-        Get the last sequence id that was published by this producer.
-
-        This represent either the automatically assigned or custom sequence id
-        (set on the `MessageBuilder`) that was published and acknowledged by the broker.
-
-        After recreating a producer with the same producer name, this will return the
-        last message that was published in the previous producer session, or -1 if
-        there no message was ever published.
-        """
-        return self._producer.last_sequence_id()
-
-    def send(self, content,
-             properties=None,
-             partition_key=None,
-             sequence_id=None,
-             replication_clusters=None,
-             disable_replication=False,
-             event_timestamp=None,
-             deliver_at=None,
-             deliver_after=None,
-             ):
-        """
-        Publish a message on the topic. Blocks until the message is acknowledged
-
-        Returns a `MessageId` object that represents where the message is persisted.
-
-        **Args**
-
-        * `content`:
-          A `bytes` object with the message payload.
-
-        **Options**
-
-        * `properties`:
-          A dict of application-defined string properties.
-        * `partition_key`:
-          Sets the partition key for message routing. A hash of this key is used
-          to determine the message's topic partition.
-        * `sequence_id`:
-          Specify a custom sequence id for the message being published.
-        * `replication_clusters`:
-          Override namespace replication clusters. Note that it is the caller's
-          responsibility to provide valid cluster names and that all clusters
-          have been previously configured as topics. Given an empty list,
-          the message will replicate according to the namespace configuration.
-        * `disable_replication`:
-          Do not replicate this message.
-        * `event_timestamp`:
-          Timestamp in millis of the timestamp of event creation
-        * `deliver_at`:
-          Specify the this message should not be delivered earlier than the
-          specified timestamp.
-          The timestamp is milliseconds and based on UTC
-        * `deliver_after`:
-          Specify a delay in timedelta for the delivery of the messages.
-
-        """
-        msg = self._build_msg(content, properties, partition_key, sequence_id,
-                              replication_clusters, disable_replication, event_timestamp,
-                              deliver_at, deliver_after)
-        return MessageId.deserialize(self._producer.send(msg))
-
-    def send_async(self, content, callback,
-                   properties=None,
-                   partition_key=None,
-                   sequence_id=None,
-                   replication_clusters=None,
-                   disable_replication=False,
-                   event_timestamp=None,
-                   deliver_at=None,
-                   deliver_after=None,
-                   ):
-        """
-        Send a message asynchronously.
-
-        The `callback` will be invoked once the message has been acknowledged
-        by the broker.
-
-        Example:
-
-            #!python
-            def callback(res, msg_id):
-                print('Message published: %s' % res)
-
-            producer.send_async(msg, callback)
-
-        When the producer queue is full, by default the message will be rejected
-        and the callback invoked with an error code.
-
-        **Args**
-
-        * `content`:
-          A `bytes` object with the message payload.
-
-        **Options**
-
-        * `properties`:
-          A dict of application0-defined string properties.
-        * `partition_key`:
-          Sets the partition key for the message routing. A hash of this key is
-          used to determine the message's topic partition.
-        * `sequence_id`:
-          Specify a custom sequence id for the message being published.
-        * `replication_clusters`: Override namespace replication clusters. Note
-          that it is the caller's responsibility to provide valid cluster names
-          and that all clusters have been previously configured as topics.
-          Given an empty list, the message will replicate per the namespace
-          configuration.
-        * `disable_replication`:
-          Do not replicate this message.
-        * `event_timestamp`:
-          Timestamp in millis of the timestamp of event creation
-        * `deliver_at`:
-          Specify the this message should not be delivered earlier than the
-          specified timestamp.
-          The timestamp is milliseconds and based on UTC
-        * `deliver_after`:
-          Specify a delay in timedelta for the delivery of the messages.
-        """
-        msg = self._build_msg(content, properties, partition_key, sequence_id,
-                              replication_clusters, disable_replication, event_timestamp,
-                              deliver_at, deliver_after)
-        self._producer.send_async(msg, callback)
-
-
-    def flush(self):
-        """
-        Flush all the messages buffered in the client and wait until all messages have been
-        successfully persisted
-        """
-        self._producer.flush()
-
-
-    def close(self):
-        """
-        Close the producer.
-        """
-        self._producer.close()
-
-    def _build_msg(self, content, properties, partition_key, sequence_id,
-                   replication_clusters, disable_replication, event_timestamp,
-                   deliver_at, deliver_after):
-        data = self._schema.encode(content)
-
-        _check_type(bytes, data, 'data')
-        _check_type_or_none(dict, properties, 'properties')
-        _check_type_or_none(str, partition_key, 'partition_key')
-        _check_type_or_none(int, sequence_id, 'sequence_id')
-        _check_type_or_none(list, replication_clusters, 'replication_clusters')
-        _check_type(bool, disable_replication, 'disable_replication')
-        _check_type_or_none(int, event_timestamp, 'event_timestamp')
-        _check_type_or_none(int, deliver_at, 'deliver_at')
-        _check_type_or_none(timedelta, deliver_after, 'deliver_after')
-
-        mb = _pulsar.MessageBuilder()
-        mb.content(data)
-        if properties:
-            for k, v in properties.items():
-                mb.property(k, v)
-        if partition_key:
-            mb.partition_key(partition_key)
-        if sequence_id:
-            mb.sequence_id(sequence_id)
-        if replication_clusters:
-            mb.replication_clusters(replication_clusters)
-        if disable_replication:
-            mb.disable_replication(disable_replication)
-        if event_timestamp:
-            mb.event_timestamp(event_timestamp)
-        if deliver_at:
-            mb.deliver_at(deliver_at)
-        if deliver_after:
-            mb.deliver_after(deliver_after)
-
-        return mb.build()
-
-
-class Consumer:
-    """
-    Pulsar consumer.
-    """
-
-    def topic(self):
-        """
-        Return the topic this consumer is subscribed to.
-        """
-        return self._consumer.topic()
-
-    def subscription_name(self):
-        """
-        Return the subscription name.
-        """
-        return self._consumer.subscription_name()
-
-    def unsubscribe(self):
-        """
-        Unsubscribe the current consumer from the topic.
-
-        This method will block until the operation is completed. Once the
-        consumer is unsubscribed, no more messages will be received and
-        subsequent new messages will not be retained for this consumer.
-
-        This consumer object cannot be reused.
-        """
-        return self._consumer.unsubscribe()
-
-    def receive(self, timeout_millis=None):
-        """
-        Receive a single message.
-
-        If a message is not immediately available, this method will block until
-        a new message is available.
-
-        **Options**
-
-        * `timeout_millis`:
-          If specified, the receive will raise an exception if a message is not
-          available within the timeout.
-        """
-        if timeout_millis is None:
-            msg = self._consumer.receive()
-        else:
-            _check_type(int, timeout_millis, 'timeout_millis')
-            msg = self._consumer.receive(timeout_millis)
-
-        m = Message()
-        m._message = msg
-        m._schema = self._schema
-        return m
-
-    def acknowledge(self, message):
-        """
-        Acknowledge the reception of a single message.
-
-        This method will block until an acknowledgement is sent to the broker.
-        After that, the message will not be re-delivered to this consumer.
-
-        **Args**
-
-        * `message`:
-          The received message or message id.
-        """
-        if isinstance(message, Message):
-            self._consumer.acknowledge(message._message)
-        else:
-            self._consumer.acknowledge(message)
-
-    def acknowledge_cumulative(self, message):
-        """
-        Acknowledge the reception of all the messages in the stream up to (and
-        including) the provided message.
-
-        This method will block until an acknowledgement is sent to the broker.
-        After that, the messages will not be re-delivered to this consumer.
-
-        **Args**
-
-        * `message`:
-          The received message or message id.
-        """
-        if isinstance(message, Message):
-            self._consumer.acknowledge_cumulative(message._message)
-        else:
-            self._consumer.acknowledge_cumulative(message)
-
-    def negative_acknowledge(self, message):
-        """
-        Acknowledge the failure to process a single message.
-
-        When a message is "negatively acked" it will be marked for redelivery after
-        some fixed delay. The delay is configurable when constructing the consumer
-        with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
-
-        This call is not blocking.
-
-        **Args**
-
-        * `message`:
-          The received message or message id.
-        """
-        if isinstance(message, Message):
-            self._consumer.negative_acknowledge(message._message)
-        else:
-            self._consumer.negative_acknowledge(message)
-
-    def pause_message_listener(self):
-        """
-        Pause receiving messages via the `message_listener` until
-        `resume_message_listener()` is called.
-        """
-        self._consumer.pause_message_listener()
-
-    def resume_message_listener(self):
-        """
-        Resume receiving the messages via the message listener.
-        Asynchronously receive all the messages enqueued from the time
-        `pause_message_listener()` was called.
-        """
-        self._consumer.resume_message_listener()
-
-    def redeliver_unacknowledged_messages(self):
-        """
-        Redelivers all the unacknowledged messages. In failover mode, the
-        request is ignored if the consumer is not active for the given topic. In
-        shared mode, the consumer's messages to be redelivered are distributed
-        across all the connected consumers. This is a non-blocking call and
-        doesn't throw an exception. In case the connection breaks, the messages
-        are redelivered after reconnect.
-        """
-        self._consumer.redeliver_unacknowledged_messages()
-
-    def seek(self, messageid):
-        """
-        Reset the subscription associated with this consumer to a specific message id or publish timestamp.
-        The message id can either be a specific message or represent the first or last messages in the topic.
-        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
-        seek() on the individual partitions.
-
-        **Args**
-
-        * `message`:
-          The message id for seek, OR an integer event time to seek to
-        """
-        self._consumer.seek(messageid)
-
-    def close(self):
-        """
-        Close the consumer.
-        """
-        self._consumer.close()
-        self._client._consumers.remove(self)
-
-
-class Reader:
-    """
-    Pulsar topic reader.
-    """
-
-    def topic(self):
-        """
-        Return the topic this reader is reading from.
-        """
-        return self._reader.topic()
-
-    def read_next(self, timeout_millis=None):
-        """
-        Read a single message.
-
-        If a message is not immediately available, this method will block until
-        a new message is available.
-
-        **Options**
-
-        * `timeout_millis`:
-          If specified, the receive will raise an exception if a message is not
-          available within the timeout.
-        """
-        if timeout_millis is None:
-            msg = self._reader.read_next()
-        else:
-            _check_type(int, timeout_millis, 'timeout_millis')
-            msg = self._reader.read_next(timeout_millis)
-
-        m = Message()
-        m._message = msg
-        m._schema = self._schema
-        return m
-
-    def has_message_available(self):
-        """
-        Check if there is any message available to read from the current position.
-        """
-        return self._reader.has_message_available();
-
-    def seek(self, messageid):
-        """
-        Reset this reader to a specific message id or publish timestamp.
-        The message id can either be a specific message or represent the first or last messages in the topic.
-        Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform
-        seek() on the individual partitions instead.
-
-        **Args**
-
-        * `messageid`:
-          The message id for seek, OR an integer event time to seek to
-        """
-        self._reader.seek(messageid)
-
-    def close(self):
-        """
-        Close the reader.
-        """
-        self._reader.close()
-        self._client._consumers.remove(self)
-
-class CryptoKeyReader:
-    """
-    Default crypto key reader implementation
-    """
-    def __init__(self, public_key_path, private_key_path):
-        """
-        Create crypto key reader.
-
-        **Args**
-
-        * `public_key_path`: Path to the public key
-        * `private_key_path`: Path to private key
-        """
-        _check_type(str, public_key_path, 'public_key_path')
-        _check_type(str, private_key_path, 'private_key_path')
-        self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
-
-def _check_type(var_type, var, name):
-    if not isinstance(var, var_type):
-        raise ValueError("Argument %s is expected to be of type '%s' and not '%s'"
-                         % (name, var_type.__name__, type(var).__name__))
-
-
-def _check_type_or_none(var_type, var, name):
-    if var is not None and not isinstance(var, var_type):
-        raise ValueError("Argument %s is expected to be either None or of type '%s'"
-                         % (name, var_type.__name__))
-
-
-def _listener_wrapper(listener, schema):
-    def wrapper(consumer, msg):
-        c = Consumer()
-        c._consumer = consumer
-        m = Message()
-        m._message = msg
-        m._schema = schema
-        listener(c, m)
-    return wrapper
-</code></pre>
-  </div>
-
-  </header>
-
-  <section id="section-items">
-
-
-    <h2 class="section-title" id="header-classes">Classes</h2>
-      
-      <div class="item">
-      <p id="pulsar.Authentication" class="name">class <span class="ident">Authentication</span></p>
-      
-  
-    <div class="desc"><p>Authentication provider object. Used to load authentication from an external
-shared library.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Authentication" class="source">
-    <pre><code>class Authentication:
-    """
-    Authentication provider object. Used to load authentication from an external
-    shared library.
-    """
-    def __init__(self, dynamicLibPath, authParamsString):
-        """
-        Create the authentication provider instance.
-
-        **Args**
-
-        * `dynamicLibPath`: Path to the authentication provider shared library
-          (such as `tls.so`)
-        * `authParamsString`: Comma-separated list of provider-specific
-          configuration params
-        """
-        _check_type(str, dynamicLibPath, 'dynamicLibPath')
-        _check_type(str, authParamsString, 'authParamsString')
-        self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.Authentication">Authentication</a></li>
-          </ul>
-          <h3>Instance variables</h3>
-            <div class="item">
-            <p id="pulsar.Authentication.auth" class="name">var <span class="ident">auth</span></p>
-            
-
-            
-  
-  <div class="source_cont">
-</div>
-
-            </div>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Authentication.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, dynamicLibPath, authParamsString)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Create the authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>dynamicLibPath</code>: Path to the authentication provider shared library
-  (such as <code>tls.so</code>)</li>
-<li><code>authParamsString</code>: Comma-separated list of provider-specific
-  configuration params</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Authentication.__init__" class="source">
-    <pre><code>def __init__(self, dynamicLibPath, authParamsString):
-    """
-    Create the authentication provider instance.
-    **Args**
-    * `dynamicLibPath`: Path to the authentication provider shared library
-      (such as `tls.so`)
-    * `authParamsString`: Comma-separated list of provider-specific
-      configuration params
-    """
-    _check_type(str, dynamicLibPath, 'dynamicLibPath')
-    _check_type(str, authParamsString, 'authParamsString')
-    self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.AuthenticationAthenz" class="name">class <span class="ident">AuthenticationAthenz</span></p>
-      
-  
-    <div class="desc"><p>Athenz Authentication implementation</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationAthenz" class="source">
-    <pre><code>class AuthenticationAthenz(Authentication):
-    """
-    Athenz Authentication implementation
-    """
-    def __init__(self, auth_params_string):
-        """
-        Create the Athenz authentication provider instance.
-
-        **Args**
-
-        * `auth_params_string`: JSON encoded configuration for Athenz client
-        """
-        _check_type(str, auth_params_string, 'auth_params_string')
-        self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></li>
-          <li><a href="#pulsar.Authentication">Authentication</a></li>
-          </ul>
-          <h3>Instance variables</h3>
-            <div class="item">
-            <p id="pulsar.AuthenticationAthenz.auth" class="name">var <span class="ident">auth</span></p>
-            
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
-    </p>
-
-            
-  
-  <div class="source_cont">
-</div>
-
-            </div>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.AuthenticationAthenz.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p>
-    </div>
-    
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
-    </p>
-
-    
-  
-    <div class="desc"><p>Create the Athenz authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>auth_params_string</code>: JSON encoded configuration for Athenz client</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationAthenz.__init__" class="source">
-    <pre><code>def __init__(self, auth_params_string):
-    """
-    Create the Athenz authentication provider instance.
-    **Args**
-    * `auth_params_string`: JSON encoded configuration for Athenz client
-    """
-    _check_type(str, auth_params_string, 'auth_params_string')
-    self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.AuthenticationOauth2" class="name">class <span class="ident">AuthenticationOauth2</span></p>
-      
-  
-    <div class="desc"><p>Oauth2 Authentication implementation</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationOauth2" class="source">
-    <pre><code>class AuthenticationOauth2(Authentication):
-    """
-    Oauth2 Authentication implementation
-    """
-    def __init__(self, auth_params_string):
-        """
-        Create the Oauth2 authentication provider instance.
-
-        **Args**
-
-        * `auth_params_string`: JSON encoded configuration for Oauth2 client
-        """
-        _check_type(str, auth_params_string, 'auth_params_string')
-        self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></li>
-          <li><a href="#pulsar.Authentication">Authentication</a></li>
-          </ul>
-          <h3>Instance variables</h3>
-            <div class="item">
-            <p id="pulsar.AuthenticationOauth2.auth" class="name">var <span class="ident">auth</span></p>
-            
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
-    </p>
-
-            
-  
-  <div class="source_cont">
-</div>
-
-            </div>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.AuthenticationOauth2.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p>
-    </div>
-    
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
-    </p>
-
-    
-  
-    <div class="desc"><p>Create the Oauth2 authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>auth_params_string</code>: JSON encoded configuration for Oauth2 client</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationOauth2.__init__" class="source">
-    <pre><code>def __init__(self, auth_params_string):
-    """
-    Create the Oauth2 authentication provider instance.
-    **Args**
-    * `auth_params_string`: JSON encoded configuration for Oauth2 client
-    """
-    _check_type(str, auth_params_string, 'auth_params_string')
-    self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.AuthenticationTLS" class="name">class <span class="ident">AuthenticationTLS</span></p>
-      
-  
-    <div class="desc"><p>TLS Authentication implementation</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationTLS" class="source">
-    <pre><code>class AuthenticationTLS(Authentication):
-    """
-    TLS Authentication implementation
-    """
-    def __init__(self, certificate_path, private_key_path):
-        """
-        Create the TLS authentication provider instance.
-
-        **Args**
-
-        * `certificate_path`: Path to the public certificate
-        * `private_key_path`: Path to private TLS key
-        """
-        _check_type(str, certificate_path, 'certificate_path')
-        _check_type(str, private_key_path, 'private_key_path')
-        self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></li>
-          <li><a href="#pulsar.Authentication">Authentication</a></li>
-          </ul>
-          <h3>Instance variables</h3>
-            <div class="item">
-            <p id="pulsar.AuthenticationTLS.auth" class="name">var <span class="ident">auth</span></p>
-            
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
-    </p>
-
-            
-  
-  <div class="source_cont">
-</div>
-
-            </div>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.AuthenticationTLS.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, certificate_path, private_key_path)</p>
-    </div>
-    
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
-    </p>
-
-    
-  
-    <div class="desc"><p>Create the TLS authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>certificate_path</code>: Path to the public certificate</li>
-<li><code>private_key_path</code>: Path to private TLS key</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationTLS.__init__" class="source">
-    <pre><code>def __init__(self, certificate_path, private_key_path):
-    """
-    Create the TLS authentication provider instance.
-    **Args**
-    * `certificate_path`: Path to the public certificate
-    * `private_key_path`: Path to private TLS key
-    """
-    _check_type(str, certificate_path, 'certificate_path')
-    _check_type(str, private_key_path, 'private_key_path')
-    self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.AuthenticationToken" class="name">class <span class="ident">AuthenticationToken</span></p>
-      
-  
-    <div class="desc"><p>Token based authentication implementation</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationToken" class="source">
-    <pre><code>class AuthenticationToken(Authentication):
-    """
-    Token based authentication implementation
-    """
-    def __init__(self, token):
-        """
-        Create the token authentication provider instance.
-
-        **Args**
-
-        * `token`: A string containing the token or a function that provides a
-                   string with the token
-        """
-        if not (isinstance(token, str) or callable(token)):
-            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
-        self.auth = _pulsar.AuthenticationToken(token)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></li>
-          <li><a href="#pulsar.Authentication">Authentication</a></li>
-          </ul>
-          <h3>Instance variables</h3>
-            <div class="item">
-            <p id="pulsar.AuthenticationToken.auth" class="name">var <span class="ident">auth</span></p>
-            
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
-    </p>
-
-            
-  
-  <div class="source_cont">
-</div>
-
-            </div>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.AuthenticationToken.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, token)</p>
-    </div>
-    
-    <p class="inheritance">
-     <strong>Inheritance:</strong>
-       <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
-    </p>
-
-    
-  
-    <div class="desc"><p>Create the token authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>token</code>: A string containing the token or a function that provides a
-           string with the token</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.AuthenticationToken.__init__" class="source">
-    <pre><code>def __init__(self, token):
-    """
-    Create the token authentication provider instance.
-    **Args**
-    * `token`: A string containing the token or a function that provides a
-               string with the token
-    """
-    if not (isinstance(token, str) or callable(token)):
-        raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
-    self.auth = _pulsar.AuthenticationToken(token)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.Client" class="name">class <span class="ident">Client</span></p>
-      
-  
-    <div class="desc"><p>The Pulsar client. A single client instance can be used to create producers
-and consumers on multiple topics.</p>
-<p>The client will share the same connection pool and threads across all
-producers and consumers.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Client" class="source">
-    <pre><code>class Client:
-    """
-    The Pulsar client. A single client instance can be used to create producers
-    and consumers on multiple topics.
-
-    The client will share the same connection pool and threads across all
-    producers and consumers.
-    """
-
-    def __init__(self, service_url,
-                 authentication=None,
-                 operation_timeout_seconds=30,
-                 io_threads=1,
-                 message_listener_threads=1,
-                 concurrent_lookup_requests=50000,
-                 log_conf_file_path=None,
-                 use_tls=False,
-                 tls_trust_certs_file_path=None,
-                 tls_allow_insecure_connection=False,
-                 tls_validate_hostname=False,
-                 ):
-        """
-        Create a new Pulsar client instance.
-
-        **Args**
-
-        * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
-
-        **Options**
-
-        * `authentication`:
-          Set the authentication provider to be used with the broker. For example:
-          `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
-        * `operation_timeout_seconds`:
-          Set timeout on client operations (subscribe, create producer, close,
-          unsubscribe).
-        * `io_threads`:
-          Set the number of IO threads to be used by the Pulsar client.
-        * `message_listener_threads`:
-          Set the number of threads to be used by the Pulsar client when
-          delivering messages through message listener. The default is 1 thread
-          per Pulsar client. If using more than 1 thread, messages for distinct
-          `message_listener`s will be delivered in different threads, however a
-          single `MessageListener` will always be assigned to the same thread.
-        * `concurrent_lookup_requests`:
-          Number of concurrent lookup-requests allowed on each broker connection
-          to prevent overload on the broker.
-        * `log_conf_file_path`:
-          Initialize log4cxx from a configuration file.
-        * `use_tls`:
-          Configure whether to use TLS encryption on the connection. This setting
-          is deprecated. TLS will be automatically enabled if the `service_url`
-          starts with `pulsar+ssl://` or `https://`
-        * `tls_trust_certs_file_path`:
-          Set the path to the trusted TLS certificate file. If empty, defaults to
-          the CA bundle located by `certifi.where()`.
-        * `tls_allow_insecure_connection`:
-          Configure whether the Pulsar client accepts untrusted TLS certificates
-          from the broker.
-        * `tls_validate_hostname`:
-          Configure whether the Pulsar client validates that the hostname of the
-          endpoint matches the common name on the TLS certificate presented by
-          the endpoint.
-        """
-        _check_type(str, service_url, 'service_url')
-        _check_type_or_none(Authentication, authentication, 'authentication')
-        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
-        _check_type(int, io_threads, 'io_threads')
-        _check_type(int, message_listener_threads, 'message_listener_threads')
-        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
-        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
-        _check_type(bool, use_tls, 'use_tls')
-        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
-        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
-        _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
-
-        conf = _pulsar.ClientConfiguration()
-        if authentication:
-            conf.authentication(authentication.auth)
-        conf.operation_timeout_seconds(operation_timeout_seconds)
-        conf.io_threads(io_threads)
-        conf.message_listener_threads(message_listener_threads)
-        conf.concurrent_lookup_requests(concurrent_lookup_requests)
-        if log_conf_file_path:
-            conf.log_conf_file_path(log_conf_file_path)
-        if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
-            conf.use_tls(True)
-        if tls_trust_certs_file_path:
-            conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
-        else:
-            conf.tls_trust_certs_file_path(certifi.where())
-        conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
-        conf.tls_validate_hostname(tls_validate_hostname)
-        self._client = _pulsar.Client(service_url, conf)
-        self._consumers = []
-
-    def create_producer(self, topic,
-                        producer_name=None,
-                        schema=schema.BytesSchema(),
-                        initial_sequence_id=None,
-                        send_timeout_millis=30000,
-                        compression_type=CompressionType.NONE,
-                        max_pending_messages=1000,
-                        max_pending_messages_across_partitions=50000,
-                        block_if_queue_full=False,
-                        batching_enabled=False,
-                        batching_max_messages=1000,
-                        batching_max_allowed_size_in_bytes=128*1024,
-                        batching_max_publish_delay_ms=10,
-                        message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
-                        properties=None,
-                        batching_type=BatchingType.Default,
-                        encryption_key=None,
-                        crypto_key_reader=None
-                        ):
-        """
-        Create a new producer on a given topic.
-
-        **Args**
-
-        * `topic`:
-          The topic name
-
-        **Options**
-
-        * `producer_name`:
-           Specify a name for the producer. If not assigned,
-           the system will generate a globally unique name which can be accessed
-           with `Producer.producer_name()`. When specifying a name, it is up to
-           the user to ensure that, for a given topic, the producer name is unique
-           across all Pulsar clusters.
-        * `schema`:
-           Define the schema of the data that will be published by this producer.
-           The schema will be used for two purposes:
-             - Validate the data format against the topic defined schema
-             - Perform serialization/deserialization between data and objects
-           An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
-        * `initial_sequence_id`:
-           Set the baseline for the sequence ids for messages
-           published by the producer. The first message will use
-           `(initial_sequence_id + 1)` as its sequence id and subsequent messages will
-           be assigned incremental sequence ids, if not otherwise specified.
-        * `send_timeout_millis`:
-          If a message is not acknowledged by the server before the
-          `send_timeout_millis` expires, an error will be reported.
-        * `compression_type`:
-          Set the compression type for the producer. By default, message
-          payloads are not compressed. Supported compression types are
-          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
-          ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
-          release in order to be able to receive messages compressed with ZSTD.
-          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
-          release in order to be able to receive messages compressed with SNAPPY.
-        * `max_pending_messages`:
-          Set the max size of the queue holding the messages pending to receive
-          an acknowledgment from the broker.
-        * `max_pending_messages_across_partitions`:
-          Set the max size of the queue holding the messages pending to receive
-          an acknowledgment across partitions from the broker.
-        * `block_if_queue_full`: Set whether `send_async` operations should
-          block when the outgoing message queue is full.
-        * `message_routing_mode`:
-          Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
-          other option is `PartitionsRoutingMode.UseSinglePartition`
-        * `properties`:
-          Sets the properties for the producer. The properties associated with a producer
-          can be used to identify a producer on the broker side.
-        * `batching_type`:
-          Sets the batching type for the producer.
-          There are two batching types: DefaultBatching and KeyBasedBatching.
-            - Default batching
-            incoming single messages:
-            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-            batched into single batch message:
-            [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
-
-            - KeyBasedBatching
-            incoming single messages:
-            (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-            batched into single batch message:
-            [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
-        * `encryption_key`:
-           The key used for symmetric encryption, configured on the producer side
-        * `crypto_key_reader`:
-           Symmetric encryption class implementation, configuring public key encryption messages for the producer
-           and private key decryption messages for the consumer
-        """
-        _check_type(str, topic, 'topic')
-        _check_type_or_none(str, producer_name, 'producer_name')
-        _check_type(_schema.Schema, schema, 'schema')
-        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
-        _check_type(int, send_timeout_millis, 'send_timeout_millis')
-        _check_type(CompressionType, compression_type, 'compression_type')
-        _check_type(int, max_pending_messages, 'max_pending_messages')
-        _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
-        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
-        _check_type(bool, batching_enabled, 'batching_enabled')
-        _check_type(int, batching_max_messages, 'batching_max_messages')
-        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
-        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
-        _check_type_or_none(dict, properties, 'properties')
-        _check_type(BatchingType, batching_type, 'batching_type')
-        _check_type_or_none(str, encryption_key, 'encryption_key')
-        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
-        conf = _pulsar.ProducerConfiguration()
-        conf.send_timeout_millis(send_timeout_millis)
-        conf.compression_type(compression_type)
-        conf.max_pending_messages(max_pending_messages)
-        conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
-        conf.block_if_queue_full(block_if_queue_full)
-        conf.batching_enabled(batching_enabled)
-        conf.batching_max_messages(batching_max_messages)
-        conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
-        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
-        conf.partitions_routing_mode(message_routing_mode)
-        conf.batching_type(batching_type)
-        if producer_name:
-            conf.producer_name(producer_name)
-        if initial_sequence_id:
-            conf.initial_sequence_id(initial_sequence_id)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-
-        conf.schema(schema.schema_info())
-        if encryption_key:
-            conf.encryption_key(encryption_key)
-        if crypto_key_reader:
-            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
-        p = Producer()
-        p._producer = self._client.create_producer(topic, conf)
-        p._schema = schema
-        return p
-
-    def subscribe(self, topic, subscription_name,
-                  consumer_type=ConsumerType.Exclusive,
-                  schema=schema.BytesSchema(),
-                  message_listener=None,
-                  receiver_queue_size=1000,
-                  max_total_receiver_queue_size_across_partitions=50000,
-                  consumer_name=None,
-                  unacked_messages_timeout_ms=None,
-                  broker_consumer_stats_cache_time_ms=30000,
-                  negative_ack_redelivery_delay_ms=60000,
-                  is_read_compacted=False,
-                  properties=None,
-                  pattern_auto_discovery_period=60,
-                  initial_position=InitialPosition.Latest,
-                  crypto_key_reader=None
-                  ):
-        """
-        Subscribe to the given topic and subscription combination.
-
-        **Args**
-
-        * `topic`: The name of the topic, list of topics or regex pattern.
-                  This method will accept these forms:
-                    - `topic='my-topic'`
-                    - `topic=['topic-1', 'topic-2', 'topic-3']`
-                    - `topic=re.compile('persistent://public/default/topic-*')`
-        * `subscription_name`: The name of the subscription.
-
-        **Options**
-
-        * `consumer_type`:
-          Select the subscription type to be used when subscribing to the topic.
-        * `schema`:
-           Define the schema of the data that will be received by this consumer.
-        * `message_listener`:
-          Sets a message listener for the consumer. When the listener is set,
-          the application will receive messages through it. Calls to
-          `consumer.receive()` will not be allowed. The listener function needs
-          to accept (consumer, message), for example:
-
-                #!python
-                def my_listener(consumer, message):
-                    # process message
-                    consumer.acknowledge(message)
-
-        * `receiver_queue_size`:
-          Sets the size of the consumer receive queue. The consumer receive
-          queue controls how many messages can be accumulated by the consumer
-          before the application calls `receive()`. Using a higher value could
-          potentially increase the consumer throughput at the expense of higher
-          memory utilization. Setting the consumer queue size to zero decreases
-          the throughput of the consumer by disabling pre-fetching of messages.
-          This approach improves the message distribution on shared subscription
-          by pushing messages only to those consumers that are ready to process
-          them. Neither receive with timeout nor partitioned topics can be used
-          if the consumer queue size is zero. The `receive()` function call
-          should not be interrupted when the consumer queue size is zero. The
-          default value is 1000 messages and should work well for most use
-          cases.
-        * `max_total_receiver_queue_size_across_partitions`:
-          Set the max total receiver queue size across partitions.
-          This setting will be used to reduce the receiver queue size for individual partitions.
-        * `consumer_name`:
-          Sets the consumer name.
-        * `unacked_messages_timeout_ms`:
-          Sets the timeout in milliseconds for unacknowledged messages. The
-          timeout needs to be greater than 10 seconds. An exception is thrown if
-          the given value is less than 10 seconds. If a successful
-          acknowledgement is not sent within the timeout, all the unacknowledged
-          messages are redelivered.
-        * `negative_ack_redelivery_delay_ms`:
-           The delay after which to redeliver the messages that failed to be
-           processed (with the `consumer.negative_acknowledge()`)
-        * `broker_consumer_stats_cache_time_ms`:
-          Sets the time duration for which the broker-side consumer stats will
-          be cached in the client.
-        * `is_read_compacted`:
-          Selects whether to read the compacted version of the topic
-        * `properties`:
-          Sets the properties for the consumer. The properties associated with a consumer
-          can be used to identify a consumer on the broker side.
-        * `pattern_auto_discovery_period`:
-          Period, in seconds, at which the consumer auto-discovers topics matching the pattern.
-        * `initial_position`:
-          Set the initial position of a consumer when subscribing to the topic.
-          It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
-          Default: `Latest`.
-        * `crypto_key_reader`:
-           Symmetric encryption class implementation, configuring public key encryption messages for the producer
-           and private key decryption messages for the consumer
-        """
-        _check_type(str, subscription_name, 'subscription_name')
-        _check_type(ConsumerType, consumer_type, 'consumer_type')
-        _check_type(_schema.Schema, schema, 'schema')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type(int, max_total_receiver_queue_size_across_partitions,
-                    'max_total_receiver_queue_size_across_partitions')
-        _check_type_or_none(str, consumer_name, 'consumer_name')
-        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
-        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
-        _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
-        _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-        _check_type_or_none(dict, properties, 'properties')
-        _check_type(InitialPosition, initial_position, 'initial_position')
-        _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
-        conf = _pulsar.ConsumerConfiguration()
-        conf.consumer_type(consumer_type)
-        conf.read_compacted(is_read_compacted)
-        if message_listener:
-            conf.message_listener(_listener_wrapper(message_listener, schema))
-        conf.receiver_queue_size(receiver_queue_size)
-        conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
-        if consumer_name:
-            conf.consumer_name(consumer_name)
-        if unacked_messages_timeout_ms:
-            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-
-        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
-        conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-        if properties:
-            for k, v in properties.items():
-                conf.property(k, v)
-        conf.subscription_initial_position(initial_position)
-
-        conf.schema(schema.schema_info())
-
-        if crypto_key_reader:
-            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
-        c = Consumer()
-        if isinstance(topic, str):
-            # Single topic
-            c._consumer = self._client.subscribe(topic, subscription_name, conf)
-        elif isinstance(topic, list):
-            # List of topics
-            c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
-        elif isinstance(topic, _retype):
-            # Regex pattern
-            c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
-        else:
-            raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
-
-        c._client = self
-        c._schema = schema
-        self._consumers.append(c)
-        return c
-
-    def create_reader(self, topic, start_message_id,
-                      schema=schema.BytesSchema(),
-                      reader_listener=None,
-                      receiver_queue_size=1000,
-                      reader_name=None,
-                      subscription_role_prefix=None,
-                      is_read_compacted=False
-                      ):
-        """
-        Create a reader on a particular topic
-
-        **Args**
-
-        * `topic`: The name of the topic.
-        * `start_message_id`: The initial reader positioning is done by specifying a message id.
-           The options are:
-            * `MessageId.earliest`: Start reading from the earliest message available in the topic
-            * `MessageId.latest`: Start reading from the end of the topic, only getting messages published
-               after the reader was created
-            * `MessageId`: When passing a particular message id, the reader will position itself on
-               that specific position. The first message to be read will be the message next to the
-               specified messageId. Message id can be serialized into a string and deserialized
-               back into a `MessageId` object:
-
-                   # Serialize to string
-                   s = msg.message_id().serialize()
-
-                   # Deserialize from string
-                   msg_id = MessageId.deserialize(s)
-
-        **Options**
-
-        * `schema`:
-           Define the schema of the data that will be received by this reader.
-        * `reader_listener`:
-          Sets a message listener for the reader. When the listener is set,
-          the application will receive messages through it. Calls to
-          `reader.read_next()` will not be allowed. The listener function needs
-          to accept (reader, message), for example:
-
-                def my_listener(reader, message):
-                    # process message
-                    pass
-
-        * `receiver_queue_size`:
-          Sets the size of the reader receive queue. The reader receive
-          queue controls how many messages can be accumulated by the reader
-          before the application calls `read_next()`. Using a higher value could
-          potentially increase the reader throughput at the expense of higher
-          memory utilization.
-        * `reader_name`:
-          Sets the reader name.
-        * `subscription_role_prefix`:
-          Sets the subscription role prefix.
-        * `is_read_compacted`:
-          Selects whether to read the compacted version of the topic
-        """
-        _check_type(str, topic, 'topic')
-        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
-        _check_type(_schema.Schema, schema, 'schema')
-        _check_type(int, receiver_queue_size, 'receiver_queue_size')
-        _check_type_or_none(str, reader_name, 'reader_name')
-        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
-        _check_type(bool, is_read_compacted, 'is_read_compacted')
-
-        conf = _pulsar.ReaderConfiguration()
-        if reader_listener:
-            conf.reader_listener(_listener_wrapper(reader_listener, schema))
-        conf.receiver_queue_size(receiver_queue_size)
-        if reader_name:
-            conf.reader_name(reader_name)
-        if subscription_role_prefix:
-            conf.subscription_role_prefix(subscription_role_prefix)
-        conf.schema(schema.schema_info())
-        conf.read_compacted(is_read_compacted)
-
-        c = Reader()
-        c._reader = self._client.create_reader(topic, start_message_id, conf)
-        c._client = self
-        c._schema = schema
-        self._consumers.append(c)
-        return c
-
-    def get_topic_partitions(self, topic):
-        """
-        Get the list of partitions for a given topic.
-
-        If the topic is partitioned, this will return a list of partition names. If the topic is not
-        partitioned, the returned list will contain the topic name itself.
-
-        This can be used to discover the partitions and create Reader, Consumer or Producer
-        instances directly on a particular partition.
-        :param topic: the topic name to lookup
-        :return: a list of partition names
-        """
-        _check_type(str, topic, 'topic')
-        return self._client.get_topic_partitions(topic)
-
-    def close(self):
-        """
-        Close the client and all the associated producers and consumers
-        """
-        self._client.close()
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.Client">Client</a></li>
-          </ul>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Client.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, service_url, authentication=None, operation_timeout_seconds=30, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Create a new Pulsar client instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>service_url</code>: The Pulsar service url eg: pulsar://my-broker.com:6650/</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>authentication</code>:
-  Set the authentication provider to be used with the broker. For example:
-  <code>AuthenticationTls</code>, <code>AuthenticationToken</code>, <code>AuthenticationAthenz</code> or <code>AuthenticationOauth2</code></li>
-<li><code>operation_timeout_seconds</code>:
-  Set timeout on client operations (subscribe, create producer, close,
-  unsubscribe).</li>
-<li><code>io_threads</code>:
-  Set the number of IO threads to be used by the Pulsar client.</li>
-<li><code>message_listener_threads</code>:
-  Set the number of threads to be used by the Pulsar client when
-  delivering messages through message listener. The default is 1 thread
-  per Pulsar client. If using more than 1 thread, messages for distinct
-  <code>message_listener</code>s will be delivered in different threads, however a
-  single <code>MessageListener</code> will always be assigned to the same thread.</li>
-<li><code>concurrent_lookup_requests</code>:
-  Number of concurrent lookup-requests allowed on each broker connection
-  to prevent overload on the broker.</li>
-<li><code>log_conf_file_path</code>:
-  Initialize log4cxx from a configuration file.</li>
-<li><code>use_tls</code>:
-  Configure whether to use TLS encryption on the connection. This setting
-  is deprecated. TLS will be automatically enabled if the <code>service_url</code> is
-  set to <code>pulsar+ssl://</code> or <code>https://</code></li>
-<li><code>tls_trust_certs_file_path</code>:
-  Set the path to the trusted TLS certificate file. If empty defaults to
-  certifi.</li>
-<li><code>tls_allow_insecure_connection</code>:
-  Configure whether the Pulsar client accepts untrusted TLS certificates
-  from the broker.</li>
-<li><code>tls_validate_hostname</code>:
-  Configure whether the Pulsar client validates that the hostname of the
-  endpoint, matches the common name on the TLS certificate presented by
-  the endpoint.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Client.__init__" class="source">
-    <pre><code>def __init__(self, service_url,
-             authentication=None,
-             operation_timeout_seconds=30,
-             io_threads=1,
-             message_listener_threads=1,
-             concurrent_lookup_requests=50000,
-             log_conf_file_path=None,
-             use_tls=False,
-             tls_trust_certs_file_path=None,
-             tls_allow_insecure_connection=False,
-             tls_validate_hostname=False,
-             ):
-    """
-    Create a new Pulsar client instance.
-    **Args**
-    * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
-    **Options**
-    * `authentication`:
-      Set the authentication provider to be used with the broker. For example:
-      `AuthenticationTls`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
-    * `operation_timeout_seconds`:
-      Set timeout on client operations (subscribe, create producer, close,
-      unsubscribe).
-    * `io_threads`:
-      Set the number of IO threads to be used by the Pulsar client.
-    * `message_listener_threads`:
-      Set the number of threads to be used by the Pulsar client when
-      delivering messages through message listener. The default is 1 thread
-      per Pulsar client. If using more than 1 thread, messages for distinct
-      `message_listener`s will be delivered in different threads, however a
-      single `MessageListener` will always be assigned to the same thread.
-    * `concurrent_lookup_requests`:
-      Number of concurrent lookup-requests allowed on each broker connection
-      to prevent overload on the broker.
-    * `log_conf_file_path`:
-      Initialize log4cxx from a configuration file.
-    * `use_tls`:
-      Configure whether to use TLS encryption on the connection. This setting
-      is deprecated. TLS will be automatically enabled if the `service_url` is
-      set to `pulsar+ssl://` or `https://`
-    * `tls_trust_certs_file_path`:
-      Set the path to the trusted TLS certificate file. If empty defaults to
-      certifi.
-    * `tls_allow_insecure_connection`:
-      Configure whether the Pulsar client accepts untrusted TLS certificates
-      from the broker.
-    * `tls_validate_hostname`:
-      Configure whether the Pulsar client validates that the hostname of the
-      endpoint, matches the common name on the TLS certificate presented by
-      the endpoint.
-    """
-    _check_type(str, service_url, 'service_url')
-    _check_type_or_none(Authentication, authentication, 'authentication')
-    _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
-    _check_type(int, io_threads, 'io_threads')
-    _check_type(int, message_listener_threads, 'message_listener_threads')
-    _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
-    _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
-    _check_type(bool, use_tls, 'use_tls')
-    _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
-    _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
-    _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
-    conf = _pulsar.ClientConfiguration()
-    if authentication:
-        conf.authentication(authentication.auth)
-    conf.operation_timeout_seconds(operation_timeout_seconds)
-    conf.io_threads(io_threads)
-    conf.message_listener_threads(message_listener_threads)
-    conf.concurrent_lookup_requests(concurrent_lookup_requests)
-    if log_conf_file_path:
-        conf.log_conf_file_path(log_conf_file_path)
-    if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
-        conf.use_tls(True)
-    if tls_trust_certs_file_path:
-        conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
-    else:
-        conf.tls_trust_certs_file_path(certifi.where())
-    conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
-    conf.tls_validate_hostname(tls_validate_hostname)
-    self._client = _pulsar.Client(service_url, conf)
-    self._consumers = []
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Client.close">
-    <p>def <span class="ident">close</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Close the client and all the associated producers and consumers</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.close', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Client.close" class="source">
-    <pre><code>def close(self):
-    """
-    Close the client and all the associated producers and consumers
-    """
-    self._client.close()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Client.create_producer">
-    <p>def <span class="ident">create_producer</span>(</p><p>self, topic, producer_name=None, schema=&lt;pulsar.schema.schema.BytesSchema object at 0x7f2e807d5810&gt;, initial_sequence_id=None, send_timeout_millis=30000, compression_type=_pulsar.CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, batching_max_allowed_size_in_bytes=131072, batching_max_publish_delay_ms [...]
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Create a new producer on a given topic.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>topic</code>:
-  The topic name</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>producer_name</code>:
-   Specify a name for the producer. If not assigned,
-   the system will generate a globally unique name which can be accessed
-   with <code>Producer.producer_name()</code>. When specifying a name, it is up to
-   the user to ensure that, for a given topic, the producer name is unique
-   across all Pulsar's clusters.</li>
-<li><code>schema</code>:
-   Define the schema of the data that will be published by this producer.
-   The schema will be used for two purposes:<ul>
-<li>Validate the data format against the topic defined schema</li>
-<li>Perform serialization/deserialization between data and objects
-   An example for this parameter would be to pass <code>schema=JsonSchema(MyRecordClass)</code>.</li>
-</ul>
-</li>
-<li><code>initial_sequence_id</code>:
-   Set the baseline for the sequence ids for messages
-   published by the producer. First message will be using
-   <code>(initialSequenceId + 1)</code> as its sequence id and subsequent messages will
-   be assigned incremental sequence ids, if not otherwise specified.</li>
-<li><code>send_timeout_millis</code>:
-  If a message is not acknowledged by the server before the
-  <code>send_timeout</code> expires, an error will be reported.</li>
-<li><code>compression_type</code>:
-  Set the compression type for the producer. By default, message
-  payloads are not compressed. Supported compression types are
-  <code>CompressionType.LZ4</code>, <code>CompressionType.ZLib</code>, <code>CompressionType.ZSTD</code> and <code>CompressionType.SNAPPY</code>.
-  ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
-  release in order to be able to receive messages compressed with ZSTD.
-  SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
-  release in order to be able to receive messages compressed with SNAPPY.</li>
-<li><code>max_pending_messages</code>:
-  Set the max size of the queue holding the messages pending to receive
-  an acknowledgment from the broker.</li>
-<li><code>max_pending_messages_across_partitions</code>:
-  Set the max size of the queue holding the messages pending to receive
-  an acknowledgment across partitions from the broker.</li>
-<li><code>block_if_queue_full</code>: Set whether <code>send_async</code> operations should
-  block when the outgoing message queue is full.</li>
-<li><code>message_routing_mode</code>:
-  Set the message routing mode for the partitioned producer. Default is <code>PartitionsRoutingMode.RoundRobinDistribution</code>,
-  other option is <code>PartitionsRoutingMode.UseSinglePartition</code></li>
-<li><code>properties</code>:
-  Sets the properties for the producer. The properties associated with a producer
-  can be used to identify a producer on the broker side.</li>
-<li>
-<p><code>batching_type</code>:
-  Sets the batching type for the producer.
-  There are two batching types: DefaultBatching and KeyBasedBatching.</p>
-<ul>
-<li>
-<p>Default batching
-incoming single messages:
-(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-batched into single batch message:
-[(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]</p>
-</li>
-<li>
-<p>KeyBasedBatching
-incoming single messages:
-(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-batched into single batch message:
-[(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]</p>
-</li>
-</ul>
-</li>
-<li><code>encryption_key</code>:
-   The key used for symmetric encryption, configured on the producer side</li>
-<li><code>crypto_key_reader</code>:
-   Symmetric encryption class implementation, configuring public key encryption messages for the producer
-   and private key decryption messages for the consumer</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_producer', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Client.create_producer" class="source">
-    <pre><code>def create_producer(self, topic,
-                    producer_name=None,
-                    schema=schema.BytesSchema(),
-                    initial_sequence_id=None,
-                    send_timeout_millis=30000,
-                    compression_type=CompressionType.NONE,
-                    max_pending_messages=1000,
-                    max_pending_messages_across_partitions=50000,
-                    block_if_queue_full=False,
-                    batching_enabled=False,
-                    batching_max_messages=1000,
-                    batching_max_allowed_size_in_bytes=128*1024,
-                    batching_max_publish_delay_ms=10,
-                    message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
-                    properties=None,
-                    batching_type=BatchingType.Default,
-                    encryption_key=None,
-                    crypto_key_reader=None
-                    ):
-    """
-    Create a new producer on a given topic.
-    **Args**
-    * `topic`:
-      The topic name
-    **Options**
-    * `producer_name`:
-       Specify a name for the producer. If not assigned,
-       the system will generate a globally unique name which can be accessed
-       with `Producer.producer_name()`. When specifying a name, it is up to
-       the user to ensure that, for a given topic, the producer name is unique
-       across all Pulsar's clusters.
-    * `schema`:
-       Define the schema of the data that will be published by this producer.
-       The schema will be used for two purposes:
-         - Validate the data format against the topic defined schema
-         - Perform serialization/deserialization between data and objects
-       An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
-    * `initial_sequence_id`:
-       Set the baseline for the sequence ids for messages
-       published by the producer. First message will be using
-       `(initialSequenceId + 1)` as its sequence id and subsequent messages will
-       be assigned incremental sequence ids, if not otherwise specified.
-    * `send_timeout_millis`:
-      If a message is not acknowledged by the server before the
-      `send_timeout` expires, an error will be reported.
-    * `compression_type`:
-      Set the compression type for the producer. By default, message
-      payloads are not compressed. Supported compression types are
-      `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
-      ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
-      release in order to be able to receive messages compressed with ZSTD.
-      SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
-      release in order to be able to receive messages compressed with SNAPPY.
-    * `max_pending_messages`:
-      Set the max size of the queue holding the messages pending to receive
-      an acknowledgment from the broker.
-    * `max_pending_messages_across_partitions`:
-      Set the max size of the queue holding the messages pending to receive
-      an acknowledgment across partitions from the broker.
-    * `block_if_queue_full`: Set whether `send_async` operations should
-      block when the outgoing message queue is full.
-    * `message_routing_mode`:
-      Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
-      other option is `PartitionsRoutingMode.UseSinglePartition`
-    * `properties`:
-      Sets the properties for the producer. The properties associated with a producer
-      can be used to identify a producer on the broker side.
-    * `batching_type`:
-      Sets the batching type for the producer.
-      There are two batching types: DefaultBatching and KeyBasedBatching.
-        - Default batching
-        incoming single messages:
-        (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-        batched into single batch message:
-        [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
-        - KeyBasedBatching
-        incoming single messages:
-        (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-        batched into single batch message:
-        [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
-    * `encryption_key`:
-       The key used for symmetric encryption, configured on the producer side
-    * `crypto_key_reader`:
-       Symmetric encryption class implementation, configuring public key encryption messages for the producer
-       and private key decryption messages for the consumer
-    """
-    _check_type(str, topic, 'topic')
-    _check_type_or_none(str, producer_name, 'producer_name')
-    _check_type(_schema.Schema, schema, 'schema')
-    _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
-    _check_type(int, send_timeout_millis, 'send_timeout_millis')
-    _check_type(CompressionType, compression_type, 'compression_type')
-    _check_type(int, max_pending_messages, 'max_pending_messages')
-    _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
-    _check_type(bool, block_if_queue_full, 'block_if_queue_full')
-    _check_type(bool, batching_enabled, 'batching_enabled')
-    _check_type(int, batching_max_messages, 'batching_max_messages')
-    _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
-    _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
-    _check_type_or_none(dict, properties, 'properties')
-    _check_type(BatchingType, batching_type, 'batching_type')
-    _check_type_or_none(str, encryption_key, 'encryption_key')
-    _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-    conf = _pulsar.ProducerConfiguration()
-    conf.send_timeout_millis(send_timeout_millis)
-    conf.compression_type(compression_type)
-    conf.max_pending_messages(max_pending_messages)
-    conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
-    conf.block_if_queue_full(block_if_queue_full)
-    conf.batching_enabled(batching_enabled)
-    conf.batching_max_messages(batching_max_messages)
-    conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
-    conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
-    conf.partitions_routing_mode(message_routing_mode)
-    conf.batching_type(batching_type)
-    if producer_name:
-        conf.producer_name(producer_name)
-    if initial_sequence_id:
-        conf.initial_sequence_id(initial_sequence_id)
-    if properties:
-        for k, v in properties.items():
-            conf.property(k, v)
-    conf.schema(schema.schema_info())
-    if encryption_key:
-        conf.encryption_key(encryption_key)
-    if crypto_key_reader:
-        conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-    p = Producer()
-    p._producer = self._client.create_producer(topic, conf)
-    p._schema = schema
-    return p
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Client.create_reader">
-    <p>def <span class="ident">create_reader</span>(</p><p>self, topic, start_message_id, schema=&lt;pulsar.schema.schema.BytesSchema object at 0x7f2e807e5ed0&gt;, reader_listener=None, receiver_queue_size=1000, reader_name=None, subscription_role_prefix=None, is_read_compacted=False)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Create a reader on a particular topic</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>topic</code>: The name of the topic.</li>
-<li><code>start_message_id</code>: The initial reader positioning is done by specifying a message id.
-   The options are:<ul>
-<li><code>MessageId.earliest</code>: Start reading from the earliest message available in the topic</li>
-<li><code>MessageId.latest</code>: Start reading from the end of the topic, only getting messages published
-   after the reader was created</li>
-<li>
-<p><code>MessageId</code>: When passing a particular message id, the reader will position itself on
-   that specific position. The first message to be read will be the message next to the
-   specified messageId. Message id can be serialized into a string and deserialized
-   back into a <code>MessageId</code> object:</p>
-<pre><code># Serialize to string
-s = msg.message_id().serialize()
-
-# Deserialize from string
-msg_id = MessageId.deserialize(s)
-</code></pre>
-</li>
-</ul>
-</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>schema</code>:
-   Define the schema of the data that will be received by this reader.</li>
-<li>
-<p><code>reader_listener</code>:
-  Sets a message listener for the reader. When the listener is set,
-  the application will receive messages through it. Calls to
-  <code>reader.read_next()</code> will not be allowed. The listener function needs
-  to accept (reader, message), for example:</p>
-<pre><code>def my_listener(reader, message):
-    # process message
-    pass
-</code></pre>
-</li>
-<li>
-<p><code>receiver_queue_size</code>:
-  Sets the size of the reader receive queue. The reader receive
-  queue controls how many messages can be accumulated by the reader
-  before the application calls <code>read_next()</code>. Using a higher value could
-  potentially increase the reader throughput at the expense of higher
-  memory utilization.</p>
-</li>
-<li><code>reader_name</code>:
-  Sets the reader name.</li>
-<li><code>subscription_role_prefix</code>:
-  Sets the subscription role prefix.</li>
-<li><code>is_read_compacted</code>:
-  Selects whether to read the compacted version of the topic</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_reader', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Client.create_reader" class="source">
-    <pre><code>def create_reader(self, topic, start_message_id,
-                  schema=schema.BytesSchema(),
-                  reader_listener=None,
-                  receiver_queue_size=1000,
-                  reader_name=None,
-                  subscription_role_prefix=None,
-                  is_read_compacted=False
-                  ):
-    """
-    Create a reader on a particular topic
-    **Args**
-    * `topic`: The name of the topic.
-    * `start_message_id`: The initial reader positioning is done by specifying a message id.
-       The options are:
-        * `MessageId.earliest`: Start reading from the earliest message available in the topic
-        * `MessageId.latest`: Start reading from the end of the topic, only getting messages published
-           after the reader was created
-        * `MessageId`: When passing a particular message id, the reader will position itself on
-           that specific position. The first message to be read will be the message next to the
-           specified messageId. Message id can be serialized into a string and deserialized
-           back into a `MessageId` object:
-               # Serialize to string
-               s = msg.message_id().serialize()
-               # Deserialize from string
-               msg_id = MessageId.deserialize(s)
-    **Options**
-    * `schema`:
-       Define the schema of the data that will be received by this reader.
-    * `reader_listener`:
-      Sets a message listener for the reader. When the listener is set,
-      the application will receive messages through it. Calls to
-      `reader.read_next()` will not be allowed. The listener function needs
-      to accept (reader, message), for example:
-            def my_listener(reader, message):
-                # process message
-                pass
-    * `receiver_queue_size`:
-      Sets the size of the reader receive queue. The reader receive
-      queue controls how many messages can be accumulated by the reader
-      before the application calls `read_next()`. Using a higher value could
-      potentially increase the reader throughput at the expense of higher
-      memory utilization.
-    * `reader_name`:
-      Sets the reader name.
-    * `subscription_role_prefix`:
-      Sets the subscription role prefix.
-    * `is_read_compacted`:
-      Selects whether to read the compacted version of the topic
-    """
-    _check_type(str, topic, 'topic')
-    _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
-    _check_type(_schema.Schema, schema, 'schema')
-    _check_type(int, receiver_queue_size, 'receiver_queue_size')
-    _check_type_or_none(str, reader_name, 'reader_name')
-    _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
-    _check_type(bool, is_read_compacted, 'is_read_compacted')
-    conf = _pulsar.ReaderConfiguration()
-    if reader_listener:
-        conf.reader_listener(_listener_wrapper(reader_listener, schema))
-    conf.receiver_queue_size(receiver_queue_size)
-    if reader_name:
-        conf.reader_name(reader_name)
-    if subscription_role_prefix:
-        conf.subscription_role_prefix(subscription_role_prefix)
-    conf.schema(schema.schema_info())
-    conf.read_compacted(is_read_compacted)
-    c = Reader()
-    c._reader = self._client.create_reader(topic, start_message_id, conf)
-    c._client = self
-    c._schema = schema
-    self._consumers.append(c)
-    return c
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Client.get_topic_partitions">
-    <p>def <span class="ident">get_topic_partitions</span>(</p><p>self, topic)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the list of partitions for a given topic.</p>
-<p>If the topic is partitioned, this will return a list of partition names. If the topic is not
-partitioned, the returned list will contain the topic name itself.</p>
-<p>This can be used to discover the partitions and create Reader, Consumer or Producer
-instances directly on a particular partition.
-:param topic: the topic name to look up
-:return: a list of partition names</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.get_topic_partitions', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Client.get_topic_partitions" class="source">
-    <pre><code>def get_topic_partitions(self, topic):
-    """
-    Get the list of partitions for a given topic.
-    If the topic is partitioned, this will return a list of partition names. If the topic is not
-    partitioned, the returned list will contain the topic name itself.
-    This can be used to discover the partitions and create Reader, Consumer or Producer
-    instances directly on a particular partition.
-    :param topic: the topic name to look up
-    :return: a list of partition names
-    """
-    _check_type(str, topic, 'topic')
-    return self._client.get_topic_partitions(topic)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Client.subscribe">
-    <p>def <span class="ident">subscribe</span>(</p><p>self, topic, subscription_name, consumer_type=_pulsar.ConsumerType.Exclusive, schema=&lt;pulsar.schema.schema.BytesSchema object at 0x7f2e807e5e50&gt;, message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, negative_ack_redelivery_delay_ms=60000, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, initial_position=_pulsar.InitialPosition.Latest, crypto_key_reader=None)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Subscribe to the given topic and subscription combination.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>topic</code>: The name of the topic, list of topics or regex pattern.
-   This method will accept these forms:<ul>
-<li><code>topic='my-topic'</code></li>
-<li><code>topic=['topic-1', 'topic-2', 'topic-3']</code></li>
-<li><code>topic=re.compile('persistent://public/default/topic-*')</code></li>
-</ul>
-</li>
-<li><code>subscription_name</code>: The name of the subscription.</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>consumer_type</code>:
-  Select the subscription type to be used when subscribing to the topic.</li>
-<li><code>schema</code>:
-   Define the schema of the data that will be received by this consumer.</li>
-<li>
-<p><code>message_listener</code>:
-  Sets a message listener for the consumer. When the listener is set,
-  the application will receive messages through it. Calls to
-  <code>consumer.receive()</code> will not be allowed. The listener function needs
-  to accept (consumer, message), for example:</p>
-<pre><code>#!python
-def my_listener(consumer, message):
-    # process message
-    consumer.acknowledge(message)
-</code></pre>
-</li>
-<li>
-<p><code>receiver_queue_size</code>:
-  Sets the size of the consumer receive queue. The consumer receive
-  queue controls how many messages can be accumulated by the consumer
-  before the application calls <code>receive()</code>. Using a higher value could
-  potentially increase the consumer throughput at the expense of higher
-  memory utilization. Setting the consumer queue size to zero decreases
-  the throughput of the consumer by disabling pre-fetching of messages.
-  This approach improves the message distribution on shared subscription
-  by pushing messages only to those consumers that are ready to process
-  them. Neither receive with timeout nor partitioned topics can be used
-  if the consumer queue size is zero. The <code>receive()</code> function call
-  should not be interrupted when the consumer queue size is zero. The
-  default value is 1000 messages and should work well for most use
-  cases.</p>
-</li>
-<li><code>max_total_receiver_queue_size_across_partitions</code>:
-  Set the max total receiver queue size across partitions.
-  This setting will be used to reduce the receiver queue size for individual partitions.</li>
-<li><code>consumer_name</code>:
-  Sets the consumer name.</li>
-<li><code>unacked_messages_timeout_ms</code>:
-  Sets the timeout in milliseconds for unacknowledged messages. The
-  timeout needs to be greater than 10 seconds. An exception is thrown if
-  the given value is less than 10 seconds. If a successful
-  acknowledgement is not sent within the timeout, all the unacknowledged
-  messages are redelivered.</li>
-<li><code>negative_ack_redelivery_delay_ms</code>:
-   The delay after which to redeliver the messages that failed to be
-   processed (with the <code>consumer.negative_acknowledge()</code>)</li>
-<li><code>broker_consumer_stats_cache_time_ms</code>:
-  Sets the time duration for which the broker-side consumer stats will
-  be cached in the client.</li>
-<li><code>is_read_compacted</code>:
-  Selects whether to read the compacted version of the topic</li>
-<li><code>properties</code>:
-  Sets the properties for the consumer. The properties associated with a consumer
-  can be used to identify a consumer on the broker side.</li>
-<li><code>pattern_auto_discovery_period</code>:
-  Period, in seconds, at which the consumer auto-discovers topics matching the pattern.</li>
-<li><code>initial_position</code>:
-  Set the initial position of a consumer  when subscribing to the topic.
-  It could be either: <code>InitialPosition.Earliest</code> or <code>InitialPosition.Latest</code>.
-  Default: <code>Latest</code>.</li>
-<li><code>crypto_key_reader</code>:
-   Symmetric encryption class implementation, configuring public key encryption messages for the producer
-   and private key decryption messages for the consumer</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.subscribe', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Client.subscribe" class="source">
-    <pre><code>def subscribe(self, topic, subscription_name,
-              consumer_type=ConsumerType.Exclusive,
-              schema=schema.BytesSchema(),
-              message_listener=None,
-              receiver_queue_size=1000,
-              max_total_receiver_queue_size_across_partitions=50000,
-              consumer_name=None,
-              unacked_messages_timeout_ms=None,
-              broker_consumer_stats_cache_time_ms=30000,
-              negative_ack_redelivery_delay_ms=60000,
-              is_read_compacted=False,
-              properties=None,
-              pattern_auto_discovery_period=60,
-              initial_position=InitialPosition.Latest,
-              crypto_key_reader=None
-              ):
-    """
-    Subscribe to the given topic and subscription combination.
-    **Args**
-    * `topic`: The name of the topic, list of topics or regex pattern.
-              This method will accept these forms:
-                - `topic='my-topic'`
-                - `topic=['topic-1', 'topic-2', 'topic-3']`
-                - `topic=re.compile('persistent://public/default/topic-*')`
-    * `subscription_name`: The name of the subscription.
-    **Options**
-    * `consumer_type`:
-      Select the subscription type to be used when subscribing to the topic.
-    * `schema`:
-       Define the schema of the data that will be received by this consumer.
-    * `message_listener`:
-      Sets a message listener for the consumer. When the listener is set,
-      the application will receive messages through it. Calls to
-      `consumer.receive()` will not be allowed. The listener function needs
-      to accept (consumer, message), for example:
-            #!python
-            def my_listener(consumer, message):
-                # process message
-                consumer.acknowledge(message)
-    * `receiver_queue_size`:
-      Sets the size of the consumer receive queue. The consumer receive
-      queue controls how many messages can be accumulated by the consumer
-      before the application calls `receive()`. Using a higher value could
-      potentially increase the consumer throughput at the expense of higher
-      memory utilization. Setting the consumer queue size to zero decreases
-      the throughput of the consumer by disabling pre-fetching of messages.
-      This approach improves the message distribution on shared subscription
-      by pushing messages only to those consumers that are ready to process
-      them. Neither receive with timeout nor partitioned topics can be used
-      if the consumer queue size is zero. The `receive()` function call
-      should not be interrupted when the consumer queue size is zero. The
-      default value is 1000 messages and should work well for most use
-      cases.
-    * `max_total_receiver_queue_size_across_partitions`:
-      Set the max total receiver queue size across partitions.
-      This setting will be used to reduce the receiver queue size for individual partitions.
-    * `consumer_name`:
-      Sets the consumer name.
-    * `unacked_messages_timeout_ms`:
-      Sets the timeout in milliseconds for unacknowledged messages. The
-      timeout needs to be greater than 10 seconds. An exception is thrown if
-      the given value is less than 10 seconds. If a successful
-      acknowledgement is not sent within the timeout, all the unacknowledged
-      messages are redelivered.
-    * `negative_ack_redelivery_delay_ms`:
-       The delay after which to redeliver the messages that failed to be
-       processed (with the `consumer.negative_acknowledge()`)
-    * `broker_consumer_stats_cache_time_ms`:
-      Sets the time duration for which the broker-side consumer stats will
-      be cached in the client.
-    * `is_read_compacted`:
-      Selects whether to read the compacted version of the topic
-    * `properties`:
-      Sets the properties for the consumer. The properties associated with a consumer
-      can be used to identify a consumer on the broker side.
-    * `pattern_auto_discovery_period`:
-      Period, in seconds, at which the consumer auto-discovers topics matching the pattern.
-    * `initial_position`:
-      Set the initial position of a consumer  when subscribing to the topic.
-      It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
-      Default: `Latest`.
-    * `crypto_key_reader`:
-       Symmetric encryption class implementation, configuring public key encryption messages for the producer
-       and private key decryption messages for the consumer
-    """
-    _check_type(str, subscription_name, 'subscription_name')
-    _check_type(ConsumerType, consumer_type, 'consumer_type')
-    _check_type(_schema.Schema, schema, 'schema')
-    _check_type(int, receiver_queue_size, 'receiver_queue_size')
-    _check_type(int, max_total_receiver_queue_size_across_partitions,
-                'max_total_receiver_queue_size_across_partitions')
-    _check_type_or_none(str, consumer_name, 'consumer_name')
-    _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
-    _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
-    _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
-    _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
-    _check_type(bool, is_read_compacted, 'is_read_compacted')
-    _check_type_or_none(dict, properties, 'properties')
-    _check_type(InitialPosition, initial_position, 'initial_position')
-    _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-    conf = _pulsar.ConsumerConfiguration()
-    conf.consumer_type(consumer_type)
-    conf.read_compacted(is_read_compacted)
-    if message_listener:
-        conf.message_listener(_listener_wrapper(message_listener, schema))
-    conf.receiver_queue_size(receiver_queue_size)
-    conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
-    if consumer_name:
-        conf.consumer_name(consumer_name)
-    if unacked_messages_timeout_ms:
-        conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-    conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
-    conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
-    if properties:
-        for k, v in properties.items():
-            conf.property(k, v)
-    conf.subscription_initial_position(initial_position)
-    conf.schema(schema.schema_info())
-    if crypto_key_reader:
-        conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-    c = Consumer()
-    if isinstance(topic, str):
-        # Single topic
-        c._consumer = self._client.subscribe(topic, subscription_name, conf)
-    elif isinstance(topic, list):
-        # List of topics
-        c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
-    elif isinstance(topic, _retype):
-        # Regex pattern
-        c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
-    else:
-        raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
-    c._client = self
-    c._schema = schema
-    self._consumers.append(c)
-    return c
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.Consumer" class="name">class <span class="ident">Consumer</span></p>
-      
-  
-    <div class="desc"><p>Pulsar consumer.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer" class="source">
-    <pre><code>class Consumer:
-    """
-    Pulsar consumer.
-    """
-
-    def topic(self):
-        """
-        Return the topic this consumer is subscribed to.
-        """
-        return self._consumer.topic()
-
-    def subscription_name(self):
-        """
-        Return the subscription name.
-        """
-        return self._consumer.subscription_name()
-
-    def unsubscribe(self):
-        """
-        Unsubscribe the current consumer from the topic.
-
-        This method will block until the operation is completed. Once the
-        consumer is unsubscribed, no more messages will be received and
-        subsequent new messages will not be retained for this consumer.
-
-        This consumer object cannot be reused.
-        """
-        return self._consumer.unsubscribe()
-
-    def receive(self, timeout_millis=None):
-        """
-        Receive a single message.
-
-        If a message is not immediately available, this method will block until
-        a new message is available.
-
-        **Options**
-
-        * `timeout_millis`:
-          If specified, the receive will raise an exception if a message is not
-          available within the timeout.
-        """
-        if timeout_millis is None:
-            msg = self._consumer.receive()
-        else:
-            _check_type(int, timeout_millis, 'timeout_millis')
-            msg = self._consumer.receive(timeout_millis)
-
-        m = Message()
-        m._message = msg
-        m._schema = self._schema
-        return m
-
-    def acknowledge(self, message):
-        """
-        Acknowledge the reception of a single message.
-
-        This method will block until an acknowledgement is sent to the broker.
-        After that, the message will not be re-delivered to this consumer.
-
-        **Args**
-
-        * `message`:
-          The received message or message id.
-        """
-        if isinstance(message, Message):
-            self._consumer.acknowledge(message._message)
-        else:
-            self._consumer.acknowledge(message)
-
-    def acknowledge_cumulative(self, message):
-        """
-        Acknowledge the reception of all the messages in the stream up to (and
-        including) the provided message.
-
-        This method will block until an acknowledgement is sent to the broker.
-        After that, the messages will not be re-delivered to this consumer.
-
-        **Args**
-
-        * `message`:
-          The received message or message id.
-        """
-        if isinstance(message, Message):
-            self._consumer.acknowledge_cumulative(message._message)
-        else:
-            self._consumer.acknowledge_cumulative(message)
-
-    def negative_acknowledge(self, message):
-        """
-        Acknowledge the failure to process a single message.
-
-        When a message is "negatively acked" it will be marked for redelivery after
-        some fixed delay. The delay is configurable when constructing the consumer
-        with the `negative_ack_redelivery_delay_ms` option of `Client.subscribe()`.
-
-        This call is not blocking.
-
-        **Args**
-
-        * `message`:
-          The received message or message id.
-        """
-        if isinstance(message, Message):
-            self._consumer.negative_acknowledge(message._message)
-        else:
-            self._consumer.negative_acknowledge(message)
-
-    def pause_message_listener(self):
-        """
-        Pause receiving messages via the `message_listener` until
-        `resume_message_listener()` is called.
-        """
-        self._consumer.pause_message_listener()
-
-    def resume_message_listener(self):
-        """
-        Resume receiving the messages via the message listener.
-        Asynchronously receive all the messages enqueued from the time
-        `pause_message_listener()` was called.
-        """
-        self._consumer.resume_message_listener()
-
-    def redeliver_unacknowledged_messages(self):
-        """
-        Redelivers all the unacknowledged messages. In failover mode, the
-        request is ignored if the consumer is not active for the given topic. In
-        shared mode, the consumer's messages to be redelivered are distributed
-        across all the connected consumers. This is a non-blocking call and
-        doesn't throw an exception. In case the connection breaks, the messages
-        are redelivered after reconnect.
-        """
-        self._consumer.redeliver_unacknowledged_messages()
-
-    def seek(self, messageid):
-        """
-        Reset the subscription associated with this consumer to a specific message id or publish timestamp.
-        The message id can either be a specific message or represent the first or last messages in the topic.
-        Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
-        seek() on the individual partitions instead.
-
-        **Args**
-
-        * `messageid`:
-          The message id for seek, OR an integer event time to seek to
-        """
-        self._consumer.seek(messageid)
-
-    def close(self):
-        """
-        Close the consumer.
-        """
-        self._consumer.close()
-        self._client._consumers.remove(self)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.Consumer">Consumer</a></li>
-          </ul>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.acknowledge">
-    <p>def <span class="ident">acknowledge</span>(</p><p>self, message)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Acknowledge the reception of a single message.</p>
-<p>This method will block until an acknowledgement is sent to the broker.
-After that, the message will not be re-delivered to this consumer.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>message</code>:
-  The received message or message id.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.acknowledge" class="source">
-    <pre><code>def acknowledge(self, message):
-    """
-    Acknowledge the reception of a single message.
-    This method will block until an acknowledgement is sent to the broker.
-    After that, the message will not be re-delivered to this consumer.
-    **Args**
-    * `message`:
-      The received message or message id.
-    """
-    if isinstance(message, Message):
-        self._consumer.acknowledge(message._message)
-    else:
-        self._consumer.acknowledge(message)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.acknowledge_cumulative">
-    <p>def <span class="ident">acknowledge_cumulative</span>(</p><p>self, message)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Acknowledge the reception of all the messages in the stream up to (and
-including) the provided message.</p>
-<p>This method will block until an acknowledgement is sent to the broker.
-After that, the messages will not be re-delivered to this consumer.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>message</code>:
-  The received message or message id.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge_cumulative', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.acknowledge_cumulative" class="source">
-    <pre><code>def acknowledge_cumulative(self, message):
-    """
-    Acknowledge the reception of all the messages in the stream up to (and
-    including) the provided message.
-    This method will block until an acknowledgement is sent to the broker.
-    After that, the messages will not be re-delivered to this consumer.
-    **Args**
-    * `message`:
-      The received message or message id.
-    """
-    if isinstance(message, Message):
-        self._consumer.acknowledge_cumulative(message._message)
-    else:
-        self._consumer.acknowledge_cumulative(message)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.close">
-    <p>def <span class="ident">close</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Close the consumer.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.close', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.close" class="source">
-    <pre><code>def close(self):
-    """
-    Close the consumer.
-    """
-    self._consumer.close()
-    self._client._consumers.remove(self)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.negative_acknowledge">
-    <p>def <span class="ident">negative_acknowledge</span>(</p><p>self, message)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Acknowledge the failure to process a single message.</p>
-<p>When a message is "negatively acked" it will be marked for redelivery after
-some fixed delay. The delay is configurable when constructing the consumer
-with the <code>negative_ack_redelivery_delay_ms</code> option of <code>Client.subscribe()</code>.</p>
-<p>This call is not blocking.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>message</code>:
-  The received message or message id.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.negative_acknowledge', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.negative_acknowledge" class="source">
-    <pre><code>def negative_acknowledge(self, message):
-    """
-    Acknowledge the failure to process a single message.
-    When a message is "negatively acked" it will be marked for redelivery after
-    some fixed delay. The delay is configurable when constructing the consumer
-    with the `negative_ack_redelivery_delay_ms` option of `Client.subscribe()`.
-    This call is not blocking.
-    **Args**
-    * `message`:
-      The received message or message id.
-    """
-    if isinstance(message, Message):
-        self._consumer.negative_acknowledge(message._message)
-    else:
-        self._consumer.negative_acknowledge(message)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.pause_message_listener">
-    <p>def <span class="ident">pause_message_listener</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Pause receiving messages via the <code>message_listener</code> until
-<code>resume_message_listener()</code> is called.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.pause_message_listener', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.pause_message_listener" class="source">
-    <pre><code>def pause_message_listener(self):
-    """
-    Pause receiving messages via the `message_listener` until
-    `resume_message_listener()` is called.
-    """
-    self._consumer.pause_message_listener()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.receive">
-    <p>def <span class="ident">receive</span>(</p><p>self, timeout_millis=None)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Receive a single message.</p>
-<p>If a message is not immediately available, this method will block until
-a new message is available.</p>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>timeout_millis</code>:
-  If specified, the receive will raise an exception if a message is not
-  available within the timeout.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.receive', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.receive" class="source">
-    <pre><code>def receive(self, timeout_millis=None):
-    """
-    Receive a single message.
-    If a message is not immediately available, this method will block until
-    a new message is available.
-    **Options**
-    * `timeout_millis`:
-      If specified, the receive will raise an exception if a message is not
-      available within the timeout.
-    """
-    if timeout_millis is None:
-        msg = self._consumer.receive()
-    else:
-        _check_type(int, timeout_millis, 'timeout_millis')
-        msg = self._consumer.receive(timeout_millis)
-    m = Message()
-    m._message = msg
-    m._schema = self._schema
-    return m
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.redeliver_unacknowledged_messages">
-    <p>def <span class="ident">redeliver_unacknowledged_messages</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Redelivers all the unacknowledged messages. In failover mode, the
-request is ignored if the consumer is not active for the given topic. In
-shared mode, the consumer's messages to be redelivered are distributed
-across all the connected consumers. This is a non-blocking call and
-doesn't throw an exception. In case the connection breaks, the messages
-are redelivered after reconnect.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.redeliver_unacknowledged_messages', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.redeliver_unacknowledged_messages" class="source">
-    <pre><code>def redeliver_unacknowledged_messages(self):
-    """
-    Redelivers all the unacknowledged messages. In failover mode, the
-    request is ignored if the consumer is not active for the given topic. In
-    shared mode, the consumer's messages to be redelivered are distributed
-    across all the connected consumers. This is a non-blocking call and
-    doesn't throw an exception. In case the connection breaks, the messages
-    are redelivered after reconnect.
-    """
-    self._consumer.redeliver_unacknowledged_messages()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.resume_message_listener">
-    <p>def <span class="ident">resume_message_listener</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Resume receiving the messages via the message listener.
-Asynchronously receive all the messages enqueued from the time
-<code>pause_message_listener()</code> was called.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.resume_message_listener', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.resume_message_listener" class="source">
-    <pre><code>def resume_message_listener(self):
-    """
-    Resume receiving the messages via the message listener.
-    Asynchronously receive all the messages enqueued from the time
-    `pause_message_listener()` was called.
-    """
-    self._consumer.resume_message_listener()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.seek">
-    <p>def <span class="ident">seek</span>(</p><p>self, messageid)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Reset the subscription associated with this consumer to a specific message id or publish timestamp.
-The message id can either be a specific message or represent the first or last messages in the topic.
-Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform
-seek() on the individual partitions instead.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>messageid</code>:
-  The message id for seek, OR an integer event time to seek to</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.seek', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.seek" class="source">
-    <pre><code>def seek(self, messageid):
-    """
-    Reset the subscription associated with this consumer to a specific message id or publish timestamp.
-    The message id can either be a specific message or represent the first or last messages in the topic.
-    Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
-    seek() on the individual partitions.
-    **Args**
-    * `message`:
-      The message id for seek, OR an integer event time to seek to
-    """
-    self._consumer.seek(messageid)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.subscription_name">
-    <p>def <span class="ident">subscription_name</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Return the subscription name.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.subscription_name', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.subscription_name" class="source">
-    <pre><code>def subscription_name(self):
-    """
-    Return the subscription name.
-    """
-    return self._consumer.subscription_name()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.topic">
-    <p>def <span class="ident">topic</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Return the topic this consumer is subscribed to.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.topic', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.topic" class="source">
-    <pre><code>def topic(self):
-    """
-    Return the topic this consumer is subscribed to.
-    """
-    return self._consumer.topic()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Consumer.unsubscribe">
-    <p>def <span class="ident">unsubscribe</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Unsubscribe the current consumer from the topic.</p>
-<p>This method will block until the operation is completed. Once the
-consumer is unsubscribed, no more messages will be received and
-subsequent new messages will not be retained for this consumer.</p>
-<p>This consumer object cannot be reused.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.unsubscribe', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Consumer.unsubscribe" class="source">
-    <pre><code>def unsubscribe(self):
-    """
-    Unsubscribe the current consumer from the topic.
-    This method will block until the operation is completed. Once the
-    consumer is unsubscribed, no more messages will be received and
-    subsequent new messages will not be retained for this consumer.
-    This consumer object cannot be reused.
-    """
-    return self._consumer.unsubscribe()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.CryptoKeyReader" class="name">class <span class="ident">CryptoKeyReader</span></p>
-      
-  
-    <div class="desc"><p>Default crypto key reader implementation</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.CryptoKeyReader" class="source">
-    <pre><code>class CryptoKeyReader:
-    """
-    Default crypto key reader implementation
-    """
-    def __init__(self, public_key_path, private_key_path):
-        """
-        Create crypto key reader.
-
-        **Args**
-
-        * `public_key_path`: Path to the public key
-        * `private_key_path`: Path to private key
-        """
-        _check_type(str, public_key_path, 'public_key_path')
-        _check_type(str, private_key_path, 'private_key_path')
-        self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></li>
-          </ul>
-          <h3>Instance variables</h3>
-            <div class="item">
-            <p id="pulsar.CryptoKeyReader.cryptoKeyReader" class="name">var <span class="ident">cryptoKeyReader</span></p>
-            
-
-            
-  
-  <div class="source_cont">
-</div>
-
-            </div>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.CryptoKeyReader.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, public_key_path, private_key_path)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Create crypto key reader.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>public_key_path</code>: Path to the public key</li>
-<li><code>private_key_path</code>: Path to the private key</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.CryptoKeyReader.__init__" class="source">
-    <pre><code>def __init__(self, public_key_path, private_key_path):
-    """
-    Create crypto key reader.
-    **Args**
-    * `public_key_path`: Path to the public key
-    * `private_key_path`: Path to private key
-    """
-    _check_type(str, public_key_path, 'public_key_path')
-    _check_type(str, private_key_path, 'private_key_path')
-    self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.Message" class="name">class <span class="ident">Message</span></p>
-      
-  
-    <div class="desc"><p>Message objects are returned by a consumer, either by calling <code>receive</code> or
-through a listener.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message" class="source">
-    <pre><code>class Message:
-    """
-    Message objects are returned by a consumer, either by calling `receive` or
-    through a listener.
-    """
-
-    def data(self):
-        """
-        Returns object typed bytes with the payload of the message.
-        """
-        return self._message.data()
-
-    def value(self):
-        """
-        Returns object with the de-serialized version of the message content
-        """
-        return self._schema.decode(self._message.data())
-
-    def properties(self):
-        """
-        Return the properties attached to the message. Properties are
-        application-defined key/value pairs that will be attached to the
-        message.
-        """
-        return self._message.properties()
-
-    def partition_key(self):
-        """
-        Get the partitioning key for the message.
-        """
-        return self._message.partition_key()
-
-    def publish_timestamp(self):
-        """
-        Get the timestamp in milliseconds with the message publish time.
-        """
-        return self._message.publish_timestamp()
-
-    def event_timestamp(self):
-        """
-        Get the timestamp in milliseconds with the message event time.
-        """
-        return self._message.event_timestamp()
-
-    def message_id(self):
-        """
-        The message ID that can be used to refere to this particular message.
-        """
-        return self._message.message_id()
-
-    def topic_name(self):
-        """
-        Get the topic Name from which this message originated from
-        """
-        return self._message.topic_name()
-
-    def redelivery_count(self):
-        """
-        Get the redelivery count for this message
-        """
-        return self._message.redelivery_count()
-
-    def schema_version(self):
-        """
-        Get the schema version for this message
-        """
-        return self._message.schema_version()
-
-    @staticmethod
-    def _wrap(_message):
-        self = Message()
-        self._message = _message
-        return self
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.Message">Message</a></li>
-          </ul>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.data">
-    <p>def <span class="ident">data</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Returns the payload of the message as a bytes object.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.data', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.data" class="source">
-    <pre><code>def data(self):
-    """
-    Returns object typed bytes with the payload of the message.
-    """
-    return self._message.data()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.event_timestamp">
-    <p>def <span class="ident">event_timestamp</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the timestamp in milliseconds with the message event time.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.event_timestamp', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.event_timestamp" class="source">
-    <pre><code>def event_timestamp(self):
-    """
-    Get the timestamp in milliseconds with the message event time.
-    """
-    return self._message.event_timestamp()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.message_id">
-    <p>def <span class="ident">message_id</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>The message ID that can be used to refer to this particular message.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.message_id', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.message_id" class="source">
-    <pre><code>def message_id(self):
-    """
-    The message ID that can be used to refere to this particular message.
-    """
-    return self._message.message_id()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.partition_key">
-    <p>def <span class="ident">partition_key</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the partitioning key for the message.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.partition_key', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.partition_key" class="source">
-    <pre><code>def partition_key(self):
-    """
-    Get the partitioning key for the message.
-    """
-    return self._message.partition_key()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.properties">
-    <p>def <span class="ident">properties</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Return the properties attached to the message. Properties are
-application-defined key/value pairs that will be attached to the
-message.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.properties', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.properties" class="source">
-    <pre><code>def properties(self):
-    """
-    Return the properties attached to the message. Properties are
-    application-defined key/value pairs that will be attached to the
-    message.
-    """
-    return self._message.properties()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.publish_timestamp">
-    <p>def <span class="ident">publish_timestamp</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the timestamp in milliseconds with the message publish time.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.publish_timestamp', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.publish_timestamp" class="source">
-    <pre><code>def publish_timestamp(self):
-    """
-    Get the timestamp in milliseconds with the message publish time.
-    """
-    return self._message.publish_timestamp()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.redelivery_count">
-    <p>def <span class="ident">redelivery_count</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the redelivery count for this message</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.redelivery_count', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.redelivery_count" class="source">
-    <pre><code>def redelivery_count(self):
-    """
-    Get the redelivery count for this message
-    """
-    return self._message.redelivery_count()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.schema_version">
-    <p>def <span class="ident">schema_version</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the schema version for this message</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.schema_version', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.schema_version" class="source">
-    <pre><code>def schema_version(self):
-    """
-    Get the schema version for this message
-    """
-    return self._message.schema_version()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.topic_name">
-    <p>def <span class="ident">topic_name</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the name of the topic from which this message originated.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.topic_name', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.topic_name" class="source">
-    <pre><code>def topic_name(self):
-    """
-    Get the topic Name from which this message originated from
-    """
-    return self._message.topic_name()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Message.value">
-    <p>def <span class="ident">value</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Returns an object with the deserialized version of the message content.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.value', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Message.value" class="source">
-    <pre><code>def value(self):
-    """
-    Returns object with the de-serialized version of the message content
-    """
-    return self._schema.decode(self._message.data())
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.MessageBatch" class="name">class <span class="ident">MessageBatch</span></p>
-      
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageBatch" class="source">
-    <pre><code>class MessageBatch:
-
-    def __init__(self):
-        self._msg_batch = _pulsar.MessageBatch()
-
-    def with_message_id(self, msg_id):
-        if not isinstance(msg_id, _pulsar.MessageId):
-            if isinstance(msg_id, MessageId):
-                msg_id = msg_id._msg_id
-            else:
-                raise TypeError("unknown message id type")
-        self._msg_batch.with_message_id(msg_id)
-        return self
-
-    def parse_from(self, data, size):
-        self._msg_batch.parse_from(data, size)
-        _msgs = self._msg_batch.messages()
-        return list(map(Message._wrap, _msgs))
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.MessageBatch">MessageBatch</a></li>
-          </ul>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageBatch.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageBatch.__init__" class="source">
-    <pre><code>def __init__(self):
-    self._msg_batch = _pulsar.MessageBatch()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageBatch.parse_from">
-    <p>def <span class="ident">parse_from</span>(</p><p>self, data, size)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.parse_from', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageBatch.parse_from" class="source">
-    <pre><code>def parse_from(self, data, size):
-    self._msg_batch.parse_from(data, size)
-    _msgs = self._msg_batch.messages()
-    return list(map(Message._wrap, _msgs))
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageBatch.with_message_id">
-    <p>def <span class="ident">with_message_id</span>(</p><p>self, msg_id)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.with_message_id', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageBatch.with_message_id" class="source">
-    <pre><code>def with_message_id(self, msg_id):
-    if not isinstance(msg_id, _pulsar.MessageId):
-        if isinstance(msg_id, MessageId):
-            msg_id = msg_id._msg_id
-        else:
-            raise TypeError("unknown message id type")
-    self._msg_batch.with_message_id(msg_id)
-    return self
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.MessageId" class="name">class <span class="ident">MessageId</span></p>
-      
-  
-    <div class="desc"><p>Represents a message id</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId" class="source">
-    <pre><code>class MessageId:
-    """
-    Represents a message id
-    """
-
-    def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
-        self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
-
-    'Represents the earliest message stored in a topic'
-    earliest = _pulsar.MessageId.earliest
-
-    'Represents the latest message published on a topic'
-    latest = _pulsar.MessageId.latest
-
-    def ledger_id(self):
-        return self._msg_id.ledger_id()
-
-    def entry_id(self):
-        return self._msg_id.entry_id()
-
-    def batch_index(self):
-        return self._msg_id.batch_index()
-
-    def partition(self):
-        return self._msg_id.partition()
-
-    def serialize(self):
-        """
-        Returns a bytes representation of the message id.
-        This bytes sequence can be stored and later deserialized.
-        """
-        return self._msg_id.serialize()
-
-    @staticmethod
-    def deserialize(message_id_bytes):
-        """
-        Deserialize a message id object from a previously
-        serialized bytes sequence.
-        """
-        return _pulsar.MessageId.deserialize(message_id_bytes)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.MessageId">MessageId</a></li>
-          </ul>
-          <h3>Class variables</h3>
-            <div class="item">
-            <p id="pulsar.MessageId.earliest" class="name">var <span class="ident">earliest</span></p>
-            
-
-            
-  
-    <div class="desc"><p>Represents the earliest message stored in a topic</p></div>
-  <div class="source_cont">
-</div>
-
-            </div>
-            <div class="item">
-            <p id="pulsar.MessageId.latest" class="name">var <span class="ident">latest</span></p>
-            
-
-            
-  
-  <div class="source_cont">
-</div>
-
-            </div>
-          <h3>Static methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageId.deserialize">
-    <p>def <span class="ident">deserialize</span>(</p><p>message_id_bytes)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Deserialize a message id object from a previously
-serialized bytes sequence.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.deserialize', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId.deserialize" class="source">
-    <pre><code>@staticmethod
-def deserialize(message_id_bytes):
-    """
-    Deserialize a message id object from a previously
-    serialized bytes sequence.
-    """
-    return _pulsar.MessageId.deserialize(message_id_bytes)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageId.__init__">
-    <p>def <span class="ident">__init__</span>(</p><p>self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.__init__', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId.__init__" class="source">
-    <pre><code>def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
-    self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageId.batch_index">
-    <p>def <span class="ident">batch_index</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.batch_index', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId.batch_index" class="source">
-    <pre><code>def batch_index(self):
-    return self._msg_id.batch_index()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageId.entry_id">
-    <p>def <span class="ident">entry_id</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.entry_id', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId.entry_id" class="source">
-    <pre><code>def entry_id(self):
-    return self._msg_id.entry_id()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageId.ledger_id">
-    <p>def <span class="ident">ledger_id</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.ledger_id', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId.ledger_id" class="source">
-    <pre><code>def ledger_id(self):
-    return self._msg_id.ledger_id()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageId.partition">
-    <p>def <span class="ident">partition</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.partition', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId.partition" class="source">
-    <pre><code>def partition(self):
-    return self._msg_id.partition()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.MessageId.serialize">
-    <p>def <span class="ident">serialize</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Returns a bytes representation of the message id.
-This bytes sequence can be stored and later deserialized.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.serialize', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.MessageId.serialize" class="source">
-    <pre><code>def serialize(self):
-    """
-    Returns a bytes representation of the message id.
-    This bytes sequence can be stored and later deserialized.
-    """
-    return self._msg_id.serialize()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.Producer" class="name">class <span class="ident">Producer</span></p>
-      
-  
-    <div class="desc"><p>The Pulsar message producer, used to publish messages on a topic.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer" class="source">
-    <pre><code>class Producer:
-    """
-    The Pulsar message producer, used to publish messages on a topic.
-    """
-
-    def topic(self):
-        """
-        Return the topic which producer is publishing to
-        """
-        return self._producer.topic()
-
-    def producer_name(self):
-        """
-        Return the producer name which could have been assigned by the
-        system or specified by the client
-        """
-        return self._producer.producer_name()
-
-    def last_sequence_id(self):
-        """
-        Get the last sequence id that was published by this producer.
-
-        This represent either the automatically assigned or custom sequence id
-        (set on the `MessageBuilder`) that was published and acknowledged by the broker.
-
-        After recreating a producer with the same producer name, this will return the
-        last message that was published in the previous producer session, or -1 if
-        there no message was ever published.
-        """
-        return self._producer.last_sequence_id()
-
-    def send(self, content,
-             properties=None,
-             partition_key=None,
-             sequence_id=None,
-             replication_clusters=None,
-             disable_replication=False,
-             event_timestamp=None,
-             deliver_at=None,
-             deliver_after=None,
-             ):
-        """
-        Publish a message on the topic. Blocks until the message is acknowledged
-
-        Returns a `MessageId` object that represents where the message is persisted.
-
-        **Args**
-
-        * `content`:
-          A `bytes` object with the message payload.
-
-        **Options**
-
-        * `properties`:
-          A dict of application-defined string properties.
-        * `partition_key`:
-          Sets the partition key for message routing. A hash of this key is used
-          to determine the message's topic partition.
-        * `sequence_id`:
-          Specify a custom sequence id for the message being published.
-        * `replication_clusters`:
-          Override namespace replication clusters. Note that it is the caller's
-          responsibility to provide valid cluster names and that all clusters
-          have been previously configured as topics. Given an empty list,
-          the message will replicate according to the namespace configuration.
-        * `disable_replication`:
-          Do not replicate this message.
-        * `event_timestamp`:
-          Timestamp in millis of the timestamp of event creation
-        * `deliver_at`:
-          Specify the this message should not be delivered earlier than the
-          specified timestamp.
-          The timestamp is milliseconds and based on UTC
-        * `deliver_after`:
-          Specify a delay in timedelta for the delivery of the messages.
-
-        """
-        msg = self._build_msg(content, properties, partition_key, sequence_id,
-                              replication_clusters, disable_replication, event_timestamp,
-                              deliver_at, deliver_after)
-        return MessageId.deserialize(self._producer.send(msg))
-
-    def send_async(self, content, callback,
-                   properties=None,
-                   partition_key=None,
-                   sequence_id=None,
-                   replication_clusters=None,
-                   disable_replication=False,
-                   event_timestamp=None,
-                   deliver_at=None,
-                   deliver_after=None,
-                   ):
-        """
-        Send a message asynchronously.
-
-        The `callback` will be invoked once the message has been acknowledged
-        by the broker.
-
-        Example:
-
-            #!python
-            def callback(res, msg_id):
-                print('Message published: %s' % res)
-
-            producer.send_async(msg, callback)
-
-        When the producer queue is full, by default the message will be rejected
-        and the callback invoked with an error code.
-
-        **Args**
-
-        * `content`:
-          A `bytes` object with the message payload.
-
-        **Options**
-
-        * `properties`:
-          A dict of application0-defined string properties.
-        * `partition_key`:
-          Sets the partition key for the message routing. A hash of this key is
-          used to determine the message's topic partition.
-        * `sequence_id`:
-          Specify a custom sequence id for the message being published.
-        * `replication_clusters`: Override namespace replication clusters. Note
-          that it is the caller's responsibility to provide valid cluster names
-          and that all clusters have been previously configured as topics.
-          Given an empty list, the message will replicate per the namespace
-          configuration.
-        * `disable_replication`:
-          Do not replicate this message.
-        * `event_timestamp`:
-          Timestamp in millis of the timestamp of event creation
-        * `deliver_at`:
-          Specify the this message should not be delivered earlier than the
-          specified timestamp.
-          The timestamp is milliseconds and based on UTC
-        * `deliver_after`:
-          Specify a delay in timedelta for the delivery of the messages.
-        """
-        msg = self._build_msg(content, properties, partition_key, sequence_id,
-                              replication_clusters, disable_replication, event_timestamp,
-                              deliver_at, deliver_after)
-        self._producer.send_async(msg, callback)
-
-
-    def flush(self):
-        """
-        Flush all the messages buffered in the client and wait until all messages have been
-        successfully persisted
-        """
-        self._producer.flush()
-
-
-    def close(self):
-        """
-        Close the producer.
-        """
-        self._producer.close()
-
-    def _build_msg(self, content, properties, partition_key, sequence_id,
-                   replication_clusters, disable_replication, event_timestamp,
-                   deliver_at, deliver_after):
-        data = self._schema.encode(content)
-
-        _check_type(bytes, data, 'data')
-        _check_type_or_none(dict, properties, 'properties')
-        _check_type_or_none(str, partition_key, 'partition_key')
-        _check_type_or_none(int, sequence_id, 'sequence_id')
-        _check_type_or_none(list, replication_clusters, 'replication_clusters')
-        _check_type(bool, disable_replication, 'disable_replication')
-        _check_type_or_none(int, event_timestamp, 'event_timestamp')
-        _check_type_or_none(int, deliver_at, 'deliver_at')
-        _check_type_or_none(timedelta, deliver_after, 'deliver_after')
-
-        mb = _pulsar.MessageBuilder()
-        mb.content(data)
-        if properties:
-            for k, v in properties.items():
-                mb.property(k, v)
-        if partition_key:
-            mb.partition_key(partition_key)
-        if sequence_id:
-            mb.sequence_id(sequence_id)
-        if replication_clusters:
-            mb.replication_clusters(replication_clusters)
-        if disable_replication:
-            mb.disable_replication(disable_replication)
-        if event_timestamp:
-            mb.event_timestamp(event_timestamp)
-        if deliver_at:
-            mb.deliver_at(deliver_at)
-        if deliver_after:
-            mb.deliver_after(deliver_after)
-
-        return mb.build()
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.Producer">Producer</a></li>
-          </ul>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Producer.close">
-    <p>def <span class="ident">close</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Close the producer.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.close', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer.close" class="source">
-    <pre><code>def close(self):
-    """
-    Close the producer.
-    """
-    self._producer.close()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Producer.flush">
-    <p>def <span class="ident">flush</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Flush all the messages buffered in the client and wait until all messages have been
-successfully persisted</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.flush', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer.flush" class="source">
-    <pre><code>def flush(self):
-    """
-    Flush all the messages buffered in the client and wait until all messages have been
-    successfully persisted
-    """
-    self._producer.flush()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Producer.last_sequence_id">
-    <p>def <span class="ident">last_sequence_id</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Get the last sequence id that was published by this producer.</p>
-<p>This represents either the automatically assigned or custom sequence id
-(set on the <code>MessageBuilder</code>) that was published and acknowledged by the broker.</p>
-<p>After recreating a producer with the same producer name, this will return the
-sequence id of the last message that was published in the previous producer session,
-or -1 if no message was ever published.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.last_sequence_id', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer.last_sequence_id" class="source">
-    <pre><code>def last_sequence_id(self):
-    """
-    Get the last sequence id that was published by this producer.
-    This represent either the automatically assigned or custom sequence id
-    (set on the `MessageBuilder`) that was published and acknowledged by the broker.
-    After recreating a producer with the same producer name, this will return the
-    last message that was published in the previous producer session, or -1 if
-    there no message was ever published.
-    """
-    return self._producer.last_sequence_id()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Producer.producer_name">
-    <p>def <span class="ident">producer_name</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Return the producer name which could have been assigned by the
-system or specified by the client</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.producer_name', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer.producer_name" class="source">
-    <pre><code>def producer_name(self):
-    """
-    Return the producer name which could have been assigned by the
-    system or specified by the client
-    """
-    return self._producer.producer_name()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Producer.send">
-    <p>def <span class="ident">send</span>(</p><p>self, content, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Publish a message on the topic. Blocks until the message is acknowledged</p>
-<p>Returns a <code>MessageId</code> object that represents where the message is persisted.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>content</code>:
-  A <code>bytes</code> object with the message payload.</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>properties</code>:
-  A dict of application-defined string properties.</li>
-<li><code>partition_key</code>:
-  Sets the partition key for message routing. A hash of this key is used
-  to determine the message's topic partition.</li>
-<li><code>sequence_id</code>:
-  Specify a custom sequence id for the message being published.</li>
-<li><code>replication_clusters</code>:
-  Override namespace replication clusters. Note that it is the caller's
-  responsibility to provide valid cluster names and that all clusters
-  have been previously configured as topics. Given an empty list,
-  the message will replicate according to the namespace configuration.</li>
-<li><code>disable_replication</code>:
-  Do not replicate this message.</li>
-<li><code>event_timestamp</code>:
-  Timestamp of event creation, in milliseconds</li>
-<li><code>deliver_at</code>:
-  Specify that this message should not be delivered earlier than the
-  specified timestamp.
-  The timestamp is in milliseconds and based on UTC</li>
-<li><code>deliver_after</code>:
-  Specify a delay in timedelta for the delivery of the messages.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer.send" class="source">
-    <pre><code>def send(self, content,
-         properties=None,
-         partition_key=None,
-         sequence_id=None,
-         replication_clusters=None,
-         disable_replication=False,
-         event_timestamp=None,
-         deliver_at=None,
-         deliver_after=None,
-         ):
-    """
-    Publish a message on the topic. Blocks until the message is acknowledged
-    Returns a `MessageId` object that represents where the message is persisted.
-    **Args**
-    * `content`:
-      A `bytes` object with the message payload.
-    **Options**
-    * `properties`:
-      A dict of application-defined string properties.
-    * `partition_key`:
-      Sets the partition key for message routing. A hash of this key is used
-      to determine the message's topic partition.
-    * `sequence_id`:
-      Specify a custom sequence id for the message being published.
-    * `replication_clusters`:
-      Override namespace replication clusters. Note that it is the caller's
-      responsibility to provide valid cluster names and that all clusters
-      have been previously configured as topics. Given an empty list,
-      the message will replicate according to the namespace configuration.
-    * `disable_replication`:
-      Do not replicate this message.
-    * `event_timestamp`:
-      Timestamp in millis of the timestamp of event creation
-    * `deliver_at`:
-      Specify the this message should not be delivered earlier than the
-      specified timestamp.
-      The timestamp is milliseconds and based on UTC
-    * `deliver_after`:
-      Specify a delay in timedelta for the delivery of the messages.
-    """
-    msg = self._build_msg(content, properties, partition_key, sequence_id,
-                          replication_clusters, disable_replication, event_timestamp,
-                          deliver_at, deliver_after)
-    return MessageId.deserialize(self._producer.send(msg))
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Producer.send_async">
-    <p>def <span class="ident">send_async</span>(</p><p>self, content, callback, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Send a message asynchronously.</p>
-<p>The <code>callback</code> will be invoked once the message has been acknowledged
-by the broker.</p>
-<p>Example:</p>
-<pre><code>#!python
-def callback(res, msg_id):
-    print('Message published: %s' % res)
-
-producer.send_async(msg, callback)
-</code></pre>
-<p>When the producer queue is full, by default the message will be rejected
-and the callback invoked with an error code.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>content</code>:
-  A <code>bytes</code> object with the message payload.</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>properties</code>:
-  A dict of application-defined string properties.</li>
-<li><code>partition_key</code>:
-  Sets the partition key for the message routing. A hash of this key is
-  used to determine the message's topic partition.</li>
-<li><code>sequence_id</code>:
-  Specify a custom sequence id for the message being published.</li>
-<li><code>replication_clusters</code>: Override namespace replication clusters. Note
-  that it is the caller's responsibility to provide valid cluster names
-  and that all clusters have been previously configured as topics.
-  Given an empty list, the message will replicate per the namespace
-  configuration.</li>
-<li><code>disable_replication</code>:
-  Do not replicate this message.</li>
-<li><code>event_timestamp</code>:
-  Timestamp of event creation, in milliseconds</li>
-<li><code>deliver_at</code>:
-  Specify that this message should not be delivered earlier than the
-  specified timestamp.
-  The timestamp is in milliseconds and based on UTC</li>
-<li><code>deliver_after</code>:
-  Specify a delay in timedelta for the delivery of the messages.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send_async', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer.send_async" class="source">
-    <pre><code>def send_async(self, content, callback,
-               properties=None,
-               partition_key=None,
-               sequence_id=None,
-               replication_clusters=None,
-               disable_replication=False,
-               event_timestamp=None,
-               deliver_at=None,
-               deliver_after=None,
-               ):
-    """
-    Send a message asynchronously.
-    The `callback` will be invoked once the message has been acknowledged
-    by the broker.
-    Example:
-        #!python
-        def callback(res, msg_id):
-            print('Message published: %s' % res)
-        producer.send_async(msg, callback)
-    When the producer queue is full, by default the message will be rejected
-    and the callback invoked with an error code.
-    **Args**
-    * `content`:
-      A `bytes` object with the message payload.
-    **Options**
-    * `properties`:
-      A dict of application0-defined string properties.
-    * `partition_key`:
-      Sets the partition key for the message routing. A hash of this key is
-      used to determine the message's topic partition.
-    * `sequence_id`:
-      Specify a custom sequence id for the message being published.
-    * `replication_clusters`: Override namespace replication clusters. Note
-      that it is the caller's responsibility to provide valid cluster names
-      and that all clusters have been previously configured as topics.
-      Given an empty list, the message will replicate per the namespace
-      configuration.
-    * `disable_replication`:
-      Do not replicate this message.
-    * `event_timestamp`:
-      Timestamp in millis of the timestamp of event creation
-    * `deliver_at`:
-      Specify the this message should not be delivered earlier than the
-      specified timestamp.
-      The timestamp is milliseconds and based on UTC
-    * `deliver_after`:
-      Specify a delay in timedelta for the delivery of the messages.
-    """
-    msg = self._build_msg(content, properties, partition_key, sequence_id,
-                          replication_clusters, disable_replication, event_timestamp,
-                          deliver_at, deliver_after)
-    self._producer.send_async(msg, callback)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Producer.topic">
-    <p>def <span class="ident">topic</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Return the topic that the producer is publishing to</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.topic', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Producer.topic" class="source">
-    <pre><code>def topic(self):
-    """
-    Return the topic which producer is publishing to
-    """
-    return self._producer.topic()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-      
-      <div class="item">
-      <p id="pulsar.Reader" class="name">class <span class="ident">Reader</span></p>
-      
-  
-    <div class="desc"><p>Pulsar topic reader.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Reader" class="source">
-    <pre><code>class Reader:
-    """
-    Pulsar topic reader.
-    """
-
-    def topic(self):
-        """
-        Return the topic this reader is reading from.
-        """
-        return self._reader.topic()
-
-    def read_next(self, timeout_millis=None):
-        """
-        Read a single message.
-
-        If a message is not immediately available, this method will block until
-        a new message is available.
-
-        **Options**
-
-        * `timeout_millis`:
-          If specified, the receive will raise an exception if a message is not
-          available within the timeout.
-        """
-        if timeout_millis is None:
-            msg = self._reader.read_next()
-        else:
-            _check_type(int, timeout_millis, 'timeout_millis')
-            msg = self._reader.read_next(timeout_millis)
-
-        m = Message()
-        m._message = msg
-        m._schema = self._schema
-        return m
-
-    def has_message_available(self):
-        """
-        Check if there is any message available to read from the current position.
-        """
-        return self._reader.has_message_available();
-
-    def seek(self, messageid):
-        """
-        Reset this reader to a specific message id or publish timestamp.
-        The message id can either be a specific message or represent the first or last messages in the topic.
-        Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
-        seek() on the individual partitions.
-
-        **Args**
-
-        * `message`:
-          The message id for seek, OR an integer event time to seek to
-        """
-        self._reader.seek(messageid)
-
-    def close(self):
-        """
-        Close the reader.
-        """
-        self._reader.close()
-        self._client._consumers.remove(self)
-</code></pre>
-  </div>
-</div>
-
-
-      <div class="class">
-          <h3>Ancestors (in MRO)</h3>
-          <ul class="class_list">
-          <li><a href="#pulsar.Reader">Reader</a></li>
-          </ul>
-          <h3>Methods</h3>
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Reader.close">
-    <p>def <span class="ident">close</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Close the reader.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.close', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Reader.close" class="source">
-    <pre><code>def close(self):
-    """
-    Close the reader.
-    """
-    self._reader.close()
-    self._client._consumers.remove(self)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Reader.has_message_available">
-    <p>def <span class="ident">has_message_available</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Check if there is any message available to read from the current position.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.has_message_available', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Reader.has_message_available" class="source">
-    <pre><code>def has_message_available(self):
-    """
-    Check if there is any message available to read from the current position.
-    """
-    return self._reader.has_message_available();
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Reader.read_next">
-    <p>def <span class="ident">read_next</span>(</p><p>self, timeout_millis=None)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Read a single message.</p>
-<p>If a message is not immediately available, this method will block until
-a new message is available.</p>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>timeout_millis</code>:
-  If specified, the read will raise an exception if a message is not
-  available within the timeout.</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.read_next', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Reader.read_next" class="source">
-    <pre><code>def read_next(self, timeout_millis=None):
-    """
-    Read a single message.
-    If a message is not immediately available, this method will block until
-    a new message is available.
-    **Options**
-    * `timeout_millis`:
-      If specified, the receive will raise an exception if a message is not
-      available within the timeout.
-    """
-    if timeout_millis is None:
-        msg = self._reader.read_next()
-    else:
-        _check_type(int, timeout_millis, 'timeout_millis')
-        msg = self._reader.read_next(timeout_millis)
-    m = Message()
-    m._message = msg
-    m._schema = self._schema
-    return m
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Reader.seek">
-    <p>def <span class="ident">seek</span>(</p><p>self, messageid)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Reset this reader to a specific message id or publish timestamp.
-The message id can either be a specific message or represent the first or last messages in the topic.
-Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead perform the
-seek() on the individual partitions.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>messageid</code>:
-  The message id for seek, OR an integer event time to seek to</li>
-</ul></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.seek', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Reader.seek" class="source">
-    <pre><code>def seek(self, messageid):
-    """
-    Reset this reader to a specific message id or publish timestamp.
-    The message id can either be a specific message or represent the first or last messages in the topic.
-    Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
-    seek() on the individual partitions.
-    **Args**
-    * `message`:
-      The message id for seek, OR an integer event time to seek to
-    """
-    self._reader.seek(messageid)
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-            
-  <div class="item">
-    <div class="name def" id="pulsar.Reader.topic">
-    <p>def <span class="ident">topic</span>(</p><p>self)</p>
-    </div>
-    
-
-    
-  
-    <div class="desc"><p>Return the topic this reader is reading from.</p></div>
-  <div class="source_cont">
-  <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.topic', this);">Show source &equiv;</a></p>
-  <div id="source-pulsar.Reader.topic" class="source">
-    <pre><code>def topic(self):
-    """
-    Return the topic this reader is reading from.
-    """
-    return self._reader.topic()
-</code></pre>
-  </div>
-</div>
-
-  </div>
-  
-      </div>
-      </div>
-
-    <h2 class="section-title" id="header-submodules">Sub-modules</h2>
-      <div class="item">
-      <p class="name"><a href="functions/index.html">pulsar.functions</a></p>
-      
-  
-
-      </div>
-      <div class="item">
-      <p class="name"><a href="schema/index.html">pulsar.schema</a></p>
-      
-  
-
-      </div>
-  </section>
-
-    </article>
-  <div class="clear"> </div>
-  <footer id="footer">
-    <p>
-      Documentation generated by
-      <a href="https://github.com/BurntSushi/pdoc">pdoc 0.3.2</a>
-    </p>
-
-    <p>pdoc is in the public domain with the
-      <a href="http://unlicense.org">UNLICENSE</a></p>
-
-    <p>Design by <a href="http://nadh.in">Kailash Nadh</a></p>
-  </footer>
-</div>
-</body>
-</html>