You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by li...@apache.org on 2021/04/15 07:24:47 UTC
[pulsar] branch asf-site updated: [Doc] show multiple versions for
various API docs
This is an automated email from the ASF dual-hosted git repository.
liuyu pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/asf-site by this push:
new cc84f7e [Doc] show multiple versions for various API docs
cc84f7e is described below
commit cc84f7e6042c41e2757db3ae42400fb1f058bae4
Author: Anonymitaet <anonymitaet_hotmail.com>
AuthorDate: Thu Apr 15 15:23:30 2021 +0800
[Doc] show multiple versions for various API docs
---
content/api/admin/index.html | 75 -
content/api/client/index.html | 72 -
content/api/cpp/index.html | 124 -
content/api/pulsar-functions/index.html | 75 -
content/api/python/index.html | 6459 -------------------------------
5 files changed, 6805 deletions(-)
diff --git a/content/api/admin/index.html b/content/api/admin/index.html
deleted file mode 100644
index 3b19c5d..0000000
--- a/content/api/admin/index.html
+++ /dev/null
@@ -1,75 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Frameset//EN" "http://www.w3.org/TR/html4/frameset.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc -->
-<title>Pulsar Admin Java API</title>
-<script type="text/javascript">
- tmpTargetPage = "" + window.location.search;
- if (tmpTargetPage != "" && tmpTargetPage != "undefined")
- tmpTargetPage = tmpTargetPage.substring(1);
- if (tmpTargetPage.indexOf(":") != -1 || (tmpTargetPage != "" && !validURL(tmpTargetPage)))
- tmpTargetPage = "undefined";
- targetPage = tmpTargetPage;
- function validURL(url) {
- try {
- url = decodeURIComponent(url);
- }
- catch (error) {
- return false;
- }
- var pos = url.indexOf(".html");
- if (pos == -1 || pos != url.length - 5)
- return false;
- var allowNumber = false;
- var allowSep = false;
- var seenDot = false;
- for (var i = 0; i < url.length - 5; i++) {
- var ch = url.charAt(i);
- if ('a' <= ch && ch <= 'z' ||
- 'A' <= ch && ch <= 'Z' ||
- ch == '$' ||
- ch == '_' ||
- ch.charCodeAt(0) > 127) {
- allowNumber = true;
- allowSep = true;
- } else if ('0' <= ch && ch <= '9'
- || ch == '-') {
- if (!allowNumber)
- return false;
- } else if (ch == '/' || ch == '.') {
- if (!allowSep)
- return false;
- allowNumber = false;
- allowSep = false;
- if (ch == '.')
- seenDot = true;
- if (ch == '/' && seenDot)
- return false;
- } else {
- return false;
- }
- }
- return true;
- }
- function loadFrames() {
- if (targetPage != "" && targetPage != "undefined")
- top.classFrame.location = top.targetPage;
- }
-</script>
-</head>
-<frameset cols="20%,80%" title="Documentation frame" onload="top.loadFrames()">
-<frameset rows="30%,70%" title="Left frames" onload="top.loadFrames()">
-<frame src="overview-frame.html" name="packageListFrame" title="All Packages">
-<frame src="allclasses-frame.html" name="packageFrame" title="All classes and interfaces (except non-static nested types)">
-</frameset>
-<frame src="overview-summary.html" name="classFrame" title="Package, class and interface descriptions" scrolling="yes">
-<noframes>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<h2>Frame Alert</h2>
-<p>This document is designed to be viewed using the frames feature. If you see this message, you are using a non-frame-capable web client. Link to <a href="overview-summary.html">Non-frame version</a>.</p>
-</noframes>
-</frameset>
-</html>
diff --git a/content/api/client/index.html b/content/api/client/index.html
deleted file mode 100644
index 300f7e4..0000000
--- a/content/api/client/index.html
+++ /dev/null
@@ -1,72 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Frameset//EN" "http://www.w3.org/TR/html4/frameset.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc -->
-<title>Pulsar Client Java API</title>
-<script type="text/javascript">
- tmpTargetPage = "" + window.location.search;
- if (tmpTargetPage != "" && tmpTargetPage != "undefined")
- tmpTargetPage = tmpTargetPage.substring(1);
- if (tmpTargetPage.indexOf(":") != -1 || (tmpTargetPage != "" && !validURL(tmpTargetPage)))
- tmpTargetPage = "undefined";
- targetPage = tmpTargetPage;
- function validURL(url) {
- try {
- url = decodeURIComponent(url);
- }
- catch (error) {
- return false;
- }
- var pos = url.indexOf(".html");
- if (pos == -1 || pos != url.length - 5)
- return false;
- var allowNumber = false;
- var allowSep = false;
- var seenDot = false;
- for (var i = 0; i < url.length - 5; i++) {
- var ch = url.charAt(i);
- if ('a' <= ch && ch <= 'z' ||
- 'A' <= ch && ch <= 'Z' ||
- ch == '$' ||
- ch == '_' ||
- ch.charCodeAt(0) > 127) {
- allowNumber = true;
- allowSep = true;
- } else if ('0' <= ch && ch <= '9'
- || ch == '-') {
- if (!allowNumber)
- return false;
- } else if (ch == '/' || ch == '.') {
- if (!allowSep)
- return false;
- allowNumber = false;
- allowSep = false;
- if (ch == '.')
- seenDot = true;
- if (ch == '/' && seenDot)
- return false;
- } else {
- return false;
- }
- }
- return true;
- }
- function loadFrames() {
- if (targetPage != "" && targetPage != "undefined")
- top.classFrame.location = top.targetPage;
- }
-</script>
-</head>
-<frameset cols="20%,80%" title="Documentation frame" onload="top.loadFrames()">
-<frame src="allclasses-frame.html" name="packageFrame" title="All classes and interfaces (except non-static nested types)">
-<frame src="overview-summary.html" name="classFrame" title="Package, class and interface descriptions" scrolling="yes">
-<noframes>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<h2>Frame Alert</h2>
-<p>This document is designed to be viewed using the frames feature. If you see this message, you are using a non-frame-capable web client. Link to <a href="overview-summary.html">Non-frame version</a>.</p>
-</noframes>
-</frameset>
-</html>
diff --git a/content/api/cpp/index.html b/content/api/cpp/index.html
deleted file mode 100644
index 93d8aa7..0000000
--- a/content/api/cpp/index.html
+++ /dev/null
@@ -1,124 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
-<head>
-<meta http-equiv="Content-Type" content="text/xhtml;charset=UTF-8"/>
-<meta http-equiv="X-UA-Compatible" content="IE=9"/>
-<meta name="generator" content="Doxygen 1.8.11"/>
-<title>pulsar-client-cpp: Main Page</title>
-<link href="tabs.css" rel="stylesheet" type="text/css"/>
-<script type="text/javascript" src="jquery.js"></script>
-<script type="text/javascript" src="dynsections.js"></script>
-<link href="search/search.css" rel="stylesheet" type="text/css"/>
-<script type="text/javascript" src="search/searchdata.js"></script>
-<script type="text/javascript" src="search/search.js"></script>
-<script type="text/javascript">
- $(document).ready(function() { init_search(); });
-</script>
-<link href="doxygen.css" rel="stylesheet" type="text/css" />
-</head>
-<body>
-<div id="top"><!-- do not remove this div, it is closed by doxygen! -->
-<div id="titlearea">
-<table cellspacing="0" cellpadding="0">
- <tbody>
- <tr style="height: 56px;">
- <td id="projectalign" style="padding-left: 0.5em;">
- <div id="projectname">pulsar-client-cpp
- </div>
- </td>
- </tr>
- </tbody>
-</table>
-</div>
-<!-- end header part -->
-<!-- Generated by Doxygen 1.8.11 -->
-<script type="text/javascript">
-var searchBox = new SearchBox("searchBox", "search",false,'Search');
-</script>
- <div id="navrow1" class="tabs">
- <ul class="tablist">
- <li class="current"><a href="index.html"><span>Main Page</span></a></li>
- <li><a href="pages.html"><span>Related Pages</span></a></li>
- <li><a href="namespaces.html"><span>Namespaces</span></a></li>
- <li><a href="annotated.html"><span>Classes</span></a></li>
- <li><a href="files.html"><span>Files</span></a></li>
- <li>
- <div id="MSearchBox" class="MSearchBoxInactive">
- <span class="left">
- <img id="MSearchSelect" src="search/mag_sel.png"
- onmouseover="return searchBox.OnSearchSelectShow()"
- onmouseout="return searchBox.OnSearchSelectHide()"
- alt=""/>
- <input type="text" id="MSearchField" value="Search" accesskey="S"
- onfocus="searchBox.OnSearchFieldFocus(true)"
- onblur="searchBox.OnSearchFieldFocus(false)"
- onkeyup="searchBox.OnSearchFieldChange(event)"/>
- </span><span class="right">
- <a id="MSearchClose" href="javascript:searchBox.CloseResultsWindow()"><img id="MSearchCloseImg" border="0" src="search/close.png" alt=""/></a>
- </span>
- </div>
- </li>
- </ul>
- </div>
-</div><!-- top -->
-<!-- window showing the filter options -->
-<div id="MSearchSelectWindow"
- onmouseover="return searchBox.OnSearchSelectShow()"
- onmouseout="return searchBox.OnSearchSelectHide()"
- onkeydown="return searchBox.OnSearchSelectKey(event)">
-</div>
-
-<!-- iframe showing the search results (closed by default) -->
-<div id="MSearchResultsWindow">
-<iframe src="javascript:void(0)" frameborder="0"
- name="MSearchResults" id="MSearchResults">
-</iframe>
-</div>
-
-<div class="header">
- <div class="headertitle">
-<div class="title">pulsar-client-cpp Documentation</div> </div>
-</div><!--header-->
-<div class="contents">
-<div class="textblock"><h1>The Pulsar C++ client</h1>
-<p>Welcome to the Doxygen documentation for <a href="https://pulsar.apache.org/">Pulsar</a>.</p>
-<h2>Supported platforms</h2>
-<p>The Pulsar C++ client has been successfully tested on <b>MacOS</b> and <b>Linux</b>.</p>
-<h2>System requirements</h2>
-<p>You need to have the following installed to use the C++ client:</p>
-<ul>
-<li><a href="https://cmake.org/">CMake</a></li>
-<li><a href="http://www.boost.org/">Boost</a></li>
-<li><a href="https://developers.google.com/protocol-buffers/">Protocol Buffers</a> 2.6</li>
-<li><a href="https://logging.apache.org/log4cxx">Log4CXX</a></li>
-<li><a href="https://curl.haxx.se/libcurl/">libcurl</a></li>
-<li><a href="https://github.com/google/googletest">Google Test</a></li>
-<li><a href="https://github.com/open-source-parsers/jsoncpp">JsonCpp</a></li>
-</ul>
-<h2>Compilation</h2>
-<p>There are separate compilation instructions for <a href="#macos">MacOS</a> and <a href="#linux">Linux</a>. For both systems, start by cloning the Pulsar repository:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span> $ git clone https://github.com/apache/pulsar</div></div><!-- fragment --><h3>Linux</h3>
-<p>First, install all of the necessary dependencies:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span> $ apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \</div><div class="line"><a name="l00002"></a><span class="lineno"> 2</span>  libprotobuf-dev libboost-all-dev libgtest-dev libjsoncpp-dev</div></div><!-- fragment --><p>Then compile and install <a href="https://github.com/google/googletest">Google Test</a>:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span> $ git clone https://github.com/google/googletest.git && cd googletest</div><div class="line"><a name="l00002"></a><span class="lineno"> 2</span> $ sudo cmake .</div><div class="line"><a name="l00003"></a><span class="lineno"> 3</span> $ sudo make</div><div class="line"><a name="l00004"></a><span class="lineno"> 4</span> $ sudo cp *.a /usr/lib</div></div><!-- [...]
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span> $ cd pulsar-client-cpp</div><div class="line"><a name="l00002"></a><span class="lineno"> 2</span> $ cmake .</div><div class="line"><a name="l00003"></a><span class="lineno"> 3</span> $ make</div></div><!-- fragment --><p>The resulting files, <code>libpulsar.so</code> and <code>libpulsar.a</code>, will be placed in the <code>lib</code> folder of the repo while two tools, <co [...]
-<h3>MacOS</h3>
-<p>First, install all of the necessary dependencies:</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span> # OpenSSL installation</div><div class="line"><a name="l00002"></a><span class="lineno"> 2</span> $ brew install openssl</div><div class="line"><a name="l00003"></a><span class="lineno"> 3</span> $ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/</div><div class="line"><a name="l00004"></a><span class="lineno"> 4</span> $ export OPENSSL_ROOT_DIR=/usr/local [...]
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span> $ cd pulsar-client-cpp</div><div class="line"><a name="l00002"></a><span class="lineno"> 2</span> $ cmake .</div><div class="line"><a name="l00003"></a><span class="lineno"> 3</span> $ make</div></div><!-- fragment --><h2>Consumer</h2>
-<div class="fragment"><div class="line">Client client(<span class="stringliteral">"pulsar://localhost:6650"</span>);</div><div class="line"></div><div class="line">Consumer consumer;</div><div class="line"><a class="code" href="namespacepulsar.html#ae85314d6b9e8afd831cf8c66705f2dbb">Result</a> result = client.subscribe(<span class="stringliteral">"persistent://sample/standalone/ns1/my-topic"</span>, <span class="stringliteral">"my-subscribtion-name"</span>, [...]
-<div class="fragment"><div class="line">Client client(<span class="stringliteral">"pulsar://localhost:6650"</span>);</div><div class="line"></div><div class="line">Producer producer;</div><div class="line"><a class="code" href="namespacepulsar.html#ae85314d6b9e8afd831cf8c66705f2dbb">Result</a> result = client.createProducer(<span class="stringliteral">"persistent://sample/standalone/ns1/my-topic"</span>, producer);</div><div class="line"><span class="keywordflow">if</ [...]
-<div class="fragment"><div class="line">ClientConfiguration config = ClientConfiguration();</div><div class="line">config.setUseTls(<span class="keyword">true</span>);</div><div class="line">std::string certfile = <span class="stringliteral">"/path/to/cacert.pem"</span>;</div><div class="line"></div><div class="line">ParamMap params;</div><div class="line">params[<span class="stringliteral">"tlsCertFile"</span>] = <span class="stringliteral">"/path/to/client-cert [...]
-<p>After you changed code, run auto-formatting by the following command.</p>
-<div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span> make format</div></div><!-- fragment --><p> You need to have the following installed to use the auto-formatting.</p><ul>
-<li><a href="https://clang.llvm.org/">clang-format 5.0</a> </li>
-</ul>
-</div></div><!-- contents -->
-<!-- start footer part -->
-<hr class="footer"/><address class="footer"><small>
-Generated by  <a href="http://www.doxygen.org/index.html">
-<img class="footer" src="doxygen.png" alt="doxygen"/>
-</a> 1.8.11
-</small></address>
-</body>
-</html>
diff --git a/content/api/pulsar-functions/index.html b/content/api/pulsar-functions/index.html
deleted file mode 100644
index ff28246..0000000
--- a/content/api/pulsar-functions/index.html
+++ /dev/null
@@ -1,75 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Frameset//EN" "http://www.w3.org/TR/html4/frameset.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc -->
-<title>Pulsar Functions Java SDK</title>
-<script type="text/javascript">
- tmpTargetPage = "" + window.location.search;
- if (tmpTargetPage != "" && tmpTargetPage != "undefined")
- tmpTargetPage = tmpTargetPage.substring(1);
- if (tmpTargetPage.indexOf(":") != -1 || (tmpTargetPage != "" && !validURL(tmpTargetPage)))
- tmpTargetPage = "undefined";
- targetPage = tmpTargetPage;
- function validURL(url) {
- try {
- url = decodeURIComponent(url);
- }
- catch (error) {
- return false;
- }
- var pos = url.indexOf(".html");
- if (pos == -1 || pos != url.length - 5)
- return false;
- var allowNumber = false;
- var allowSep = false;
- var seenDot = false;
- for (var i = 0; i < url.length - 5; i++) {
- var ch = url.charAt(i);
- if ('a' <= ch && ch <= 'z' ||
- 'A' <= ch && ch <= 'Z' ||
- ch == '$' ||
- ch == '_' ||
- ch.charCodeAt(0) > 127) {
- allowNumber = true;
- allowSep = true;
- } else if ('0' <= ch && ch <= '9'
- || ch == '-') {
- if (!allowNumber)
- return false;
- } else if (ch == '/' || ch == '.') {
- if (!allowSep)
- return false;
- allowNumber = false;
- allowSep = false;
- if (ch == '.')
- seenDot = true;
- if (ch == '/' && seenDot)
- return false;
- } else {
- return false;
- }
- }
- return true;
- }
- function loadFrames() {
- if (targetPage != "" && targetPage != "undefined")
- top.classFrame.location = top.targetPage;
- }
-</script>
-</head>
-<frameset cols="20%,80%" title="Documentation frame" onload="top.loadFrames()">
-<frameset rows="30%,70%" title="Left frames" onload="top.loadFrames()">
-<frame src="overview-frame.html" name="packageListFrame" title="All Packages">
-<frame src="allclasses-frame.html" name="packageFrame" title="All classes and interfaces (except non-static nested types)">
-</frameset>
-<frame src="overview-summary.html" name="classFrame" title="Package, class and interface descriptions" scrolling="yes">
-<noframes>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<h2>Frame Alert</h2>
-<p>This document is designed to be viewed using the frames feature. If you see this message, you are using a non-frame-capable web client. Link to <a href="overview-summary.html">Non-frame version</a>.</p>
-</noframes>
-</frameset>
-</html>
diff --git a/content/api/python/index.html b/content/api/python/index.html
deleted file mode 100644
index 0cd0cf1..0000000
--- a/content/api/python/index.html
+++ /dev/null
@@ -1,6459 +0,0 @@
-<!doctype html>
-<head>
- <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
- <meta name="viewport" content="width=device-width, initial-scale=1, minimum-scale=1" />
-
- <title>pulsar API documentation</title>
- <meta name="description" content="The Pulsar Python client library is based on the existing C++ client library.
-All the same features ..." />
-
- <link href='http://fonts.googleapis.com/css?family=Source+Sans+Pro:400,300' rel='stylesheet' type='text/css'>
-
- <style type="text/css">
-
-* {
- box-sizing: border-box;
-}
-/*! normalize.css v1.1.1 | MIT License | git.io/normalize */
-
-/* ==========================================================================
- HTML5 display definitions
- ========================================================================== */
-
-/**
- * Correct `block` display not defined in IE 6/7/8/9 and Firefox 3.
- */
-
-article,
-aside,
-details,
-figcaption,
-figure,
-footer,
-header,
-hgroup,
-main,
-nav,
-section,
-summary {
- display: block;
-}
-
-/**
- * Correct `inline-block` display not defined in IE 6/7/8/9 and Firefox 3.
- */
-
-audio,
-canvas,
-video {
- display: inline-block;
- *display: inline;
- *zoom: 1;
-}
-
-/**
- * Prevent modern browsers from displaying `audio` without controls.
- * Remove excess height in iOS 5 devices.
- */
-
-audio:not([controls]) {
- display: none;
- height: 0;
-}
-
-/**
- * Address styling not present in IE 7/8/9, Firefox 3, and Safari 4.
- * Known issue: no IE 6 support.
- */
-
-[hidden] {
- display: none;
-}
-
-/* ==========================================================================
- Base
- ========================================================================== */
-
-/**
- * 1. Prevent system color scheme's background color being used in Firefox, IE,
- * and Opera.
- * 2. Prevent system color scheme's text color being used in Firefox, IE, and
- * Opera.
- * 3. Correct text resizing oddly in IE 6/7 when body `font-size` is set using
- * `em` units.
- * 4. Prevent iOS text size adjust after orientation change, without disabling
- * user zoom.
- */
-
-html {
- background: #fff; /* 1 */
- color: #000; /* 2 */
- font-size: 100%; /* 3 */
- -webkit-text-size-adjust: 100%; /* 4 */
- -ms-text-size-adjust: 100%; /* 4 */
-}
-
-/**
- * Address `font-family` inconsistency between `textarea` and other form
- * elements.
- */
-
-html,
-button,
-input,
-select,
-textarea {
- font-family: sans-serif;
-}
-
-/**
- * Address margins handled incorrectly in IE 6/7.
- */
-
-body {
- margin: 0;
-}
-
-/* ==========================================================================
- Links
- ========================================================================== */
-
-/**
- * Address `outline` inconsistency between Chrome and other browsers.
- */
-
-a:focus {
- outline: thin dotted;
-}
-
-/**
- * Improve readability when focused and also mouse hovered in all browsers.
- */
-
-a:active,
-a:hover {
- outline: 0;
-}
-
-/* ==========================================================================
- Typography
- ========================================================================== */
-
-/**
- * Address font sizes and margins set differently in IE 6/7.
- * Address font sizes within `section` and `article` in Firefox 4+, Safari 5,
- * and Chrome.
- */
-
-h1 {
- font-size: 2em;
- margin: 0.67em 0;
-}
-
-h2 {
- font-size: 1.5em;
- margin: 0.83em 0;
-}
-
-h3 {
- font-size: 1.17em;
- margin: 1em 0;
-}
-
-h4 {
- font-size: 1em;
- margin: 1.33em 0;
-}
-
-h5 {
- font-size: 0.83em;
- margin: 1.67em 0;
-}
-
-h6 {
- font-size: 0.67em;
- margin: 2.33em 0;
-}
-
-/**
- * Address styling not present in IE 7/8/9, Safari 5, and Chrome.
- */
-
-abbr[title] {
- border-bottom: 1px dotted;
-}
-
-/**
- * Address style set to `bolder` in Firefox 3+, Safari 4/5, and Chrome.
- */
-
-b,
-strong {
- font-weight: bold;
-}
-
-blockquote {
- margin: 1em 40px;
-}
-
-/**
- * Address styling not present in Safari 5 and Chrome.
- */
-
-dfn {
- font-style: italic;
-}
-
-/**
- * Address differences between Firefox and other browsers.
- * Known issue: no IE 6/7 normalization.
- */
-
-hr {
- -moz-box-sizing: content-box;
- box-sizing: content-box;
- height: 0;
-}
-
-/**
- * Address styling not present in IE 6/7/8/9.
- */
-
-mark {
- background: #ff0;
- color: #000;
-}
-
-/**
- * Address margins set differently in IE 6/7.
- */
-
-p,
-pre {
- margin: 1em 0;
-}
-
-/**
- * Correct font family set oddly in IE 6, Safari 4/5, and Chrome.
- */
-
-code,
-kbd,
-pre,
-samp {
- font-family: monospace, serif;
- _font-family: 'courier new', monospace;
- font-size: 1em;
-}
-
-/**
- * Improve readability of pre-formatted text in all browsers.
- */
-
-pre {
- white-space: pre;
- white-space: pre-wrap;
- word-wrap: break-word;
-}
-
-/**
- * Address CSS quotes not supported in IE 6/7.
- */
-
-q {
- quotes: none;
-}
-
-/**
- * Address `quotes` property not supported in Safari 4.
- */
-
-q:before,
-q:after {
- content: '';
- content: none;
-}
-
-/**
- * Address inconsistent and variable font size in all browsers.
- */
-
-small {
- font-size: 80%;
-}
-
-/**
- * Prevent `sub` and `sup` affecting `line-height` in all browsers.
- */
-
-sub,
-sup {
- font-size: 75%;
- line-height: 0;
- position: relative;
- vertical-align: baseline;
-}
-
-sup {
- top: -0.5em;
-}
-
-sub {
- bottom: -0.25em;
-}
-
-/* ==========================================================================
- Lists
- ========================================================================== */
-
-/**
- * Address margins set differently in IE 6/7.
- */
-
-dl,
-menu,
-ol,
-ul {
- margin: 1em 0;
-}
-
-dd {
- margin: 0 0 0 40px;
-}
-
-/**
- * Address paddings set differently in IE 6/7.
- */
-
-menu,
-ol,
-ul {
- padding: 0 0 0 40px;
-}
-
-/**
- * Correct list images handled incorrectly in IE 7.
- */
-
-nav ul,
-nav ol {
- list-style: none;
- list-style-image: none;
-}
-
-/* ==========================================================================
- Embedded content
- ========================================================================== */
-
-/**
- * 1. Remove border when inside `a` element in IE 6/7/8/9 and Firefox 3.
- * 2. Improve image quality when scaled in IE 7.
- */
-
-img {
- border: 0; /* 1 */
- -ms-interpolation-mode: bicubic; /* 2 */
-}
-
-/**
- * Correct overflow displayed oddly in IE 9.
- */
-
-svg:not(:root) {
- overflow: hidden;
-}
-
-/* ==========================================================================
- Figures
- ========================================================================== */
-
-/**
- * Address margin not present in IE 6/7/8/9, Safari 5, and Opera 11.
- */
-
-figure {
- margin: 0;
-}
-
-/* ==========================================================================
- Forms
- ========================================================================== */
-
-/**
- * Correct margin displayed oddly in IE 6/7.
- */
-
-form {
- margin: 0;
-}
-
-/**
- * Define consistent border, margin, and padding.
- */
-
-fieldset {
- border: 1px solid #c0c0c0;
- margin: 0 2px;
- padding: 0.35em 0.625em 0.75em;
-}
-
-/**
- * 1. Correct color not being inherited in IE 6/7/8/9.
- * 2. Correct text not wrapping in Firefox 3.
- * 3. Correct alignment displayed oddly in IE 6/7.
- */
-
-legend {
- border: 0; /* 1 */
- padding: 0;
- white-space: normal; /* 2 */
- *margin-left: -7px; /* 3 */
-}
-
-/**
- * 1. Correct font size not being inherited in all browsers.
- * 2. Address margins set differently in IE 6/7, Firefox 3+, Safari 5,
- * and Chrome.
- * 3. Improve appearance and consistency in all browsers.
- */
-
-button,
-input,
-select,
-textarea {
- font-size: 100%; /* 1 */
- margin: 0; /* 2 */
- vertical-align: baseline; /* 3 */
- *vertical-align: middle; /* 3 */
-}
-
-/**
- * Address Firefox 3+ setting `line-height` on `input` using `!important` in
- * the UA stylesheet.
- */
-
-button,
-input {
- line-height: normal;
-}
-
-/**
- * Address inconsistent `text-transform` inheritance for `button` and `select`.
- * All other form control elements do not inherit `text-transform` values.
- * Correct `button` style inheritance in Chrome, Safari 5+, and IE 6+.
- * Correct `select` style inheritance in Firefox 4+ and Opera.
- */
-
-button,
-select {
- text-transform: none;
-}
-
-/**
- * 1. Avoid the WebKit bug in Android 4.0.* where (2) destroys native `audio`
- * and `video` controls.
- * 2. Correct inability to style clickable `input` types in iOS.
- * 3. Improve usability and consistency of cursor style between image-type
- * `input` and others.
- * 4. Remove inner spacing in IE 7 without affecting normal text inputs.
- * Known issue: inner spacing remains in IE 6.
- */
-
-button,
-html input[type="button"], /* 1 */
-input[type="reset"],
-input[type="submit"] {
- -webkit-appearance: button; /* 2 */
- cursor: pointer; /* 3 */
- *overflow: visible; /* 4 */
-}
-
-/**
- * Re-set default cursor for disabled elements.
- */
-
-button[disabled],
-html input[disabled] {
- cursor: default;
-}
-
-/**
- * 1. Address box sizing set to content-box in IE 8/9.
- * 2. Remove excess padding in IE 8/9.
- * 3. Remove excess padding in IE 7.
- * Known issue: excess padding remains in IE 6.
- */
-
-input[type="checkbox"],
-input[type="radio"] {
- box-sizing: border-box; /* 1 */
- padding: 0; /* 2 */
- *height: 13px; /* 3 */
- *width: 13px; /* 3 */
-}
-
-/**
- * 1. Address `appearance` set to `searchfield` in Safari 5 and Chrome.
- * 2. Address `box-sizing` set to `border-box` in Safari 5 and Chrome
- * (include `-moz` to future-proof).
- */
-
-input[type="search"] {
- -webkit-appearance: textfield; /* 1 */
- -moz-box-sizing: content-box;
- -webkit-box-sizing: content-box; /* 2 */
- box-sizing: content-box;
-}
-
-/**
- * Remove inner padding and search cancel button in Safari 5 and Chrome
- * on OS X.
- */
-
-input[type="search"]::-webkit-search-cancel-button,
-input[type="search"]::-webkit-search-decoration {
- -webkit-appearance: none;
-}
-
-/**
- * Remove inner padding and border in Firefox 3+.
- */
-
-button::-moz-focus-inner,
-input::-moz-focus-inner {
- border: 0;
- padding: 0;
-}
-
-/**
- * 1. Remove default vertical scrollbar in IE 6/7/8/9.
- * 2. Improve readability and alignment in all browsers.
- */
-
-textarea {
- overflow: auto; /* 1 */
- vertical-align: top; /* 2 */
-}
-
-/* ==========================================================================
- Tables
- ========================================================================== */
-
-/**
- * Remove most spacing between table cells.
- */
-
-table {
- border-collapse: collapse;
- border-spacing: 0;
-}
-
- </style>
-
- <style type="text/css">
-
- html, body {
- margin: 0;
- padding: 0;
- min-height: 100%;
- }
- body {
- background: #fff;
- font-family: "Source Sans Pro", "Helvetica Neueue", Helvetica, sans;
- font-weight: 300;
- font-size: 16px;
- line-height: 1.6em;
- }
- #content {
- width: 70%;
- max-width: 850px;
- float: left;
- padding: 30px 60px;
- border-left: 1px solid #ddd;
- }
- #sidebar {
- width: 25%;
- float: left;
- padding: 30px;
- overflow: hidden;
- }
- #nav {
- font-size: 130%;
- margin: 0 0 15px 0;
- }
-
- #top {
- display: block;
- position: fixed;
- bottom: 5px;
- left: 5px;
- font-size: .85em;
- text-transform: uppercase;
- }
-
- #footer {
- font-size: .75em;
- padding: 5px 30px;
- border-top: 1px solid #ddd;
- text-align: right;
- }
- #footer p {
- margin: 0 0 0 30px;
- display: inline-block;
- }
-
- h1, h2, h3, h4, h5 {
- font-weight: 300;
- }
- h1 {
- font-size: 2.5em;
- line-height: 1.1em;
- margin: 0 0 .50em 0;
- }
-
- h2 {
- font-size: 1.75em;
- margin: 1em 0 .50em 0;
- }
-
- h3 {
- margin: 25px 0 10px 0;
- }
-
- h4 {
- margin: 0;
- font-size: 105%;
- }
-
- a {
- color: #058;
- text-decoration: none;
- transition: color .3s ease-in-out;
- }
-
- a:hover {
- color: #e08524;
- transition: color .3s ease-in-out;
- }
-
- pre, code, .mono, .name {
- font-family: "Ubuntu Mono", "Cousine", "DejaVu Sans Mono", monospace;
- }
-
- .title .name {
- font-weight: bold;
- }
- .section-title {
- margin-top: 2em;
- }
- .ident {
- color: #900;
- }
-
- code {
- background: #f9f9f9;
- }
-
- pre {
- background: #fefefe;
- border: 1px solid #ddd;
- box-shadow: 2px 2px 0 #f3f3f3;
- margin: 0 30px;
- padding: 15px 30px;
- }
-
- .codehilite {
- margin: 0 30px 10px 30px;
- }
-
- .codehilite pre {
- margin: 0;
- }
- .codehilite .err { background: #ff3300; color: #fff !important; }
-
- table#module-list {
- font-size: 110%;
- }
-
- table#module-list tr td:first-child {
- padding-right: 10px;
- white-space: nowrap;
- }
-
- table#module-list td {
- vertical-align: top;
- padding-bottom: 8px;
- }
-
- table#module-list td p {
- margin: 0 0 7px 0;
- }
-
- .def {
- display: table;
- }
-
- .def p {
- display: table-cell;
- vertical-align: top;
- text-align: left;
- }
-
- .def p:first-child {
- white-space: nowrap;
- }
-
- .def p:last-child {
- width: 100%;
- }
-
-
- #index {
- list-style-type: none;
- margin: 0;
- padding: 0;
- }
- ul#index .class_name {
- /* font-size: 110%; */
- font-weight: bold;
- }
- #index ul {
- margin: 0;
- }
-
- .item {
- margin: 0 0 15px 0;
- }
-
- .item .class {
- margin: 0 0 25px 30px;
- }
-
- .item .class ul.class_list {
- margin: 0 0 20px 0;
- }
-
- .item .name {
- background: #fafafa;
- margin: 0;
- font-weight: bold;
- padding: 5px 10px;
- border-radius: 3px;
- display: inline-block;
- min-width: 40%;
- }
- .item .name:hover {
- background: #f6f6f6;
- }
-
- .item .empty_desc {
- margin: 0 0 5px 0;
- padding: 0;
- }
-
- .item .inheritance {
- margin: 3px 0 0 30px;
- }
-
- .item .inherited {
- color: #666;
- }
-
- .item .desc {
- padding: 0 8px;
- margin: 0;
- }
-
- .item .desc p {
- margin: 0 0 10px 0;
- }
-
- .source_cont {
- margin: 0;
- padding: 0;
- }
-
- .source_link a {
- background: #ffc300;
- font-weight: 400;
- font-size: .75em;
- text-transform: uppercase;
- color: #fff;
- text-shadow: 1px 1px 0 #f4b700;
-
- padding: 3px 8px;
- border-radius: 2px;
- transition: background .3s ease-in-out;
- }
- .source_link a:hover {
- background: #FF7200;
- text-shadow: none;
- transition: background .3s ease-in-out;
- }
-
- .source {
- display: none;
- max-height: 600px;
- overflow-y: scroll;
- margin-bottom: 15px;
- }
-
- .source .codehilite {
- margin: 0;
- }
-
- .desc h1, .desc h2, .desc h3 {
- font-size: 100% !important;
- }
- .clear {
- clear: both;
- }
-
- @media all and (max-width: 950px) {
- #sidebar {
- width: 35%;
- }
- #content {
- width: 65%;
- }
- }
- @media all and (max-width: 650px) {
- #top {
- display: none;
- }
- #sidebar {
- float: none;
- width: auto;
- }
- #content {
- float: none;
- width: auto;
- padding: 30px;
- }
-
- #index ul {
- padding: 0;
- margin-bottom: 15px;
- }
- #index ul li {
- display: inline-block;
- margin-right: 30px;
- }
- #footer {
- text-align: left;
- }
- #footer p {
- display: block;
- margin: inherit;
- }
- }
-
- /*****************************/
-
- </style>
-
-
- <style type="text/css">
-
-/* ==========================================================================
- EXAMPLE Media Queries for Responsive Design.
- These examples override the primary ('mobile first') styles.
- Modify as content requires.
- ========================================================================== */
-
-@media only screen and (min-width: 35em) {
- /* Style adjustments for viewports that meet the condition */
-}
-
-@media print,
- (-o-min-device-pixel-ratio: 5/4),
- (-webkit-min-device-pixel-ratio: 1.25),
- (min-resolution: 120dpi) {
- /* Style adjustments for high resolution devices */
-}
-
-/* ==========================================================================
- Print styles.
- Inlined to avoid required HTTP connection: h5bp.com/r
- ========================================================================== */
-
-@media print {
- * {
- background: transparent !important;
- color: #000 !important; /* Black prints faster: h5bp.com/s */
- box-shadow: none !important;
- text-shadow: none !important;
- }
-
- a,
- a:visited {
- text-decoration: underline;
- }
-
- a[href]:after {
- content: " (" attr(href) ")";
- }
-
- abbr[title]:after {
- content: " (" attr(title) ")";
- }
-
- /*
- * Don't show links for images, or javascript/internal links
- */
-
- .ir a:after,
- a[href^="javascript:"]:after,
- a[href^="#"]:after {
- content: "";
- }
-
- pre,
- blockquote {
- border: 1px solid #999;
- page-break-inside: avoid;
- }
-
- thead {
- display: table-header-group; /* h5bp.com/t */
- }
-
- tr,
- img {
- page-break-inside: avoid;
- }
-
- img {
- max-width: 100% !important;
- }
-
- @page {
- margin: 0.5cm;
- }
-
- p,
- h2,
- h3 {
- orphans: 3;
- widows: 3;
- }
-
- h2,
- h3 {
- page-break-after: avoid;
- }
-}
-
- </style>
-
- <script type="text/javascript">
- function toggle(id, $link) {
- $node = document.getElementById(id);
- if (!$node)
- return;
- if (!$node.style.display || $node.style.display == 'none') {
- $node.style.display = 'block';
- $link.innerHTML = 'Hide source ≢';
- } else {
- $node.style.display = 'none';
- $link.innerHTML = 'Show source ≡';
- }
- }
- </script>
-</head>
-<body>
-<a href="#" id="top">Top</a>
-
-<div id="container">
-
-
- <div id="sidebar">
- <h1>Index</h1>
- <ul id="index">
-
-
- <li class="set"><h3><a href="#header-classes">Classes</a></h3>
- <ul>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.Authentication">Authentication</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.Authentication.__init__">__init__</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.AuthenticationAthenz.__init__">__init__</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.AuthenticationOauth2.__init__">__init__</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.AuthenticationTLS.__init__">__init__</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.AuthenticationToken.__init__">__init__</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.Client">Client</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.Client.__init__">__init__</a></li>
- <li class="mono"><a href="#pulsar.Client.close">close</a></li>
- <li class="mono"><a href="#pulsar.Client.create_producer">create_producer</a></li>
- <li class="mono"><a href="#pulsar.Client.create_reader">create_reader</a></li>
- <li class="mono"><a href="#pulsar.Client.get_topic_partitions">get_topic_partitions</a></li>
- <li class="mono"><a href="#pulsar.Client.subscribe">subscribe</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.Consumer">Consumer</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.Consumer.acknowledge">acknowledge</a></li>
- <li class="mono"><a href="#pulsar.Consumer.acknowledge_cumulative">acknowledge_cumulative</a></li>
- <li class="mono"><a href="#pulsar.Consumer.close">close</a></li>
- <li class="mono"><a href="#pulsar.Consumer.negative_acknowledge">negative_acknowledge</a></li>
- <li class="mono"><a href="#pulsar.Consumer.pause_message_listener">pause_message_listener</a></li>
- <li class="mono"><a href="#pulsar.Consumer.receive">receive</a></li>
- <li class="mono"><a href="#pulsar.Consumer.redeliver_unacknowledged_messages">redeliver_unacknowledged_messages</a></li>
- <li class="mono"><a href="#pulsar.Consumer.resume_message_listener">resume_message_listener</a></li>
- <li class="mono"><a href="#pulsar.Consumer.seek">seek</a></li>
- <li class="mono"><a href="#pulsar.Consumer.subscription_name">subscription_name</a></li>
- <li class="mono"><a href="#pulsar.Consumer.topic">topic</a></li>
- <li class="mono"><a href="#pulsar.Consumer.unsubscribe">unsubscribe</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.CryptoKeyReader.__init__">__init__</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.Message">Message</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.Message.data">data</a></li>
- <li class="mono"><a href="#pulsar.Message.event_timestamp">event_timestamp</a></li>
- <li class="mono"><a href="#pulsar.Message.message_id">message_id</a></li>
- <li class="mono"><a href="#pulsar.Message.partition_key">partition_key</a></li>
- <li class="mono"><a href="#pulsar.Message.properties">properties</a></li>
- <li class="mono"><a href="#pulsar.Message.publish_timestamp">publish_timestamp</a></li>
- <li class="mono"><a href="#pulsar.Message.redelivery_count">redelivery_count</a></li>
- <li class="mono"><a href="#pulsar.Message.schema_version">schema_version</a></li>
- <li class="mono"><a href="#pulsar.Message.topic_name">topic_name</a></li>
- <li class="mono"><a href="#pulsar.Message.value">value</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.MessageBatch">MessageBatch</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.MessageBatch.__init__">__init__</a></li>
- <li class="mono"><a href="#pulsar.MessageBatch.parse_from">parse_from</a></li>
- <li class="mono"><a href="#pulsar.MessageBatch.with_message_id">with_message_id</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.MessageId">MessageId</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.MessageId.deserialize">deserialize</a></li>
- <li class="mono"><a href="#pulsar.MessageId.__init__">__init__</a></li>
- <li class="mono"><a href="#pulsar.MessageId.batch_index">batch_index</a></li>
- <li class="mono"><a href="#pulsar.MessageId.entry_id">entry_id</a></li>
- <li class="mono"><a href="#pulsar.MessageId.ledger_id">ledger_id</a></li>
- <li class="mono"><a href="#pulsar.MessageId.partition">partition</a></li>
- <li class="mono"><a href="#pulsar.MessageId.serialize">serialize</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.Producer">Producer</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.Producer.close">close</a></li>
- <li class="mono"><a href="#pulsar.Producer.flush">flush</a></li>
- <li class="mono"><a href="#pulsar.Producer.last_sequence_id">last_sequence_id</a></li>
- <li class="mono"><a href="#pulsar.Producer.producer_name">producer_name</a></li>
- <li class="mono"><a href="#pulsar.Producer.send">send</a></li>
- <li class="mono"><a href="#pulsar.Producer.send_async">send_async</a></li>
- <li class="mono"><a href="#pulsar.Producer.topic">topic</a></li>
- </ul>
-
- </li>
- <li class="mono">
- <span class="class_name"><a href="#pulsar.Reader">Reader</a></span>
-
-
- <ul>
- <li class="mono"><a href="#pulsar.Reader.close">close</a></li>
- <li class="mono"><a href="#pulsar.Reader.has_message_available">has_message_available</a></li>
- <li class="mono"><a href="#pulsar.Reader.read_next">read_next</a></li>
- <li class="mono"><a href="#pulsar.Reader.seek">seek</a></li>
- <li class="mono"><a href="#pulsar.Reader.topic">topic</a></li>
- </ul>
-
- </li>
- </ul>
- </li>
-
- <li class="set"><h3><a href="#header-submodules">Sub-modules</a></h3>
- <ul>
- <li class="mono"><a href="functions/index.html">pulsar.functions</a></li>
- <li class="mono"><a href="schema/index.html">pulsar.schema</a></li>
- </ul>
- </li>
- </ul>
- </div>
-
- <article id="content">
-
-
-
-
-
-
- <header id="section-intro">
- <h1 class="title"><span class="name">pulsar</span> module</h1>
- <p>The Pulsar Python client library is based on the existing C++ client library.
-All the same features are exposed through the Python interface.</p>
-<p>Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.</p>
-<h2>Install from PyPI</h2>
-<p>Download Python wheel binary files for MacOS and Linux
-directly from the PyPI archive.</p>
-<pre><code>#!shell
-$ sudo pip install pulsar-client
-</code></pre>
-<h2>Install from sources</h2>
-<p>Follow the instructions to compile the Pulsar C++ client library. This method
-will also build the Python binding for the library.</p>
-<p>To install the Python bindings:</p>
-<pre><code>#!shell
-$ cd pulsar-client-cpp/python
-$ sudo python setup.py install
-</code></pre>
-<h2>Examples</h2>
-<h3><a href="#pulsar.Producer">Producer</a> example</h3>
-<pre><code>#!python
-import pulsar
-
-client = pulsar.Client('pulsar://localhost:6650')
-
-producer = client.create_producer('my-topic')
-
-for i in range(10):
- producer.send(('Hello-%d' % i).encode('utf-8'))
-
-client.close()
-</code></pre>
-<h3><a href="#pulsar.Consumer">Consumer</a> example</h3>
-<pre><code>#!python
-import pulsar
-
-client = pulsar.Client('pulsar://localhost:6650')
-consumer = client.subscribe('my-topic', 'my-subscription')
-
-while True:
- msg = consumer.receive()
- try:
- print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id()))
- consumer.acknowledge(msg)
- except:
- consumer.negative_acknowledge(msg)
-
-client.close()
-</code></pre>
-<h3><a href="#pulsar.Producer.send_async">Async producer</a> example</h3>
-<pre><code>#!python
-import pulsar
-
-client = pulsar.Client('pulsar://localhost:6650')
-
-producer = client.create_producer(
- 'my-topic',
- block_if_queue_full=True,
- batching_enabled=True,
- batching_max_publish_delay_ms=10
- )
-
-def send_callback(res, msg_id):
- print('Message published res=%s' % res)
-
-for i in range(10):
- producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
-
-client.close()
-</code></pre>
-
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar', this);">Show source ≡</a></p>
- <div id="source-pulsar" class="source">
- <pre><code>#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-The Pulsar Python client library is based on the existing C++ client library.
-All the same features are exposed through the Python interface.
-
-Currently, the supported Python versions are 2.7, 3.5, 3.6, 3.7 and 3.8.
-
-## Install from PyPI
-
-Download Python wheel binary files for MacOS and Linux
-directly from the PyPI archive.
-
- #!shell
- $ sudo pip install pulsar-client
-
-## Install from sources
-
-Follow the instructions to compile the Pulsar C++ client library. This method
-will also build the Python binding for the library.
-
-To install the Python bindings:
-
- #!shell
- $ cd pulsar-client-cpp/python
- $ sudo python setup.py install
-
-## Examples
-
-### [Producer](#pulsar.Producer) example
-
- #!python
- import pulsar
-
- client = pulsar.Client('pulsar://localhost:6650')
-
- producer = client.create_producer('my-topic')
-
- for i in range(10):
- producer.send(('Hello-%d' % i).encode('utf-8'))
-
- client.close()
-
-### [Consumer](#pulsar.Consumer) example
-
- #!python
- import pulsar
-
- client = pulsar.Client('pulsar://localhost:6650')
- consumer = client.subscribe('my-topic', 'my-subscription')
-
- while True:
- msg = consumer.receive()
- try:
- print("Received message '%s' id='%s'" % (msg.data().decode('utf-8'), msg.message_id()))
- consumer.acknowledge(msg)
- except:
- consumer.negative_acknowledge(msg)
-
- client.close()
-
-### [Async producer](#pulsar.Producer.send_async) example
-
- #!python
- import pulsar
-
- client = pulsar.Client('pulsar://localhost:6650')
-
- producer = client.create_producer(
- 'my-topic',
- block_if_queue_full=True,
- batching_enabled=True,
- batching_max_publish_delay_ms=10
- )
-
- def send_callback(res, msg_id):
- print('Message published res=%s' % res)
-
- for i in range(10):
- producer.send_async(('Hello-%d' % i).encode('utf-8'), send_callback)
-
- client.close()
-"""
-
-import _pulsar
-
-from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401
-
-from pulsar.functions.function import Function
-from pulsar.functions.context import Context
-from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
-from pulsar import schema
-_schema = schema
-
-import re
-_retype = type(re.compile('x'))
-
-import certifi
-from datetime import timedelta
-
-
-class MessageId:
- """
- Identifier of a single message, wrapping a `_pulsar.MessageId`.
-
- The wrapped object is kept in `self._msg_id`; the accessor methods below
- simply delegate to it.
- """
-
- def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
- """
- Build the wrapped `_pulsar.MessageId` from its four components.
-
- **Options**
-
- * `partition`, `ledger_id`, `entry_id`, `batch_index`:
- Passed positionally, in this order, to `_pulsar.MessageId`.
- Each one defaults to `-1`.
- """
- self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
-
- # The bare string literals below are plain expression statements used as
- # inline documentation for the class attribute that follows each of them.
- 'Represents the earliest message stored in a topic'
- earliest = _pulsar.MessageId.earliest
-
- 'Represents the latest message published on a topic'
- latest = _pulsar.MessageId.latest
-
- def ledger_id(self):
- """
- Return the ledger id reported by the wrapped message id.
- """
- return self._msg_id.ledger_id()
-
- def entry_id(self):
- """
- Return the entry id reported by the wrapped message id.
- """
- return self._msg_id.entry_id()
-
- def batch_index(self):
- """
- Return the batch index reported by the wrapped message id.
- """
- return self._msg_id.batch_index()
-
- def partition(self):
- """
- Return the partition reported by the wrapped message id.
- """
- return self._msg_id.partition()
-
- def serialize(self):
- """
- Returns a bytes representation of the message id.
- This bytes sequence can be stored and later deserialized.
- """
- return self._msg_id.serialize()
-
- @staticmethod
- def deserialize(message_id_bytes):
- """
- Deserialize a message id object from a previously
- serialized bytes sequence.
-
- The value returned is whatever `_pulsar.MessageId.deserialize`
- produces; it is not re-wrapped in this class.
- """
- return _pulsar.MessageId.deserialize(message_id_bytes)
-
-
-class Message:
- """
- Message objects are returned by a consumer, either by calling `receive` or
- through a listener.
- """
-
- def data(self):
- """
- Returns object typed bytes with the payload of the message.
- """
- return self._message.data()
-
- def value(self):
- """
- Returns object with the de-serialized version of the message content
- """
- return self._schema.decode(self._message.data())
-
- def properties(self):
- """
- Return the properties attached to the message. Properties are
- application-defined key/value pairs that will be attached to the
- message.
- """
- return self._message.properties()
-
- def partition_key(self):
- """
- Get the partitioning key for the message.
- """
- return self._message.partition_key()
-
- def publish_timestamp(self):
- """
- Get the timestamp in milliseconds with the message publish time.
- """
- return self._message.publish_timestamp()
-
- def event_timestamp(self):
- """
- Get the timestamp in milliseconds with the message event time.
- """
- return self._message.event_timestamp()
-
- def message_id(self):
- """
- The message ID that can be used to refere to this particular message.
- """
- return self._message.message_id()
-
- def topic_name(self):
- """
- Get the topic Name from which this message originated from
- """
- return self._message.topic_name()
-
- def redelivery_count(self):
- """
- Get the redelivery count for this message
- """
- return self._message.redelivery_count()
-
- def schema_version(self):
- """
- Get the schema version for this message
- """
- return self._message.schema_version()
-
- @staticmethod
- def _wrap(_message):
- self = Message()
- self._message = _message
- return self
-
-
-class MessageBatch:
- """
- Wrapper around a `_pulsar.MessageBatch`, kept in `self._msg_batch`.
- """
-
- def __init__(self):
- """
- Create the wrapped, initially unconfigured `_pulsar.MessageBatch`.
- """
- self._msg_batch = _pulsar.MessageBatch()
-
- def with_message_id(self, msg_id):
- """
- Set the message id on the wrapped batch and return `self`, so calls
- can be chained.
-
- **Args**
-
- * `msg_id`: Either a `_pulsar.MessageId` or a `MessageId` wrapper
- (which is unwrapped to its `_msg_id`). Any other type raises
- `TypeError`.
- """
- if not isinstance(msg_id, _pulsar.MessageId):
- if isinstance(msg_id, MessageId):
- msg_id = msg_id._msg_id
- else:
- raise TypeError("unknown message id type")
- self._msg_batch.with_message_id(msg_id)
- return self
-
- def parse_from(self, data, size):
- """
- Parse `data` into the wrapped batch and return its messages as a list
- of `Message` wrappers.
-
- **Args**
-
- * `data`, `size`: Forwarded unchanged to the wrapped batch's
- `parse_from`.
- """
- self._msg_batch.parse_from(data, size)
- _msgs = self._msg_batch.messages()
- # Each underlying message is wrapped with Message._wrap.
- return list(map(Message._wrap, _msgs))
-
-
-class Authentication:
- """
- Authentication provider object. Used to load authentication from an external
- shared library.
-
- The provider created by the constructor is stored in `self.auth`.
- """
- def __init__(self, dynamicLibPath, authParamsString):
- """
- Create the authentication provider instance.
-
- **Args**
-
- * `dynamicLibPath`: Path to the authentication provider shared library
- (such as `tls.so`)
- * `authParamsString`: Comma-separated list of provider-specific
- configuration params
- """
- # _check_type is defined elsewhere in this module; presumably it rejects
- # values that are not of the given type -- confirm.
- _check_type(str, dynamicLibPath, 'dynamicLibPath')
- _check_type(str, authParamsString, 'authParamsString')
- self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
-
-
-class AuthenticationTLS(Authentication):
- """
- TLS Authentication implementation
-
- The base-class constructor is not invoked; `self.auth` is set directly to
- a `_pulsar.AuthenticationTLS`.
- """
- def __init__(self, certificate_path, private_key_path):
- """
- Create the TLS authentication provider instance.
-
- **Args**
-
- * `certificate_path`: Path to the public certificate
- * `private_key_path`: Path to private TLS key
- """
- _check_type(str, certificate_path, 'certificate_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
-
-
-class AuthenticationToken(Authentication):
- """
- Token based authentication implementation
-
- The base-class constructor is not invoked; `self.auth` is set directly to
- a `_pulsar.AuthenticationToken`.
- """
- def __init__(self, token):
- """
- Create the token authentication provider instance.
-
- **Args**
-
- * `token`: A string containing the token, or a function that returns a
- string with the token
-
- **Raises**
-
- * `ValueError`: If `token` is neither a `str` nor a callable.
- """
- if not (isinstance(token, str) or callable(token)):
- raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
- self.auth = _pulsar.AuthenticationToken(token)
-
-
-class AuthenticationAthenz(Authentication):
- """
- Athenz Authentication implementation
-
- The base-class constructor is not invoked; `self.auth` is set directly to
- a `_pulsar.AuthenticationAthenz`.
- """
- def __init__(self, auth_params_string):
- """
- Create the Athenz authentication provider instance.
-
- **Args**
-
- * `auth_params_string`: JSON encoded configuration for Athenz client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
-
-class AuthenticationOauth2(Authentication):
- """
- Oauth2 Authentication implementation
-
- The base-class constructor is not invoked; `self.auth` is set directly to
- a `_pulsar.AuthenticationOauth2`.
- """
- def __init__(self, auth_params_string):
- """
- Create the Oauth2 authentication provider instance.
-
- **Args**
-
- * `auth_params_string`: JSON encoded configuration for Oauth2 client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
-
-class Client:
- """
- The Pulsar client. A single client instance can be used to create producers
- and consumers on multiple topics.
-
- The client will share the same connection pool and threads across all
- producers and consumers.
- """
-
- def __init__(self, service_url,
- authentication=None,
- operation_timeout_seconds=30,
- io_threads=1,
- message_listener_threads=1,
- concurrent_lookup_requests=50000,
- log_conf_file_path=None,
- use_tls=False,
- tls_trust_certs_file_path=None,
- tls_allow_insecure_connection=False,
- tls_validate_hostname=False,
- ):
- """
- Create a new Pulsar client instance.
-
- **Args**
-
- * `service_url`: The Pulsar service URL, e.g. `pulsar://my-broker.com:6650/`
-
- **Options**
-
- * `authentication`:
- Set the authentication provider to be used with the broker. For example:
- `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or
- `AuthenticationOauth2`
- * `operation_timeout_seconds`:
- Set timeout on client operations (subscribe, create producer, close,
- unsubscribe).
- * `io_threads`:
- Set the number of IO threads to be used by the Pulsar client.
- * `message_listener_threads`:
- Set the number of threads to be used by the Pulsar client when
- delivering messages through message listener. The default is 1 thread
- per Pulsar client. If using more than 1 thread, messages for distinct
- `message_listener`s will be delivered in different threads, however a
- single `MessageListener` will always be assigned to the same thread.
- * `concurrent_lookup_requests`:
- Number of concurrent lookup-requests allowed on each broker connection
- to prevent overload on the broker.
- * `log_conf_file_path`:
- Initialize log4cxx from a configuration file.
- * `use_tls`:
- Configure whether to use TLS encryption on the connection. This setting
- is deprecated. TLS will be automatically enabled if the `service_url` is
- set to `pulsar+ssl://` or `https://`
- * `tls_trust_certs_file_path`:
- Set the path to the trusted TLS certificate file. If empty, defaults to
- the bundle returned by `certifi.where()`.
- * `tls_allow_insecure_connection`:
- Configure whether the Pulsar client accepts untrusted TLS certificates
- from the broker.
- * `tls_validate_hostname`:
- Configure whether the Pulsar client validates that the hostname of the
- endpoint matches the common name on the TLS certificate presented by
- the endpoint.
- """
- # Argument validation; _check_type / _check_type_or_none are defined
- # elsewhere in this module.
- _check_type(str, service_url, 'service_url')
- _check_type_or_none(Authentication, authentication, 'authentication')
- _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
- _check_type(int, io_threads, 'io_threads')
- _check_type(int, message_listener_threads, 'message_listener_threads')
- _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
- _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
- _check_type(bool, use_tls, 'use_tls')
- _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
- _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
- _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
-
- # Copy the validated arguments into a _pulsar.ClientConfiguration.
- conf = _pulsar.ClientConfiguration()
- if authentication:
- conf.authentication(authentication.auth)
- conf.operation_timeout_seconds(operation_timeout_seconds)
- conf.io_threads(io_threads)
- conf.message_listener_threads(message_listener_threads)
- conf.concurrent_lookup_requests(concurrent_lookup_requests)
- if log_conf_file_path:
- conf.log_conf_file_path(log_conf_file_path)
- # TLS is requested either by the deprecated flag or by the URL scheme.
- if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
- conf.use_tls(True)
- if tls_trust_certs_file_path:
- conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
- else:
- conf.tls_trust_certs_file_path(certifi.where())
- conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
- conf.tls_validate_hostname(tls_validate_hostname)
- self._client = _pulsar.Client(service_url, conf)
- self._consumers = []
-
- def create_producer(self, topic,
- producer_name=None,
- schema=schema.BytesSchema(),
- initial_sequence_id=None,
- send_timeout_millis=30000,
- compression_type=CompressionType.NONE,
- max_pending_messages=1000,
- max_pending_messages_across_partitions=50000,
- block_if_queue_full=False,
- batching_enabled=False,
- batching_max_messages=1000,
- batching_max_allowed_size_in_bytes=128*1024,
- batching_max_publish_delay_ms=10,
- message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
- properties=None,
- batching_type=BatchingType.Default,
- encryption_key=None,
- crypto_key_reader=None
- ):
- """
- Create a new producer on a given topic.
-
- **Args**
-
- * `topic`:
- The topic name
-
- **Options**
-
- * `producer_name`:
- Specify a name for the producer. If not assigned,
- the system will generate a globally unique name which can be accessed
- with `Producer.producer_name()`. When specifying a name, it is app to
- the user to ensure that, for a given topic, the producer name is unique
- across all Pulsar's clusters.
- * `schema`:
- Define the schema of the data that will be published by this producer.
- The schema will be used for two purposes:
- - Validate the data format against the topic defined schema
- - Perform serialization/deserialization between data and objects
- An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
- * `initial_sequence_id`:
- Set the baseline for the sequence ids for messages
- published by the producer. First message will be using
- `(initialSequenceId + 1)`` as its sequence id and subsequent messages will
- be assigned incremental sequence ids, if not otherwise specified.
- * `send_timeout_millis`:
- If a message is not acknowledged by the server before the
- `send_timeout` expires, an error will be reported.
- * `compression_type`:
- Set the compression type for the producer. By default, message
- payloads are not compressed. Supported compression types are
- `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
- ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with ZSTD.
- SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with SNAPPY.
- * `max_pending_messages`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment from the broker.
- * `max_pending_messages_across_partitions`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment across partitions from the broker.
- * `block_if_queue_full`: Set whether `send_async` operations should
- block when the outgoing message queue is full.
- * `message_routing_mode`:
- Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
- other option is `PartitionsRoutingMode.UseSinglePartition`
- * `properties`:
- Sets the properties for the producer. The properties associated with a producer
- can be used for identify a producer at broker side.
- * `batching_type`:
- Sets the batching type for the producer.
- There are two batching type: DefaultBatching and KeyBasedBatching.
- - Default batching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
-
- - KeyBasedBatching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
- * encryption_key:
- The key used for symmetric encryption, configured on the producer side
- * crypto_key_reader:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, topic, 'topic')
- _check_type_or_none(str, producer_name, 'producer_name')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
- _check_type(int, send_timeout_millis, 'send_timeout_millis')
- _check_type(CompressionType, compression_type, 'compression_type')
- _check_type(int, max_pending_messages, 'max_pending_messages')
- _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
- _check_type(bool, block_if_queue_full, 'block_if_queue_full')
- _check_type(bool, batching_enabled, 'batching_enabled')
- _check_type(int, batching_max_messages, 'batching_max_messages')
- _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
- _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(BatchingType, batching_type, 'batching_type')
- _check_type_or_none(str, encryption_key, 'encryption_key')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
- conf = _pulsar.ProducerConfiguration()
- conf.send_timeout_millis(send_timeout_millis)
- conf.compression_type(compression_type)
- conf.max_pending_messages(max_pending_messages)
- conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
- conf.block_if_queue_full(block_if_queue_full)
- conf.batching_enabled(batching_enabled)
- conf.batching_max_messages(batching_max_messages)
- conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
- conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
- conf.partitions_routing_mode(message_routing_mode)
- conf.batching_type(batching_type)
- if producer_name:
- conf.producer_name(producer_name)
- if initial_sequence_id:
- conf.initial_sequence_id(initial_sequence_id)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
-
- conf.schema(schema.schema_info())
- if encryption_key:
- conf.encryption_key(encryption_key)
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
- p = Producer()
- p._producer = self._client.create_producer(topic, conf)
- p._schema = schema
- return p
-
- def subscribe(self, topic, subscription_name,
- consumer_type=ConsumerType.Exclusive,
- schema=schema.BytesSchema(),
- message_listener=None,
- receiver_queue_size=1000,
- max_total_receiver_queue_size_across_partitions=50000,
- consumer_name=None,
- unacked_messages_timeout_ms=None,
- broker_consumer_stats_cache_time_ms=30000,
- negative_ack_redelivery_delay_ms=60000,
- is_read_compacted=False,
- properties=None,
- pattern_auto_discovery_period=60,
- initial_position=InitialPosition.Latest,
- crypto_key_reader=None
- ):
- """
- Subscribe to the given topic and subscription combination.
-
- **Args**
-
- * `topic`: The name of the topic, list of topics or regex pattern.
- This method will accept these forms:
- - `topic='my-topic'`
- - `topic=['topic-1', 'topic-2', 'topic-3']`
- - `topic=re.compile('persistent://public/default/topic-*')`
- * `subscription`: The name of the subscription.
-
- **Options**
-
- * `consumer_type`:
- Select the subscription type to be used when subscribing to the topic.
- * `schema`:
- Define the schema of the data that will be received by this consumer.
- * `message_listener`:
- Sets a message listener for the consumer. When the listener is set,
- the application will receive messages through it. Calls to
- `consumer.receive()` will not be allowed. The listener function needs
- to accept (consumer, message), for example:
-
- #!python
- def my_listener(consumer, message):
- # process message
- consumer.acknowledge(message)
-
- * `receiver_queue_size`:
- Sets the size of the consumer receive queue. The consumer receive
- queue controls how many messages can be accumulated by the consumer
- before the application calls `receive()`. Using a higher value could
- potentially increase the consumer throughput at the expense of higher
- memory utilization. Setting the consumer queue size to zero decreases
- the throughput of the consumer by disabling pre-fetching of messages.
- This approach improves the message distribution on shared subscription
- by pushing messages only to those consumers that are ready to process
- them. Neither receive with timeout nor partitioned topics can be used
- if the consumer queue size is zero. The `receive()` function call
- should not be interrupted when the consumer queue size is zero. The
- default value is 1000 messages and should work well for most use
- cases.
- * `max_total_receiver_queue_size_across_partitions`:
- Set the max total receiver queue size across partitions.
- This setting will be used to reduce the receiver queue size for individual partitions
- * `consumer_name`:
- Sets the consumer name.
- * `unacked_messages_timeout_ms`:
- Sets the timeout in milliseconds for unacknowledged messages. The
- timeout needs to be greater than 10 seconds. An exception is thrown if
- the given value is less than 10 seconds. If a successful
- acknowledgement is not sent within the timeout, all the unacknowledged
- messages are redelivered.
- * `negative_ack_redelivery_delay_ms`:
- The delay after which to redeliver the messages that failed to be
- processed (with the `consumer.negative_acknowledge()`)
- * `broker_consumer_stats_cache_time_ms`:
- Sets the time duration for which the broker-side consumer stats will
- be cached in the client.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- * `properties`:
- Sets the properties for the consumer. The properties associated with a consumer
- can be used to identify a consumer on the broker side.
- * `pattern_auto_discovery_period`:
- Period in seconds at which the consumer auto-discovers topics that match the pattern.
- * `initial_position`:
- Set the initial position of a consumer when subscribing to the topic.
- It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
- Default: `Latest`.
- * `crypto_key_reader`:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, subscription_name, 'subscription_name')
- _check_type(ConsumerType, consumer_type, 'consumer_type')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type(int, max_total_receiver_queue_size_across_partitions,
- 'max_total_receiver_queue_size_across_partitions')
- _check_type_or_none(str, consumer_name, 'consumer_name')
- _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
- _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
- _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
- _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(InitialPosition, initial_position, 'initial_position')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
- conf = _pulsar.ConsumerConfiguration()
- conf.consumer_type(consumer_type)
- conf.read_compacted(is_read_compacted)
- if message_listener:
- conf.message_listener(_listener_wrapper(message_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
- if consumer_name:
- conf.consumer_name(consumer_name)
- if unacked_messages_timeout_ms:
- conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-
- conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
- conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
- conf.subscription_initial_position(initial_position)
-
- conf.schema(schema.schema_info())
-
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
- c = Consumer()
- if isinstance(topic, str):
- # Single topic
- c._consumer = self._client.subscribe(topic, subscription_name, conf)
- elif isinstance(topic, list):
- # List of topics
- c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
- elif isinstance(topic, _retype):
- # Regex pattern
- c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
- else:
- raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
-
- c._client = self
- c._schema = schema
- self._consumers.append(c)
- return c
-
- def create_reader(self, topic, start_message_id,
- schema=schema.BytesSchema(),
- reader_listener=None,
- receiver_queue_size=1000,
- reader_name=None,
- subscription_role_prefix=None,
- is_read_compacted=False
- ):
- """
- Create a reader on a particular topic
-
- **Args**
-
- * `topic`: The name of the topic.
- * `start_message_id`: The initial reader positioning is done by specifying a message id.
- The options are:
- * `MessageId.earliest`: Start reading from the earliest message available in the topic
- * `MessageId.latest`: Start reading from the end of the topic, only getting messages published
- after the reader was created
- * `MessageId`: When passing a particular message id, the reader will position itself on
- that specific position. The first message to be read will be the message next to the
- specified messageId. Message id can be serialized into a string and deserialized
- back into a `MessageId` object:
-
- # Serialize to string
- s = msg.message_id().serialize()
-
- # Deserialize from string
- msg_id = MessageId.deserialize(s)
-
- **Options**
-
- * `schema`:
- Define the schema of the data that will be received by this reader.
- * `reader_listener`:
- Sets a message listener for the reader. When the listener is set,
- the application will receive messages through it. Calls to
- `reader.read_next()` will not be allowed. The listener function needs
- to accept (reader, message), for example:
-
- def my_listener(reader, message):
- # process message
- pass
-
- * `receiver_queue_size`:
- Sets the size of the reader receive queue. The reader receive
- queue controls how many messages can be accumulated by the reader
- before the application calls `read_next()`. Using a higher value could
- potentially increase the reader throughput at the expense of higher
- memory utilization.
- * `reader_name`:
- Sets the reader name.
- * `subscription_role_prefix`:
- Sets the subscription role prefix.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- """
- _check_type(str, topic, 'topic')
- _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type_or_none(str, reader_name, 'reader_name')
- _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
-
- conf = _pulsar.ReaderConfiguration()
- if reader_listener:
- conf.reader_listener(_listener_wrapper(reader_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- if reader_name:
- conf.reader_name(reader_name)
- if subscription_role_prefix:
- conf.subscription_role_prefix(subscription_role_prefix)
- conf.schema(schema.schema_info())
- conf.read_compacted(is_read_compacted)
-
- c = Reader()
- c._reader = self._client.create_reader(topic, start_message_id, conf)
- c._client = self
- c._schema = schema
- self._consumers.append(c)
- return c
-
- def get_topic_partitions(self, topic):
- """
- Get the list of partitions for a given topic.
-
- If the topic is partitioned, this will return a list of partition names. If the topic is not
- partitioned, the returned list will contain the topic name itself.
-
- This can be used to discover the partitions and create Reader, Consumer or Producer
- instances directly on a particular partition.
- :param topic: the topic name to lookup
- :return: a list of partition names
- """
- _check_type(str, topic, 'topic')
- return self._client.get_topic_partitions(topic)
-
- def close(self):
- """
- Close the client and all the associated producers and consumers
- """
- self._client.close()
-
-
-class Producer:
- """
- The Pulsar message producer, used to publish messages on a topic.
- """
-
- def topic(self):
- """
- Return the topic which producer is publishing to
- """
- return self._producer.topic()
-
- def producer_name(self):
- """
- Return the producer name which could have been assigned by the
- system or specified by the client
- """
- return self._producer.producer_name()
-
- def last_sequence_id(self):
- """
- Get the last sequence id that was published by this producer.
-
- This represents either the automatically assigned or custom sequence id
- (set on the `MessageBuilder`) that was published and acknowledged by the broker.
-
- After recreating a producer with the same producer name, this will return the
- last message that was published in the previous producer session, or -1 if
- no message was ever published.
- """
- return self._producer.last_sequence_id()
-
- def send(self, content,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Publish a message on the topic. Blocks until the message is acknowledged
-
- Returns a `MessageId` object that represents where the message is persisted.
-
- **Args**
-
- * `content`:
- A `bytes` object with the message payload.
-
- **Options**
-
- * `properties`:
- A dict of application-defined string properties.
- * `partition_key`:
- Sets the partition key for message routing. A hash of this key is used
- to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`:
- Override namespace replication clusters. Note that it is the caller's
- responsibility to provide valid cluster names and that all clusters
- have been previously configured as topics. Given an empty list,
- the message will replicate according to the namespace configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp of the event creation, in milliseconds
- * `deliver_at`:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
-
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- return MessageId.deserialize(self._producer.send(msg))
-
- def send_async(self, content, callback,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Send a message asynchronously.
-
- The `callback` will be invoked once the message has been acknowledged
- by the broker.
-
- Example:
-
- #!python
- def callback(res, msg_id):
- print('Message published: %s' % res)
-
- producer.send_async(msg, callback)
-
- When the producer queue is full, by default the message will be rejected
- and the callback invoked with an error code.
-
- **Args**
-
- * `content`:
- A `bytes` object with the message payload.
-
- **Options**
-
- * `properties`:
- A dict of application-defined string properties.
- * `partition_key`:
- Sets the partition key for the message routing. A hash of this key is
- used to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`: Override namespace replication clusters. Note
- that it is the caller's responsibility to provide valid cluster names
- and that all clusters have been previously configured as topics.
- Given an empty list, the message will replicate per the namespace
- configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp of the event creation, in milliseconds
- * `deliver_at`:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- self._producer.send_async(msg, callback)
-
-
- def flush(self):
- """
- Flush all the messages buffered in the client and wait until all messages have been
- successfully persisted
- """
- self._producer.flush()
-
-
- def close(self):
- """
- Close the producer.
- """
- self._producer.close()
-
- def _build_msg(self, content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after):
- data = self._schema.encode(content)
-
- _check_type(bytes, data, 'data')
- _check_type_or_none(dict, properties, 'properties')
- _check_type_or_none(str, partition_key, 'partition_key')
- _check_type_or_none(int, sequence_id, 'sequence_id')
- _check_type_or_none(list, replication_clusters, 'replication_clusters')
- _check_type(bool, disable_replication, 'disable_replication')
- _check_type_or_none(int, event_timestamp, 'event_timestamp')
- _check_type_or_none(int, deliver_at, 'deliver_at')
- _check_type_or_none(timedelta, deliver_after, 'deliver_after')
-
- mb = _pulsar.MessageBuilder()
- mb.content(data)
- if properties:
- for k, v in properties.items():
- mb.property(k, v)
- if partition_key:
- mb.partition_key(partition_key)
- if sequence_id:
- mb.sequence_id(sequence_id)
- if replication_clusters:
- mb.replication_clusters(replication_clusters)
- if disable_replication:
- mb.disable_replication(disable_replication)
- if event_timestamp:
- mb.event_timestamp(event_timestamp)
- if deliver_at:
- mb.deliver_at(deliver_at)
- if deliver_after:
- mb.deliver_after(deliver_after)
-
- return mb.build()
-
-
-class Consumer:
- """
- Pulsar consumer.
- """
-
- def topic(self):
- """
- Return the topic this consumer is subscribed to.
- """
- return self._consumer.topic()
-
- def subscription_name(self):
- """
- Return the subscription name.
- """
- return self._consumer.subscription_name()
-
- def unsubscribe(self):
- """
- Unsubscribe the current consumer from the topic.
-
- This method will block until the operation is completed. Once the
- consumer is unsubscribed, no more messages will be received and
- subsequent new messages will not be retained for this consumer.
-
- This consumer object cannot be reused.
- """
- return self._consumer.unsubscribe()
-
- def receive(self, timeout_millis=None):
- """
- Receive a single message.
-
- If a message is not immediately available, this method will block until
- a new message is available.
-
- **Options**
-
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._consumer.receive()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._consumer.receive(timeout_millis)
-
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-
- def acknowledge(self, message):
- """
- Acknowledge the reception of a single message.
-
- This method will block until an acknowledgement is sent to the broker.
- After that, the message will not be re-delivered to this consumer.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge(message._message)
- else:
- self._consumer.acknowledge(message)
-
- def acknowledge_cumulative(self, message):
- """
- Acknowledge the reception of all the messages in the stream up to (and
- including) the provided message.
-
- This method will block until an acknowledgement is sent to the broker.
- After that, the messages will not be re-delivered to this consumer.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge_cumulative(message._message)
- else:
- self._consumer.acknowledge_cumulative(message)
-
- def negative_acknowledge(self, message):
- """
- Acknowledge the failure to process a single message.
-
- When a message is "negatively acked" it will be marked for redelivery after
- some fixed delay. The delay is configurable when constructing the consumer
- with the `negative_ack_redelivery_delay_ms` option of `subscribe()`.
-
- This call is not blocking.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.negative_acknowledge(message._message)
- else:
- self._consumer.negative_acknowledge(message)
-
- def pause_message_listener(self):
- """
- Pause receiving messages via the `message_listener` until
- `resume_message_listener()` is called.
- """
- self._consumer.pause_message_listener()
-
- def resume_message_listener(self):
- """
- Resume receiving the messages via the message listener.
- Asynchronously receive all the messages enqueued from the time
- `pause_message_listener()` was called.
- """
- self._consumer.resume_message_listener()
-
- def redeliver_unacknowledged_messages(self):
- """
- Redelivers all the unacknowledged messages. In failover mode, the
- request is ignored if the consumer is not active for the given topic. In
- shared mode, the consumer's messages to be redelivered are distributed
- across all the connected consumers. This is a non-blocking call and
- doesn't throw an exception. In case the connection breaks, the messages
- are redelivered after reconnect.
- """
- self._consumer.redeliver_unacknowledged_messages()
-
- def seek(self, messageid):
- """
- Reset the subscription associated with this consumer to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead
- perform the seek() on the individual partitions.
-
- **Args**
-
- * `messageid`:
- The message id for seek, OR an integer event time to seek to
- """
- self._consumer.seek(messageid)
-
- def close(self):
- """
- Close the consumer.
- """
- self._consumer.close()
- self._client._consumers.remove(self)
-
-
-class Reader:
- """
- Pulsar topic reader.
- """
-
- def topic(self):
- """
- Return the topic this reader is reading from.
- """
- return self._reader.topic()
-
- def read_next(self, timeout_millis=None):
- """
- Read a single message.
-
- If a message is not immediately available, this method will block until
- a new message is available.
-
- **Options**
-
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._reader.read_next()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._reader.read_next(timeout_millis)
-
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-
- def has_message_available(self):
- """
- Check if there is any message available to read from the current position.
- """
- return self._reader.has_message_available();
-
- def seek(self, messageid):
- """
- Reset this reader to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can instead
- perform the seek() on the individual partitions.
-
- **Args**
-
- * `messageid`:
- The message id for seek, OR an integer event time to seek to
- """
- self._reader.seek(messageid)
-
- def close(self):
- """
- Close the reader.
- """
- self._reader.close()
- self._client._consumers.remove(self)
-
-class CryptoKeyReader:
- """
- Default crypto key reader implementation
- """
- def __init__(self, public_key_path, private_key_path):
- """
- Create crypto key reader.
-
- **Args**
-
- * `public_key_path`: Path to the public key
- * `private_key_path`: Path to private key
- """
- _check_type(str, public_key_path, 'public_key_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
-
-def _check_type(var_type, var, name):
- if not isinstance(var, var_type):
- raise ValueError("Argument %s is expected to be of type '%s' and not '%s'"
- % (name, var_type.__name__, type(var).__name__))
-
-
-def _check_type_or_none(var_type, var, name):
- if var is not None and not isinstance(var, var_type):
- raise ValueError("Argument %s is expected to be either None or of type '%s'"
- % (name, var_type.__name__))
-
-
-def _listener_wrapper(listener, schema):
- def wrapper(consumer, msg):
- c = Consumer()
- c._consumer = consumer
- m = Message()
- m._message = msg
- m._schema = schema
- listener(c, m)
- return wrapper
-</code></pre>
- </div>
-
- </header>
-
- <section id="section-items">
-
-
- <h2 class="section-title" id="header-classes">Classes</h2>
-
- <div class="item">
- <p id="pulsar.Authentication" class="name">class <span class="ident">Authentication</span></p>
-
-
- <div class="desc"><p>Authentication provider object. Used to load authentication from an external
-shared library.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication', this);">Show source ≡</a></p>
- <div id="source-pulsar.Authentication" class="source">
- <pre><code>class Authentication:
- """
- Authentication provider object. Used to load authentication from an external
- shared library.
- """
- def __init__(self, dynamicLibPath, authParamsString):
- """
- Create the authentication provider instance.
-
- **Args**
-
- * `dynamicLibPath`: Path to the authentication provider shared library
- (such as `tls.so`)
- * `authParamsString`: Comma-separated list of provider-specific
- configuration params
- """
- _check_type(str, dynamicLibPath, 'dynamicLibPath')
- _check_type(str, authParamsString, 'authParamsString')
- self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.Authentication">Authentication</a></li>
- </ul>
- <h3>Instance variables</h3>
- <div class="item">
- <p id="pulsar.Authentication.auth" class="name">var <span class="ident">auth</span></p>
-
-
-
-
- <div class="source_cont">
-</div>
-
- </div>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.Authentication.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, dynamicLibPath, authParamsString)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Create the authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>dynamicLibPath</code>: Path to the authentication provider shared library
- (such as <code>tls.so</code>)</li>
-<li><code>authParamsString</code>: Comma-separated list of provider-specific
- configuration params</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Authentication.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.Authentication.__init__" class="source">
- <pre><code>def __init__(self, dynamicLibPath, authParamsString):
- """
- Create the authentication provider instance.
- **Args**
- * `dynamicLibPath`: Path to the authentication provider shared library
- (such as `tls.so`)
- * `authParamsString`: Comma-separated list of provider-specific
- configuration params
- """
- _check_type(str, dynamicLibPath, 'dynamicLibPath')
- _check_type(str, authParamsString, 'authParamsString')
- self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.AuthenticationAthenz" class="name">class <span class="ident">AuthenticationAthenz</span></p>
-
-
- <div class="desc"><p>Athenz Authentication implementation</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationAthenz" class="source">
- <pre><code>class AuthenticationAthenz(Authentication):
- """
- Athenz Authentication implementation
- """
- def __init__(self, auth_params_string):
- """
- Create the Athenz authentication provider instance.
-
- **Args**
-
- * `auth_params_string`: JSON encoded configuration for Athenz client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.AuthenticationAthenz">AuthenticationAthenz</a></li>
- <li><a href="#pulsar.Authentication">Authentication</a></li>
- </ul>
- <h3>Instance variables</h3>
- <div class="item">
- <p id="pulsar.AuthenticationAthenz.auth" class="name">var <span class="ident">auth</span></p>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
- </p>
-
-
-
- <div class="source_cont">
-</div>
-
- </div>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.AuthenticationAthenz.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p>
- </div>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
- </p>
-
-
-
- <div class="desc"><p>Create the Athenz authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>auth_params_string</code>: JSON encoded configuration for Athenz client</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationAthenz.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationAthenz.__init__" class="source">
- <pre><code>def __init__(self, auth_params_string):
- """
- Create the Athenz authentication provider instance.
- **Args**
- * `auth_params_string`: JSON encoded configuration for Athenz client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationAthenz(auth_params_string)
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.AuthenticationOauth2" class="name">class <span class="ident">AuthenticationOauth2</span></p>
-
-
- <div class="desc"><p>Oauth2 Authentication implementation</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationOauth2" class="source">
- <pre><code>class AuthenticationOauth2(Authentication):
- """
- Oauth2 Authentication implementation
- """
- def __init__(self, auth_params_string):
- """
- Create the Oauth2 authentication provider instance.
-
- **Args**
-
- * `auth_params_string`: JSON encoded configuration for Oauth2 client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.AuthenticationOauth2">AuthenticationOauth2</a></li>
- <li><a href="#pulsar.Authentication">Authentication</a></li>
- </ul>
- <h3>Instance variables</h3>
- <div class="item">
- <p id="pulsar.AuthenticationOauth2.auth" class="name">var <span class="ident">auth</span></p>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
- </p>
-
-
-
- <div class="source_cont">
-</div>
-
- </div>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.AuthenticationOauth2.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, auth_params_string)</p>
- </div>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
- </p>
-
-
-
- <div class="desc"><p>Create the Oauth2 authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>auth_params_string</code>: JSON encoded configuration for Oauth2 client</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationOauth2.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationOauth2.__init__" class="source">
- <pre><code>def __init__(self, auth_params_string):
- """
- Create the Oauth2 authentication provider instance.
- **Args**
- * `auth_params_string`: JSON encoded configuration for Oauth2 client
- """
- _check_type(str, auth_params_string, 'auth_params_string')
- self.auth = _pulsar.AuthenticationOauth2(auth_params_string)
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.AuthenticationTLS" class="name">class <span class="ident">AuthenticationTLS</span></p>
-
-
- <div class="desc"><p>TLS Authentication implementation</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationTLS" class="source">
- <pre><code>class AuthenticationTLS(Authentication):
- """
- TLS Authentication implementation
- """
- def __init__(self, certificate_path, private_key_path):
- """
- Create the TLS authentication provider instance.
-
- **Args**
-
- * `certificate_path`: Path to the public certificate
- * `private_key_path`: Path to private TLS key
- """
- _check_type(str, certificate_path, 'certificate_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.AuthenticationTLS">AuthenticationTLS</a></li>
- <li><a href="#pulsar.Authentication">Authentication</a></li>
- </ul>
- <h3>Instance variables</h3>
- <div class="item">
- <p id="pulsar.AuthenticationTLS.auth" class="name">var <span class="ident">auth</span></p>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
- </p>
-
-
-
- <div class="source_cont">
-</div>
-
- </div>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.AuthenticationTLS.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, certificate_path, private_key_path)</p>
- </div>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
- </p>
-
-
-
- <div class="desc"><p>Create the TLS authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>certificate_path</code>: Path to the public certificate</li>
-<li><code>private_key_path</code>: Path to private TLS key</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationTLS.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationTLS.__init__" class="source">
- <pre><code>def __init__(self, certificate_path, private_key_path):
- """
- Create the TLS authentication provider instance.
- **Args**
- * `certificate_path`: Path to the public certificate
- * `private_key_path`: Path to private TLS key
- """
- _check_type(str, certificate_path, 'certificate_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path)
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.AuthenticationToken" class="name">class <span class="ident">AuthenticationToken</span></p>
-
-
- <div class="desc"><p>Token based authentication implementation</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationToken" class="source">
- <pre><code>class AuthenticationToken(Authentication):
- """
- Token based authentication implementation
- """
- def __init__(self, token):
- """
- Create the token authentication provider instance.
-
- **Args**
-
- * `token`: A string containing the token or a function that provides a
- string with the token
- """
- if not (isinstance(token, str) or callable(token)):
- raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
- self.auth = _pulsar.AuthenticationToken(token)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.AuthenticationToken">AuthenticationToken</a></li>
- <li><a href="#pulsar.Authentication">Authentication</a></li>
- </ul>
- <h3>Instance variables</h3>
- <div class="item">
- <p id="pulsar.AuthenticationToken.auth" class="name">var <span class="ident">auth</span></p>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.auth">auth</a></code>
- </p>
-
-
-
- <div class="source_cont">
-</div>
-
- </div>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.AuthenticationToken.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, token)</p>
- </div>
-
- <p class="inheritance">
- <strong>Inheritance:</strong>
- <code><a href="#pulsar.Authentication">Authentication</a></code>.<code><a href="#pulsar.Authentication.__init__">__init__</a></code>
- </p>
-
-
-
- <div class="desc"><p>Create the token authentication provider instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>token</code>: A string containing the token or a function that provides a
- string with the token</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.AuthenticationToken.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.AuthenticationToken.__init__" class="source">
- <pre><code>def __init__(self, token):
- """
- Create the token authentication provider instance.
- **Args**
- * `token`: A string containing the token or a function that provides a
- string with the token
- """
- if not (isinstance(token, str) or callable(token)):
- raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
- self.auth = _pulsar.AuthenticationToken(token)
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.Client" class="name">class <span class="ident">Client</span></p>
-
-
- <div class="desc"><p>The Pulsar client. A single client instance can be used to create producers
-and consumers on multiple topics.</p>
-<p>The client will share the same connection pool and threads across all
-producers and consumers.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client', this);">Show source ≡</a></p>
- <div id="source-pulsar.Client" class="source">
- <pre><code>class Client:
- """
- The Pulsar client. A single client instance can be used to create producers
- and consumers on multiple topics.
-
- The client will share the same connection pool and threads across all
- producers and consumers.
- """
-
- def __init__(self, service_url,
- authentication=None,
- operation_timeout_seconds=30,
- io_threads=1,
- message_listener_threads=1,
- concurrent_lookup_requests=50000,
- log_conf_file_path=None,
- use_tls=False,
- tls_trust_certs_file_path=None,
- tls_allow_insecure_connection=False,
- tls_validate_hostname=False,
- ):
- """
- Create a new Pulsar client instance.
-
- **Args**
-
- * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
-
- **Options**
-
- * `authentication`:
- Set the authentication provider to be used with the broker. For example:
- `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
- * `operation_timeout_seconds`:
- Set timeout on client operations (subscribe, create producer, close,
- unsubscribe).
- * `io_threads`:
- Set the number of IO threads to be used by the Pulsar client.
- * `message_listener_threads`:
- Set the number of threads to be used by the Pulsar client when
- delivering messages through message listener. The default is 1 thread
- per Pulsar client. If using more than 1 thread, messages for distinct
- `message_listener`s will be delivered in different threads, however a
- single `MessageListener` will always be assigned to the same thread.
- * `concurrent_lookup_requests`:
- Number of concurrent lookup-requests allowed on each broker connection
- to prevent overload on the broker.
- * `log_conf_file_path`:
- Initialize log4cxx from a configuration file.
- * `use_tls`:
- Configure whether to use TLS encryption on the connection. This setting
- is deprecated. TLS will be automatically enabled if the `service_url` is
- set to `pulsar+ssl://` or `https://`
- * `tls_trust_certs_file_path`:
- Set the path to the trusted TLS certificate file. If empty defaults to
- certifi.
- * `tls_allow_insecure_connection`:
- Configure whether the Pulsar client accepts untrusted TLS certificates
- from the broker.
- * `tls_validate_hostname`:
- Configure whether the Pulsar client validates that the hostname of the
- endpoint, matches the common name on the TLS certificate presented by
- the endpoint.
- """
- _check_type(str, service_url, 'service_url')
- _check_type_or_none(Authentication, authentication, 'authentication')
- _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
- _check_type(int, io_threads, 'io_threads')
- _check_type(int, message_listener_threads, 'message_listener_threads')
- _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
- _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
- _check_type(bool, use_tls, 'use_tls')
- _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
- _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
- _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
-
- conf = _pulsar.ClientConfiguration()
- if authentication:
- conf.authentication(authentication.auth)
- conf.operation_timeout_seconds(operation_timeout_seconds)
- conf.io_threads(io_threads)
- conf.message_listener_threads(message_listener_threads)
- conf.concurrent_lookup_requests(concurrent_lookup_requests)
- if log_conf_file_path:
- conf.log_conf_file_path(log_conf_file_path)
- if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
- conf.use_tls(True)
- if tls_trust_certs_file_path:
- conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
- else:
- conf.tls_trust_certs_file_path(certifi.where())
- conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
- conf.tls_validate_hostname(tls_validate_hostname)
- self._client = _pulsar.Client(service_url, conf)
- self._consumers = []
-
- def create_producer(self, topic,
- producer_name=None,
- schema=schema.BytesSchema(),
- initial_sequence_id=None,
- send_timeout_millis=30000,
- compression_type=CompressionType.NONE,
- max_pending_messages=1000,
- max_pending_messages_across_partitions=50000,
- block_if_queue_full=False,
- batching_enabled=False,
- batching_max_messages=1000,
- batching_max_allowed_size_in_bytes=128*1024,
- batching_max_publish_delay_ms=10,
- message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
- properties=None,
- batching_type=BatchingType.Default,
- encryption_key=None,
- crypto_key_reader=None
- ):
- """
- Create a new producer on a given topic.
-
- **Args**
-
- * `topic`:
- The topic name
-
- **Options**
-
- * `producer_name`:
- Specify a name for the producer. If not assigned,
- the system will generate a globally unique name which can be accessed
- with `Producer.producer_name()`. When specifying a name, it is up to
- the user to ensure that, for a given topic, the producer name is unique
- across all Pulsar's clusters.
- * `schema`:
- Define the schema of the data that will be published by this producer.
- The schema will be used for two purposes:
- - Validate the data format against the topic defined schema
- - Perform serialization/deserialization between data and objects
- An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
- * `initial_sequence_id`:
- Set the baseline for the sequence ids for messages
- published by the producer. First message will be using
- `(initial_sequence_id + 1)` as its sequence id and subsequent messages will
- be assigned incremental sequence ids, if not otherwise specified.
- * `send_timeout_millis`:
- If a message is not acknowledged by the server before the
- `send_timeout` expires, an error will be reported.
- * `compression_type`:
- Set the compression type for the producer. By default, message
- payloads are not compressed. Supported compression types are
- `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
- ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with ZSTD.
- SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with SNAPPY.
- * `max_pending_messages`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment from the broker.
- * `max_pending_messages_across_partitions`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment across partitions from the broker.
- * `block_if_queue_full`: Set whether `send_async` operations should
- block when the outgoing message queue is full.
- * `message_routing_mode`:
- Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
- other option is `PartitionsRoutingMode.UseSinglePartition`
- * `properties`:
- Sets the properties for the producer. The properties associated with a producer
- can be used to identify a producer on the broker side.
- * `batching_type`:
- Sets the batching type for the producer.
- There are two batching types: DefaultBatching and KeyBasedBatching.
- - Default batching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
-
- - KeyBasedBatching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
- * `encryption_key`:
- The key used for symmetric encryption, configured on the producer side
- * `crypto_key_reader`:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, topic, 'topic')
- _check_type_or_none(str, producer_name, 'producer_name')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
- _check_type(int, send_timeout_millis, 'send_timeout_millis')
- _check_type(CompressionType, compression_type, 'compression_type')
- _check_type(int, max_pending_messages, 'max_pending_messages')
- _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
- _check_type(bool, block_if_queue_full, 'block_if_queue_full')
- _check_type(bool, batching_enabled, 'batching_enabled')
- _check_type(int, batching_max_messages, 'batching_max_messages')
- _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
- _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(BatchingType, batching_type, 'batching_type')
- _check_type_or_none(str, encryption_key, 'encryption_key')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
- conf = _pulsar.ProducerConfiguration()
- conf.send_timeout_millis(send_timeout_millis)
- conf.compression_type(compression_type)
- conf.max_pending_messages(max_pending_messages)
- conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
- conf.block_if_queue_full(block_if_queue_full)
- conf.batching_enabled(batching_enabled)
- conf.batching_max_messages(batching_max_messages)
- conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
- conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
- conf.partitions_routing_mode(message_routing_mode)
- conf.batching_type(batching_type)
- if producer_name:
- conf.producer_name(producer_name)
- if initial_sequence_id:
- conf.initial_sequence_id(initial_sequence_id)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
-
- conf.schema(schema.schema_info())
- if encryption_key:
- conf.encryption_key(encryption_key)
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
- p = Producer()
- p._producer = self._client.create_producer(topic, conf)
- p._schema = schema
- return p
-
- def subscribe(self, topic, subscription_name,
- consumer_type=ConsumerType.Exclusive,
- schema=schema.BytesSchema(),
- message_listener=None,
- receiver_queue_size=1000,
- max_total_receiver_queue_size_across_partitions=50000,
- consumer_name=None,
- unacked_messages_timeout_ms=None,
- broker_consumer_stats_cache_time_ms=30000,
- negative_ack_redelivery_delay_ms=60000,
- is_read_compacted=False,
- properties=None,
- pattern_auto_discovery_period=60,
- initial_position=InitialPosition.Latest,
- crypto_key_reader=None
- ):
- """
- Subscribe to the given topic and subscription combination.
-
- **Args**
-
- * `topic`: The name of the topic, list of topics or regex pattern.
- This method will accept these forms:
- - `topic='my-topic'`
- - `topic=['topic-1', 'topic-2', 'topic-3']`
- - `topic=re.compile('persistent://public/default/topic-*')`
- * `subscription_name`: The name of the subscription.
-
- **Options**
-
- * `consumer_type`:
- Select the subscription type to be used when subscribing to the topic.
- * `schema`:
- Define the schema of the data that will be received by this consumer.
- * `message_listener`:
- Sets a message listener for the consumer. When the listener is set,
- the application will receive messages through it. Calls to
- `consumer.receive()` will not be allowed. The listener function needs
- to accept (consumer, message), for example:
-
- #!python
- def my_listener(consumer, message):
- # process message
- consumer.acknowledge(message)
-
- * `receiver_queue_size`:
- Sets the size of the consumer receive queue. The consumer receive
- queue controls how many messages can be accumulated by the consumer
- before the application calls `receive()`. Using a higher value could
- potentially increase the consumer throughput at the expense of higher
- memory utilization. Setting the consumer queue size to zero decreases
- the throughput of the consumer by disabling pre-fetching of messages.
- This approach improves the message distribution on shared subscription
- by pushing messages only to those consumers that are ready to process
- them. Neither receive with timeout nor partitioned topics can be used
- if the consumer queue size is zero. The `receive()` function call
- should not be interrupted when the consumer queue size is zero. The
- default value is 1000 messages and should work well for most use
- cases.
- * `max_total_receiver_queue_size_across_partitions`:
- Set the max total receiver queue size across partitions.
- This setting will be used to reduce the receiver queue size for individual partitions
- * `consumer_name`:
- Sets the consumer name.
- * `unacked_messages_timeout_ms`:
- Sets the timeout in milliseconds for unacknowledged messages. The
- timeout needs to be greater than 10 seconds. An exception is thrown if
- the given value is less than 10 seconds. If a successful
- acknowledgement is not sent within the timeout, all the unacknowledged
- messages are redelivered.
- * `negative_ack_redelivery_delay_ms`:
- The delay after which to redeliver the messages that failed to be
- processed (with the `consumer.negative_acknowledge()`)
- * `broker_consumer_stats_cache_time_ms`:
- Sets the time duration for which the broker-side consumer stats will
- be cached in the client.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- * `properties`:
- Sets the properties for the consumer. The properties associated with a consumer
- can be used to identify a consumer on the broker side.
- * `pattern_auto_discovery_period`:
- Period in seconds at which the consumer auto-discovers matching topics.
- * `initial_position`:
- Set the initial position of a consumer when subscribing to the topic.
- It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
- Default: `Latest`.
- * `crypto_key_reader`:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, subscription_name, 'subscription_name')
- _check_type(ConsumerType, consumer_type, 'consumer_type')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type(int, max_total_receiver_queue_size_across_partitions,
- 'max_total_receiver_queue_size_across_partitions')
- _check_type_or_none(str, consumer_name, 'consumer_name')
- _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
- _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
- _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
- _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(InitialPosition, initial_position, 'initial_position')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
-
- conf = _pulsar.ConsumerConfiguration()
- conf.consumer_type(consumer_type)
- conf.read_compacted(is_read_compacted)
- if message_listener:
- conf.message_listener(_listener_wrapper(message_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
- if consumer_name:
- conf.consumer_name(consumer_name)
- if unacked_messages_timeout_ms:
- conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
-
- conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
- conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
- conf.subscription_initial_position(initial_position)
-
- conf.schema(schema.schema_info())
-
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
-
- c = Consumer()
- if isinstance(topic, str):
- # Single topic
- c._consumer = self._client.subscribe(topic, subscription_name, conf)
- elif isinstance(topic, list):
- # List of topics
- c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
- elif isinstance(topic, _retype):
- # Regex pattern
- c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
- else:
- raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
-
- c._client = self
- c._schema = schema
- self._consumers.append(c)
- return c
-
- def create_reader(self, topic, start_message_id,
- schema=schema.BytesSchema(),
- reader_listener=None,
- receiver_queue_size=1000,
- reader_name=None,
- subscription_role_prefix=None,
- is_read_compacted=False
- ):
- """
- Create a reader on a particular topic
-
- **Args**
-
- * `topic`: The name of the topic.
- * `start_message_id`: The initial reader positioning is done by specifying a message id.
- The options are:
- * `MessageId.earliest`: Start reading from the earliest message available in the topic
- * `MessageId.latest`: Start reading from the end of the topic, only getting messages published
- after the reader was created
- * `MessageId`: When passing a particular message id, the reader will position itself on
- that specific position. The first message to be read will be the message next to the
- specified messageId. Message id can be serialized into a string and deserialized
- back into a `MessageId` object:
-
- # Serialize to string
- s = msg.message_id().serialize()
-
- # Deserialize from string
- msg_id = MessageId.deserialize(s)
-
- **Options**
-
- * `schema`:
- Define the schema of the data that will be received by this reader.
- * `reader_listener`:
- Sets a message listener for the reader. When the listener is set,
- the application will receive messages through it. Calls to
- `reader.read_next()` will not be allowed. The listener function needs
- to accept (reader, message), for example:
-
- def my_listener(reader, message):
- # process message
- pass
-
- * `receiver_queue_size`:
- Sets the size of the reader receive queue. The reader receive
- queue controls how many messages can be accumulated by the reader
- before the application calls `read_next()`. Using a higher value could
- potentially increase the reader throughput at the expense of higher
- memory utilization.
- * `reader_name`:
- Sets the reader name.
- * `subscription_role_prefix`:
- Sets the subscription role prefix.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- """
- _check_type(str, topic, 'topic')
- _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type_or_none(str, reader_name, 'reader_name')
- _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
-
- conf = _pulsar.ReaderConfiguration()
- if reader_listener:
- conf.reader_listener(_listener_wrapper(reader_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- if reader_name:
- conf.reader_name(reader_name)
- if subscription_role_prefix:
- conf.subscription_role_prefix(subscription_role_prefix)
- conf.schema(schema.schema_info())
- conf.read_compacted(is_read_compacted)
-
- c = Reader()
- c._reader = self._client.create_reader(topic, start_message_id, conf)
- c._client = self
- c._schema = schema
- self._consumers.append(c)
- return c
-
- def get_topic_partitions(self, topic):
- """
- Get the list of partitions for a given topic.
-
- If the topic is partitioned, this will return a list of partition names. If the topic is not
- partitioned, the returned list will contain the topic name itself.
-
- This can be used to discover the partitions and create Reader, Consumer or Producer
- instances directly on a particular partition.
- :param topic: the topic name to lookup
- :return: a list of partition names
- """
- _check_type(str, topic, 'topic')
- return self._client.get_topic_partitions(topic)
-
- def close(self):
- """
- Close the client and all the associated producers and consumers
- """
- self._client.close()
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.Client">Client</a></li>
- </ul>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.Client.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, service_url, authentication=None, operation_timeout_seconds=30, io_threads=1, message_listener_threads=1, concurrent_lookup_requests=50000, log_conf_file_path=None, use_tls=False, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, tls_validate_hostname=False)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Create a new Pulsar client instance.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>service_url</code>: The Pulsar service url eg: pulsar://my-broker.com:6650/</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>authentication</code>:
- Set the authentication provider to be used with the broker. For example:
- <code>AuthenticationTLS</code>, <code>AuthenticationToken</code>, <code>AuthenticationAthenz</code> or <code>AuthenticationOauth2</code></li>
-<li><code>operation_timeout_seconds</code>:
- Set timeout on client operations (subscribe, create producer, close,
- unsubscribe).</li>
-<li><code>io_threads</code>:
- Set the number of IO threads to be used by the Pulsar client.</li>
-<li><code>message_listener_threads</code>:
- Set the number of threads to be used by the Pulsar client when
- delivering messages through message listener. The default is 1 thread
- per Pulsar client. If using more than 1 thread, messages for distinct
- <code>message_listener</code>s will be delivered in different threads, however a
- single <code>MessageListener</code> will always be assigned to the same thread.</li>
-<li><code>concurrent_lookup_requests</code>:
- Number of concurrent lookup-requests allowed on each broker connection
- to prevent overload on the broker.</li>
-<li><code>log_conf_file_path</code>:
- Initialize log4cxx from a configuration file.</li>
-<li><code>use_tls</code>:
- Configure whether to use TLS encryption on the connection. This setting
- is deprecated. TLS will be automatically enabled if the <code>service_url</code> is
- set to <code>pulsar+ssl://</code> or <code>https://</code></li>
-<li><code>tls_trust_certs_file_path</code>:
- Set the path to the trusted TLS certificate file. If empty defaults to
- certifi.</li>
-<li><code>tls_allow_insecure_connection</code>:
- Configure whether the Pulsar client accepts untrusted TLS certificates
- from the broker.</li>
-<li><code>tls_validate_hostname</code>:
- Configure whether the Pulsar client validates that the hostname of the
- endpoint, matches the common name on the TLS certificate presented by
- the endpoint.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.Client.__init__" class="source">
- <pre><code>def __init__(self, service_url,
- authentication=None,
- operation_timeout_seconds=30,
- io_threads=1,
- message_listener_threads=1,
- concurrent_lookup_requests=50000,
- log_conf_file_path=None,
- use_tls=False,
- tls_trust_certs_file_path=None,
- tls_allow_insecure_connection=False,
- tls_validate_hostname=False,
- ):
- """
- Create a new Pulsar client instance.
- **Args**
- * `service_url`: The Pulsar service url eg: pulsar://my-broker.com:6650/
- **Options**
- * `authentication`:
- Set the authentication provider to be used with the broker. For example:
- `AuthenticationTLS`, `AuthenticationToken`, `AuthenticationAthenz` or `AuthenticationOauth2`
- * `operation_timeout_seconds`:
- Set timeout on client operations (subscribe, create producer, close,
- unsubscribe).
- * `io_threads`:
- Set the number of IO threads to be used by the Pulsar client.
- * `message_listener_threads`:
- Set the number of threads to be used by the Pulsar client when
- delivering messages through message listener. The default is 1 thread
- per Pulsar client. If using more than 1 thread, messages for distinct
- `message_listener`s will be delivered in different threads, however a
- single `MessageListener` will always be assigned to the same thread.
- * `concurrent_lookup_requests`:
- Number of concurrent lookup-requests allowed on each broker connection
- to prevent overload on the broker.
- * `log_conf_file_path`:
- Initialize log4cxx from a configuration file.
- * `use_tls`:
- Configure whether to use TLS encryption on the connection. This setting
- is deprecated. TLS will be automatically enabled if the `service_url` is
- set to `pulsar+ssl://` or `https://`
- * `tls_trust_certs_file_path`:
- Set the path to the trusted TLS certificate file. If empty defaults to
- certifi.
- * `tls_allow_insecure_connection`:
- Configure whether the Pulsar client accepts untrusted TLS certificates
- from the broker.
- * `tls_validate_hostname`:
- Configure whether the Pulsar client validates that the hostname of the
- endpoint, matches the common name on the TLS certificate presented by
- the endpoint.
- """
- _check_type(str, service_url, 'service_url')
- _check_type_or_none(Authentication, authentication, 'authentication')
- _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
- _check_type(int, io_threads, 'io_threads')
- _check_type(int, message_listener_threads, 'message_listener_threads')
- _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
- _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
- _check_type(bool, use_tls, 'use_tls')
- _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
- _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
- _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
- conf = _pulsar.ClientConfiguration()
- if authentication:
- conf.authentication(authentication.auth)
- conf.operation_timeout_seconds(operation_timeout_seconds)
- conf.io_threads(io_threads)
- conf.message_listener_threads(message_listener_threads)
- conf.concurrent_lookup_requests(concurrent_lookup_requests)
- if log_conf_file_path:
- conf.log_conf_file_path(log_conf_file_path)
- if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
- conf.use_tls(True)
- if tls_trust_certs_file_path:
- conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
- else:
- conf.tls_trust_certs_file_path(certifi.where())
- conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
- conf.tls_validate_hostname(tls_validate_hostname)
- self._client = _pulsar.Client(service_url, conf)
- self._consumers = []
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Client.close">
- <p>def <span class="ident">close</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Close the client and all the associated producers and consumers</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.close', this);">Show source ≡</a></p>
- <div id="source-pulsar.Client.close" class="source">
- <pre><code>def close(self):
- """
- Close the client and all the associated producers and consumers.
- This delegates to `close()` on the wrapped `_client` object and returns nothing.
- """
- self._client.close()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Client.create_producer">
- <p>def <span class="ident">create_producer</span>(</p><p>self, topic, producer_name=None, schema=<pulsar.schema.schema.BytesSchema object at 0x7f2e807d5810>, initial_sequence_id=None, send_timeout_millis=30000, compression_type=_pulsar.CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, batching_enabled=False, batching_max_messages=1000, batching_max_allowed_size_in_bytes=131072, batching_max_publish_delay_ms [...]
- </div>
-
-
-
-
- <div class="desc"><p>Create a new producer on a given topic.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>topic</code>:
- The topic name</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>producer_name</code>:
- Specify a name for the producer. If not assigned,
- the system will generate a globally unique name which can be accessed
- with <code>Producer.producer_name()</code>. When specifying a name, it is up to
- the user to ensure that, for a given topic, the producer name is unique
- across all Pulsar's clusters.</li>
-<li><code>schema</code>:
- Define the schema of the data that will be published by this producer.
- The schema will be used for two purposes:<ul>
-<li>Validate the data format against the topic defined schema</li>
-<li>Perform serialization/deserialization between data and objects
- An example for this parameter would be to pass <code>schema=JsonSchema(MyRecordClass)</code>.</li>
-</ul>
-</li>
-<li><code>initial_sequence_id</code>:
- Set the baseline for the sequence ids for messages
- published by the producer. First message will be using
- <code>(initialSequenceId + 1)</code> as its sequence id and subsequent messages will
- be assigned incremental sequence ids, if not otherwise specified.</li>
-<li><code>send_timeout_millis</code>:
- If a message is not acknowledged by the server before the
- <code>send_timeout</code> expires, an error will be reported.</li>
-<li><code>compression_type</code>:
- Set the compression type for the producer. By default, message
- payloads are not compressed. Supported compression types are
- <code>CompressionType.LZ4</code>, <code>CompressionType.ZLib</code>, <code>CompressionType.ZSTD</code> and <code>CompressionType.SNAPPY</code>.
- ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with ZSTD.
- SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with SNAPPY.</li>
-<li><code>max_pending_messages</code>:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment from the broker.</li>
-<li><code>max_pending_messages_across_partitions</code>:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment across partitions from the broker.</li>
-<li><code>block_if_queue_full</code>: Set whether <code>send_async</code> operations should
- block when the outgoing message queue is full.</li>
-<li><code>message_routing_mode</code>:
- Set the message routing mode for the partitioned producer. Default is <code>PartitionsRoutingMode.RoundRobinDistribution</code>,
- other option is <code>PartitionsRoutingMode.UseSinglePartition</code></li>
-<li><code>properties</code>:
- Sets the properties for the producer. The properties associated with a producer
- can be used to identify a producer on the broker side.</li>
-<li>
-<p><code>batching_type</code>:
- Sets the batching type for the producer.
- There are two batching types: DefaultBatching and KeyBasedBatching.</p>
-<ul>
-<li>
-<p>Default batching
-incoming single messages:
-(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-batched into single batch message:
-[(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]</p>
-</li>
-<li>
-<p>KeyBasedBatching
-incoming single messages:
-(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
-batched into single batch message:
-[(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]</p>
-</li>
-<li><code>encryption_key</code>:
- The key used for symmetric encryption, configured on the producer side</li>
-<li><code>crypto_key_reader</code>:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer</li>
-</ul>
-</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_producer', this);">Show source ≡</a></p>
- <div id="source-pulsar.Client.create_producer" class="source">
- <pre><code>def create_producer(self, topic,
- producer_name=None,
- schema=schema.BytesSchema(),
- initial_sequence_id=None,
- send_timeout_millis=30000,
- compression_type=CompressionType.NONE,
- max_pending_messages=1000,
- max_pending_messages_across_partitions=50000,
- block_if_queue_full=False,
- batching_enabled=False,
- batching_max_messages=1000,
- batching_max_allowed_size_in_bytes=128*1024,
- batching_max_publish_delay_ms=10,
- message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
- properties=None,
- batching_type=BatchingType.Default,
- encryption_key=None,
- crypto_key_reader=None
- ):
- """
- Create a new producer on a given topic.
- **Args**
- * `topic`:
- The topic name
- **Options**
- * `producer_name`:
- Specify a name for the producer. If not assigned,
- the system will generate a globally unique name which can be accessed
- with `Producer.producer_name()`. When specifying a name, it is app to
- the user to ensure that, for a given topic, the producer name is unique
- across all Pulsar's clusters.
- * `schema`:
- Define the schema of the data that will be published by this producer.
- The schema will be used for two purposes:
- - Validate the data format against the topic defined schema
- - Perform serialization/deserialization between data and objects
- An example for this parameter would be to pass `schema=JsonSchema(MyRecordClass)`.
- * `initial_sequence_id`:
- Set the baseline for the sequence ids for messages
- published by the producer. First message will be using
- `(initialSequenceId + 1)`` as its sequence id and subsequent messages will
- be assigned incremental sequence ids, if not otherwise specified.
- * `send_timeout_millis`:
- If a message is not acknowledged by the server before the
- `send_timeout` expires, an error will be reported.
- * `compression_type`:
- Set the compression type for the producer. By default, message
- payloads are not compressed. Supported compression types are
- `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
- ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with ZSTD.
- SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
- release in order to be able to receive messages compressed with SNAPPY.
- * `max_pending_messages`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment from the broker.
- * `max_pending_messages_across_partitions`:
- Set the max size of the queue holding the messages pending to receive
- an acknowledgment across partitions from the broker.
- * `block_if_queue_full`: Set whether `send_async` operations should
- block when the outgoing message queue is full.
- * `message_routing_mode`:
- Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
- other option is `PartitionsRoutingMode.UseSinglePartition`
- * `properties`:
- Sets the properties for the producer. The properties associated with a producer
- can be used for identify a producer at broker side.
- * `batching_type`:
- Sets the batching type for the producer.
- There are two batching type: DefaultBatching and KeyBasedBatching.
- - Default batching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
- - KeyBasedBatching
- incoming single messages:
- (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
- batched into single batch message:
- [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
- * encryption_key:
- The key used for symmetric encryption, configured on the producer side
- * crypto_key_reader:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, topic, 'topic')
- _check_type_or_none(str, producer_name, 'producer_name')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
- _check_type(int, send_timeout_millis, 'send_timeout_millis')
- _check_type(CompressionType, compression_type, 'compression_type')
- _check_type(int, max_pending_messages, 'max_pending_messages')
- _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions')
- _check_type(bool, block_if_queue_full, 'block_if_queue_full')
- _check_type(bool, batching_enabled, 'batching_enabled')
- _check_type(int, batching_max_messages, 'batching_max_messages')
- _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
- _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(BatchingType, batching_type, 'batching_type')
- _check_type_or_none(str, encryption_key, 'encryption_key')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
- conf = _pulsar.ProducerConfiguration()
- conf.send_timeout_millis(send_timeout_millis)
- conf.compression_type(compression_type)
- conf.max_pending_messages(max_pending_messages)
- conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
- conf.block_if_queue_full(block_if_queue_full)
- conf.batching_enabled(batching_enabled)
- conf.batching_max_messages(batching_max_messages)
- conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
- conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
- conf.partitions_routing_mode(message_routing_mode)
- conf.batching_type(batching_type)
- if producer_name:
- conf.producer_name(producer_name)
- if initial_sequence_id:
- conf.initial_sequence_id(initial_sequence_id)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
- conf.schema(schema.schema_info())
- if encryption_key:
- conf.encryption_key(encryption_key)
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
- p = Producer()
- p._producer = self._client.create_producer(topic, conf)
- p._schema = schema
- return p
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Client.create_reader">
- <p>def <span class="ident">create_reader</span>(</p><p>self, topic, start_message_id, schema=<pulsar.schema.schema.BytesSchema object at 0x7f2e807e5ed0>, reader_listener=None, receiver_queue_size=1000, reader_name=None, subscription_role_prefix=None, is_read_compacted=False)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Create a reader on a particular topic</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>topic</code>: The name of the topic.</li>
-<li><code>start_message_id</code>: The initial reader positioning is done by specifying a message id.
- The options are:<ul>
-<li><code>MessageId.earliest</code>: Start reading from the earliest message available in the topic</li>
-<li><code>MessageId.latest</code>: Start reading from the end of the topic, only getting messages published
- after the reader was created</li>
-<li>
-<p><code>MessageId</code>: When passing a particular message id, the reader will position itself on
- that specific position. The first message to be read will be the message next to the
- specified messageId. Message id can be serialized into a string and deserialized
- back into a <code>MessageId</code> object:</p>
-<p># Serialize to string
- s = msg.message_id().serialize()</p>
-<p># Deserialize from string
- msg_id = MessageId.deserialize(s)</p>
-</li>
-</ul>
-</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>schema</code>:
- Define the schema of the data that will be received by this reader.</li>
-<li>
-<p><code>reader_listener</code>:
- Sets a message listener for the reader. When the listener is set,
- the application will receive messages through it. Calls to
- <code>reader.read_next()</code> will not be allowed. The listener function needs
- to accept (reader, message), for example:</p>
-<pre><code>def my_listener(reader, message):
- # process message
- pass
-</code></pre>
-</li>
-<li>
-<p><code>receiver_queue_size</code>:
- Sets the size of the reader receive queue. The reader receive
- queue controls how many messages can be accumulated by the reader
- before the application calls <code>read_next()</code>. Using a higher value could
- potentially increase the reader throughput at the expense of higher
- memory utilization.</p>
-</li>
-<li><code>reader_name</code>:
- Sets the reader name.</li>
-<li><code>subscription_role_prefix</code>:
- Sets the subscription role prefix.</li>
-<li><code>is_read_compacted</code>:
- Selects whether to read the compacted version of the topic</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.create_reader', this);">Show source ≡</a></p>
- <div id="source-pulsar.Client.create_reader" class="source">
- <pre><code>def create_reader(self, topic, start_message_id,
- schema=schema.BytesSchema(),
- reader_listener=None,
- receiver_queue_size=1000,
- reader_name=None,
- subscription_role_prefix=None,
- is_read_compacted=False
- ):
- """
- Create a reader on a particular topic.
- Returns a `Reader` wrapping the reader created by the underlying client.
- **Args**
- * `topic`: The name of the topic.
- * `start_message_id`: The initial reader positioning is done by specifying a message id.
- The options are:
- * `MessageId.earliest`: Start reading from the earliest message available in the topic
- * `MessageId.latest`: Start reading from the end of the topic, only getting messages published
- after the reader was created
- * `MessageId`: When passing a particular message id, the reader will position itself on
- that specific position. The first message to be read will be the message next to the
- specified messageId. Message id can be serialized into a string and deserialized
- back into a `MessageId` object:
- # Serialize to string
- s = msg.message_id().serialize()
- # Deserialize from string
- msg_id = MessageId.deserialize(s)
- **Options**
- * `schema`:
- Define the schema of the data that will be received by this reader.
- * `reader_listener`:
- Sets a message listener for the reader. When the listener is set,
- the application will receive messages through it. Calls to
- `reader.read_next()` will not be allowed. The listener function needs
- to accept (reader, message), for example:
- def my_listener(reader, message):
- # process message
- pass
- * `receiver_queue_size`:
- Sets the size of the reader receive queue. The reader receive
- queue controls how many messages can be accumulated by the reader
- before the application calls `read_next()`. Using a higher value could
- potentially increase the reader throughput at the expense of higher
- memory utilization.
- * `reader_name`:
- Sets the reader name.
- * `subscription_role_prefix`:
- Sets the subscription role prefix.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- """
- _check_type(str, topic, 'topic')
- _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type_or_none(str, reader_name, 'reader_name')
- _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- conf = _pulsar.ReaderConfiguration()
- if reader_listener:
- conf.reader_listener(_listener_wrapper(reader_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- if reader_name:
- conf.reader_name(reader_name)
- if subscription_role_prefix:
- conf.subscription_role_prefix(subscription_role_prefix)
- conf.schema(schema.schema_info())
- conf.read_compacted(is_read_compacted)
- c = Reader()
- c._reader = self._client.create_reader(topic, start_message_id, conf)
- c._client = self
- c._schema = schema
- # The reader is tracked in the same `_consumers` list that `subscribe` appends to.
- self._consumers.append(c)
- return c
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Client.get_topic_partitions">
- <p>def <span class="ident">get_topic_partitions</span>(</p><p>self, topic)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the list of partitions for a given topic.</p>
-<p>If the topic is partitioned, this will return a list of partition names. If the topic is not
-partitioned, the returned list will contain the topic name itself.</p>
-<p>This can be used to discover the partitions and create Reader, Consumer or Producer
-instances directly on a particular partition.
-:param topic: the topic name to look up
-:return: a list of partition names</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.get_topic_partitions', this);">Show source ≡</a></p>
- <div id="source-pulsar.Client.get_topic_partitions" class="source">
- <pre><code>def get_topic_partitions(self, topic):
- """
- Get the list of partitions for a given topic.
- If the topic is partitioned, this will return a list of partition names. If the topic is not
- partitioned, the returned list will contain the topic name itself.
- This can be used to discover the partitions and create Reader, Consumer or Producer
- instances directly on a particular partition.
- **Args**
- * `topic`:
- The topic name to look up; it is validated as a `str` via `_check_type`.
- **Returns**
- A list of partition names, as returned by the underlying client.
- """
- _check_type(str, topic, 'topic')
- return self._client.get_topic_partitions(topic)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Client.subscribe">
- <p>def <span class="ident">subscribe</span>(</p><p>self, topic, subscription_name, consumer_type=_pulsar.ConsumerType.Exclusive, schema=<pulsar.schema.schema.BytesSchema object at 0x7f2e807e5e50>, message_listener=None, receiver_queue_size=1000, max_total_receiver_queue_size_across_partitions=50000, consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, negative_ack_redelivery_delay_ms=60000, is_read_compacted=False, properties=None, pa [...]
- </div>
-
-
-
-
- <div class="desc"><p>Subscribe to the given topic and subscription combination.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>topic</code>: The name of the topic, list of topics or regex pattern.
- This method will accept these forms:
- - <code>topic='my-topic'</code>
- - <code>topic=['topic-1', 'topic-2', 'topic-3']</code>
- - <code>topic=re.compile('persistent://public/default/topic-*')</code></li>
-<li><code>subscription_name</code>: The name of the subscription.</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>consumer_type</code>:
- Select the subscription type to be used when subscribing to the topic.</li>
-<li><code>schema</code>:
- Define the schema of the data that will be received by this consumer.</li>
-<li>
-<p><code>message_listener</code>:
- Sets a message listener for the consumer. When the listener is set,
- the application will receive messages through it. Calls to
- <code>consumer.receive()</code> will not be allowed. The listener function needs
- to accept (consumer, message), for example:</p>
-<pre><code>#!python
-def my_listener(consumer, message):
- # process message
- consumer.acknowledge(message)
-</code></pre>
-</li>
-<li>
-<p><code>receiver_queue_size</code>:
- Sets the size of the consumer receive queue. The consumer receive
- queue controls how many messages can be accumulated by the consumer
- before the application calls <code>receive()</code>. Using a higher value could
- potentially increase the consumer throughput at the expense of higher
- memory utilization. Setting the consumer queue size to zero decreases
- the throughput of the consumer by disabling pre-fetching of messages.
- This approach improves the message distribution on shared subscription
- by pushing messages only to those consumers that are ready to process
- them. Neither receive with timeout nor partitioned topics can be used
- if the consumer queue size is zero. The <code>receive()</code> function call
- should not be interrupted when the consumer queue size is zero. The
- default value is 1000 messages and should work well for most use
- cases.</p>
-</li>
-<li><code>max_total_receiver_queue_size_across_partitions</code>:
- Set the max total receiver queue size across partitions.
- This setting will be used to reduce the receiver queue size for individual partitions</li>
-<li><code>consumer_name</code>:
- Sets the consumer name.</li>
-<li><code>unacked_messages_timeout_ms</code>:
- Sets the timeout in milliseconds for unacknowledged messages. The
- timeout needs to be greater than 10 seconds. An exception is thrown if
- the given value is less than 10 seconds. If a successful
- acknowledgement is not sent within the timeout, all the unacknowledged
- messages are redelivered.</li>
-<li><code>negative_ack_redelivery_delay_ms</code>:
- The delay after which to redeliver the messages that failed to be
- processed (with the <code>consumer.negative_acknowledge()</code>)</li>
-<li><code>broker_consumer_stats_cache_time_ms</code>:
- Sets the time duration for which the broker-side consumer stats will
- be cached in the client.</li>
-<li><code>is_read_compacted</code>:
- Selects whether to read the compacted version of the topic</li>
-<li><code>properties</code>:
- Sets the properties for the consumer. The properties associated with a consumer
- can be used to identify a consumer on the broker side.</li>
-<li><code>pattern_auto_discovery_period</code>:
- Period, in seconds, for the consumer to auto-discover matching topics.</li>
-<li><code>initial_position</code>:
- Set the initial position of a consumer when subscribing to the topic.
- It could be either: <code>InitialPosition.Earliest</code> or <code>InitialPosition.Latest</code>.
- Default: <code>Latest</code>.</li>
-<li><code>crypto_key_reader</code>:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Client.subscribe', this);">Show source ≡</a></p>
- <div id="source-pulsar.Client.subscribe" class="source">
- <pre><code>def subscribe(self, topic, subscription_name,
- consumer_type=ConsumerType.Exclusive,
- schema=schema.BytesSchema(),
- message_listener=None,
- receiver_queue_size=1000,
- max_total_receiver_queue_size_across_partitions=50000,
- consumer_name=None,
- unacked_messages_timeout_ms=None,
- broker_consumer_stats_cache_time_ms=30000,
- negative_ack_redelivery_delay_ms=60000,
- is_read_compacted=False,
- properties=None,
- pattern_auto_discovery_period=60,
- initial_position=InitialPosition.Latest,
- crypto_key_reader=None
- ):
- """
- Subscribe to the given topic and subscription combination.
- Returns a `Consumer` wrapping the consumer created by the underlying client.
- Raises `ValueError` when `topic` is not a `str`, a `list` or a compiled regex pattern.
- **Args**
- * `topic`: The name of the topic, list of topics or regex pattern.
- This method will accept these forms:
- - `topic='my-topic'`
- - `topic=['topic-1', 'topic-2', 'topic-3']`
- - `topic=re.compile('persistent://public/default/topic-*')`
- * `subscription_name`: The name of the subscription.
- **Options**
- * `consumer_type`:
- Select the subscription type to be used when subscribing to the topic.
- * `schema`:
- Define the schema of the data that will be received by this consumer.
- * `message_listener`:
- Sets a message listener for the consumer. When the listener is set,
- the application will receive messages through it. Calls to
- `consumer.receive()` will not be allowed. The listener function needs
- to accept (consumer, message), for example:
- #!python
- def my_listener(consumer, message):
- # process message
- consumer.acknowledge(message)
- * `receiver_queue_size`:
- Sets the size of the consumer receive queue. The consumer receive
- queue controls how many messages can be accumulated by the consumer
- before the application calls `receive()`. Using a higher value could
- potentially increase the consumer throughput at the expense of higher
- memory utilization. Setting the consumer queue size to zero decreases
- the throughput of the consumer by disabling pre-fetching of messages.
- This approach improves the message distribution on shared subscription
- by pushing messages only to those consumers that are ready to process
- them. Neither receive with timeout nor partitioned topics can be used
- if the consumer queue size is zero. The `receive()` function call
- should not be interrupted when the consumer queue size is zero. The
- default value is 1000 messages and should work well for most use
- cases.
- * `max_total_receiver_queue_size_across_partitions`:
- Set the max total receiver queue size across partitions.
- This setting will be used to reduce the receiver queue size for individual partitions
- * `consumer_name`:
- Sets the consumer name.
- * `unacked_messages_timeout_ms`:
- Sets the timeout in milliseconds for unacknowledged messages. The
- timeout needs to be greater than 10 seconds. An exception is thrown if
- the given value is less than 10 seconds. If a successful
- acknowledgement is not sent within the timeout, all the unacknowledged
- messages are redelivered.
- * `negative_ack_redelivery_delay_ms`:
- The delay after which to redeliver the messages that failed to be
- processed (with the `consumer.negative_acknowledge()`)
- * `broker_consumer_stats_cache_time_ms`:
- Sets the time duration for which the broker-side consumer stats will
- be cached in the client.
- * `is_read_compacted`:
- Selects whether to read the compacted version of the topic
- * `properties`:
- Sets the properties for the consumer. The properties associated with a consumer
- can be used to identify a consumer on the broker side.
- * `pattern_auto_discovery_period`:
- Period, in seconds, for the consumer to auto-discover matching topics.
- * `initial_position`:
- Set the initial position of a consumer when subscribing to the topic.
- It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
- Default: `Latest`.
- * `crypto_key_reader`:
- Symmetric encryption class implementation, configuring public key encryption messages for the producer
- and private key decryption messages for the consumer
- """
- _check_type(str, subscription_name, 'subscription_name')
- _check_type(ConsumerType, consumer_type, 'consumer_type')
- _check_type(_schema.Schema, schema, 'schema')
- _check_type(int, receiver_queue_size, 'receiver_queue_size')
- _check_type(int, max_total_receiver_queue_size_across_partitions,
- 'max_total_receiver_queue_size_across_partitions')
- _check_type_or_none(str, consumer_name, 'consumer_name')
- _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
- _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
- _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms')
- # NOTE(review): pattern_auto_discovery_period is type-checked here but is not passed to
- # `conf` anywhere in this method body — confirm whether it is meant to be applied.
- _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
- _check_type(bool, is_read_compacted, 'is_read_compacted')
- _check_type_or_none(dict, properties, 'properties')
- _check_type(InitialPosition, initial_position, 'initial_position')
- _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
- conf = _pulsar.ConsumerConfiguration()
- conf.consumer_type(consumer_type)
- conf.read_compacted(is_read_compacted)
- if message_listener:
- conf.message_listener(_listener_wrapper(message_listener, schema))
- conf.receiver_queue_size(receiver_queue_size)
- conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
- if consumer_name:
- conf.consumer_name(consumer_name)
- if unacked_messages_timeout_ms:
- conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
- conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
- conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
- if properties:
- for k, v in properties.items():
- conf.property(k, v)
- conf.subscription_initial_position(initial_position)
- conf.schema(schema.schema_info())
- if crypto_key_reader:
- conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
- c = Consumer()
- # The type of `topic` selects which underlying client call is used.
- if isinstance(topic, str):
- # Single topic
- c._consumer = self._client.subscribe(topic, subscription_name, conf)
- elif isinstance(topic, list):
- # List of topics
- c._consumer = self._client.subscribe_topics(topic, subscription_name, conf)
- elif isinstance(topic, _retype):
- # Regex pattern
- c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf)
- else:
- raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
- c._client = self
- c._schema = schema
- # Track the consumer on the client in its `_consumers` list.
- self._consumers.append(c)
- return c
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.Consumer" class="name">class <span class="ident">Consumer</span></p>
-
-
- <div class="desc"><p>Pulsar consumer.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer" class="source">
- <pre><code>class Consumer:
- """
- Pulsar consumer.
- """
-
- def topic(self):
- """
- Return the topic this consumer is subscribed to.
- """
- return self._consumer.topic()
-
- def subscription_name(self):
- """
- Return the subscription name.
- """
- return self._consumer.subscription_name()
-
- def unsubscribe(self):
- """
- Unsubscribe the current consumer from the topic.
-
- This method will block until the operation is completed. Once the
- consumer is unsubscribed, no more messages will be received and
- subsequent new messages will not be retained for this consumer.
-
- This consumer object cannot be reused.
- """
- return self._consumer.unsubscribe()
-
- def receive(self, timeout_millis=None):
- """
- Receive a single message.
-
- If a message is not immediately available, this method will block until
- a new message is available.
-
- **Options**
-
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._consumer.receive()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._consumer.receive(timeout_millis)
-
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-
- def acknowledge(self, message):
- """
- Acknowledge the reception of a single message.
-
- This method will block until an acknowledgement is sent to the broker.
- After that, the message will not be re-delivered to this consumer.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge(message._message)
- else:
- self._consumer.acknowledge(message)
-
- def acknowledge_cumulative(self, message):
- """
- Acknowledge the reception of all the messages in the stream up to (and
- including) the provided message.
-
- This method will block until an acknowledgement is sent to the broker.
- After that, the messages will not be re-delivered to this consumer.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge_cumulative(message._message)
- else:
- self._consumer.acknowledge_cumulative(message)
-
- def negative_acknowledge(self, message):
- """
- Acknowledge the failure to process a single message.
-
- When a message is "negatively acked" it will be marked for redelivery after
- some fixed delay. The delay is configurable when constructing the consumer
- with the `negative_ack_redelivery_delay_ms` option of `Client.subscribe()`.
-
- This call is not blocking.
-
- **Args**
-
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.negative_acknowledge(message._message)
- else:
- self._consumer.negative_acknowledge(message)
-
- def pause_message_listener(self):
- """
- Pause receiving messages via the `message_listener` until
- `resume_message_listener()` is called.
- """
- self._consumer.pause_message_listener()
-
- def resume_message_listener(self):
- """
- Resume receiving the messages via the message listener.
- Asynchronously receive all the messages enqueued from the time
- `pause_message_listener()` was called.
- """
- self._consumer.resume_message_listener()
-
- def redeliver_unacknowledged_messages(self):
- """
- Redelivers all the unacknowledged messages. In failover mode, the
- request is ignored if the consumer is not active for the given topic. In
- shared mode, the consumer's messages to be redelivered are distributed
- across all the connected consumers. This is a non-blocking call and
- doesn't throw an exception. In case the connection breaks, the messages
- are redelivered after reconnect.
- """
- self._consumer.redeliver_unacknowledged_messages()
-
- def seek(self, messageid):
- """
- Reset the subscription associated with this consumer to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
- seek() on the individual partitions instead.
-
- **Args**
-
- * `messageid`:
- The message id for seek, OR an integer event time to seek to
- """
- self._consumer.seek(messageid)
-
- def close(self):
- """
- Close the consumer.
- """
- self._consumer.close()
- self._client._consumers.remove(self)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.Consumer">Consumer</a></li>
- </ul>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.acknowledge">
- <p>def <span class="ident">acknowledge</span>(</p><p>self, message)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Acknowledge the reception of a single message.</p>
-<p>This method will block until an acknowledgement is sent to the broker.
-After that, the message will not be re-delivered to this consumer.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>message</code>:
- The received message or message id.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.acknowledge" class="source">
- <pre><code>def acknowledge(self, message):
- """
- Acknowledge the reception of a single message.
- This method will block until an acknowledgement is sent to the broker.
- After that, the message will not be re-delivered to this consumer.
- **Args**
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge(message._message)
- else:
- self._consumer.acknowledge(message)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.acknowledge_cumulative">
- <p>def <span class="ident">acknowledge_cumulative</span>(</p><p>self, message)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Acknowledge the reception of all the messages in the stream up to (and
-including) the provided message.</p>
-<p>This method will block until an acknowledgement is sent to the broker.
-After that, the messages will not be re-delivered to this consumer.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>message</code>:
- The received message or message id.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.acknowledge_cumulative', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.acknowledge_cumulative" class="source">
- <pre><code>def acknowledge_cumulative(self, message):
- """
- Acknowledge the reception of all the messages in the stream up to (and
- including) the provided message.
- This method will block until an acknowledgement is sent to the broker.
- After that, the messages will not be re-delivered to this consumer.
- **Args**
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.acknowledge_cumulative(message._message)
- else:
- self._consumer.acknowledge_cumulative(message)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.close">
- <p>def <span class="ident">close</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Close the consumer.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.close', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.close" class="source">
- <pre><code>def close(self):
- """
- Close the consumer.
- """
- self._consumer.close()
- self._client._consumers.remove(self)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.negative_acknowledge">
- <p>def <span class="ident">negative_acknowledge</span>(</p><p>self, message)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Acknowledge the failure to process a single message.</p>
-<p>When a message is "negatively acked" it will be marked for redelivery after
-some fixed delay. The delay is configurable when constructing the consumer
-with the <code>negative_ack_redelivery_delay_ms</code> option of <code>Client.subscribe()</code>.</p>
-<p>This call is not blocking.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>message</code>:
- The received message or message id.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.negative_acknowledge', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.negative_acknowledge" class="source">
- <pre><code>def negative_acknowledge(self, message):
- """
- Acknowledge the failure to process a single message.
- When a message is "negatively acked" it will be marked for redelivery after
- some fixed delay. The delay is configurable when constructing the consumer
- with the `negative_ack_redelivery_delay_ms` option of `Client.subscribe()`.
- This call is not blocking.
- **Args**
- * `message`:
- The received message or message id.
- """
- if isinstance(message, Message):
- self._consumer.negative_acknowledge(message._message)
- else:
- self._consumer.negative_acknowledge(message)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.pause_message_listener">
- <p>def <span class="ident">pause_message_listener</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Pause receiving messages via the <code>message_listener</code> until
-<code>resume_message_listener()</code> is called.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.pause_message_listener', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.pause_message_listener" class="source">
- <pre><code>def pause_message_listener(self):
- """
- Pause receiving messages via the `message_listener` until
- `resume_message_listener()` is called.
- """
- self._consumer.pause_message_listener()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.receive">
- <p>def <span class="ident">receive</span>(</p><p>self, timeout_millis=None)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Receive a single message.</p>
-<p>If a message is not immediately available, this method will block until
-a new message is available.</p>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>timeout_millis</code>:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.receive', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.receive" class="source">
- <pre><code>def receive(self, timeout_millis=None):
- """
- Receive a single message.
- If a message is not immediately available, this method will block until
- a new message is available.
- **Options**
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._consumer.receive()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._consumer.receive(timeout_millis)
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.redeliver_unacknowledged_messages">
- <p>def <span class="ident">redeliver_unacknowledged_messages</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Redelivers all the unacknowledged messages. In failover mode, the
-request is ignored if the consumer is not active for the given topic. In
-shared mode, the consumer's messages to be redelivered are distributed
-across all the connected consumers. This is a non-blocking call and
-doesn't throw an exception. In case the connection breaks, the messages
-are redelivered after reconnect.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.redeliver_unacknowledged_messages', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.redeliver_unacknowledged_messages" class="source">
- <pre><code>def redeliver_unacknowledged_messages(self):
- """
- Redelivers all the unacknowledged messages. In failover mode, the
- request is ignored if the consumer is not active for the given topic. In
- shared mode, the consumer's messages to be redelivered are distributed
- across all the connected consumers. This is a non-blocking call and
- doesn't throw an exception. In case the connection breaks, the messages
- are redelivered after reconnect.
- """
- self._consumer.redeliver_unacknowledged_messages()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.resume_message_listener">
- <p>def <span class="ident">resume_message_listener</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Resume receiving the messages via the message listener.
-Asynchronously receive all the messages enqueued from the time
-<code>pause_message_listener()</code> was called.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.resume_message_listener', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.resume_message_listener" class="source">
- <pre><code>def resume_message_listener(self):
- """
- Resume receiving the messages via the message listener.
- Asynchronously receive all the messages enqueued from the time
- `pause_message_listener()` was called.
- """
- self._consumer.resume_message_listener()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.seek">
- <p>def <span class="ident">seek</span>(</p><p>self, messageid)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Reset the subscription associated with this consumer to a specific message id or publish timestamp.
-The message id can either be a specific message or represent the first or last messages in the topic.
-Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
-seek() on the individual partitions instead.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>messageid</code>:
- The message id for seek, OR an integer event time to seek to</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.seek', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.seek" class="source">
- <pre><code>def seek(self, messageid):
- """
- Reset the subscription associated with this consumer to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
- seek() on the individual partitions instead.
- **Args**
- * `messageid`:
- The message id for seek, OR an integer event time to seek to
- """
- self._consumer.seek(messageid)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.subscription_name">
- <p>def <span class="ident">subscription_name</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Return the subscription name.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.subscription_name', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.subscription_name" class="source">
- <pre><code>def subscription_name(self):
- """
- Return the subscription name.
- """
- return self._consumer.subscription_name()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.topic">
- <p>def <span class="ident">topic</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Return the topic this consumer is subscribed to.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.topic', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.topic" class="source">
- <pre><code>def topic(self):
- """
- Return the topic this consumer is subscribed to.
- """
- return self._consumer.topic()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Consumer.unsubscribe">
- <p>def <span class="ident">unsubscribe</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Unsubscribe the current consumer from the topic.</p>
-<p>This method will block until the operation is completed. Once the
-consumer is unsubscribed, no more messages will be received and
-subsequent new messages will not be retained for this consumer.</p>
-<p>This consumer object cannot be reused.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Consumer.unsubscribe', this);">Show source ≡</a></p>
- <div id="source-pulsar.Consumer.unsubscribe" class="source">
- <pre><code>def unsubscribe(self):
- """
- Unsubscribe the current consumer from the topic.
- This method will block until the operation is completed. Once the
- consumer is unsubscribed, no more messages will be received and
- subsequent new messages will not be retained for this consumer.
- This consumer object cannot be reused.
- """
- return self._consumer.unsubscribe()
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.CryptoKeyReader" class="name">class <span class="ident">CryptoKeyReader</span></p>
-
-
- <div class="desc"><p>Default crypto key reader implementation</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader', this);">Show source ≡</a></p>
- <div id="source-pulsar.CryptoKeyReader" class="source">
- <pre><code>class CryptoKeyReader:
- """
- Default crypto key reader implementation
- """
- def __init__(self, public_key_path, private_key_path):
- """
- Create crypto key reader.
-
- **Args**
-
- * `public_key_path`: Path to the public key
- * `private_key_path`: Path to private key
- """
- _check_type(str, public_key_path, 'public_key_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.CryptoKeyReader">CryptoKeyReader</a></li>
- </ul>
- <h3>Instance variables</h3>
- <div class="item">
- <p id="pulsar.CryptoKeyReader.cryptoKeyReader" class="name">var <span class="ident">cryptoKeyReader</span></p>
-
-
-
-
- <div class="source_cont">
-</div>
-
- </div>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.CryptoKeyReader.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, public_key_path, private_key_path)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Create crypto key reader.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>public_key_path</code>: Path to the public key</li>
-<li><code>private_key_path</code>: Path to private key</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.CryptoKeyReader.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.CryptoKeyReader.__init__" class="source">
- <pre><code>def __init__(self, public_key_path, private_key_path):
- """
- Create crypto key reader.
- **Args**
- * `public_key_path`: Path to the public key
- * `private_key_path`: Path to private key
- """
- _check_type(str, public_key_path, 'public_key_path')
- _check_type(str, private_key_path, 'private_key_path')
- self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path)
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.Message" class="name">class <span class="ident">Message</span></p>
-
-
- <div class="desc"><p>Message objects are returned by a consumer, either by calling <code>receive</code> or
-through a listener.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message" class="source">
- <pre><code>class Message:
- """
- Message objects are returned by a consumer, either by calling `receive` or
- through a listener.
- """
-
- def data(self):
- """
- Returns a bytes object with the payload of the message.
- """
- return self._message.data()
-
- def value(self):
- """
- Returns object with the de-serialized version of the message content
- """
- return self._schema.decode(self._message.data())
-
- def properties(self):
- """
- Return the properties attached to the message. Properties are
- application-defined key/value pairs that will be attached to the
- message.
- """
- return self._message.properties()
-
- def partition_key(self):
- """
- Get the partitioning key for the message.
- """
- return self._message.partition_key()
-
- def publish_timestamp(self):
- """
- Get the timestamp in milliseconds with the message publish time.
- """
- return self._message.publish_timestamp()
-
- def event_timestamp(self):
- """
- Get the timestamp in milliseconds with the message event time.
- """
- return self._message.event_timestamp()
-
- def message_id(self):
- """
- The message ID that can be used to refer to this particular message.
- """
- return self._message.message_id()
-
- def topic_name(self):
- """
- Get the name of the topic from which this message originated
- """
- return self._message.topic_name()
-
- def redelivery_count(self):
- """
- Get the redelivery count for this message
- """
- return self._message.redelivery_count()
-
- def schema_version(self):
- """
- Get the schema version for this message
- """
- return self._message.schema_version()
-
- @staticmethod
- def _wrap(_message):
- self = Message()
- self._message = _message
- return self
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.Message">Message</a></li>
- </ul>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.Message.data">
- <p>def <span class="ident">data</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Returns a bytes object with the payload of the message.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.data', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.data" class="source">
- <pre><code>def data(self):
- """
- Returns a bytes object with the payload of the message.
- """
- return self._message.data()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.event_timestamp">
- <p>def <span class="ident">event_timestamp</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the timestamp in milliseconds with the message event time.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.event_timestamp', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.event_timestamp" class="source">
- <pre><code>def event_timestamp(self):
- """
- Get the timestamp in milliseconds with the message event time.
- """
- return self._message.event_timestamp()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.message_id">
- <p>def <span class="ident">message_id</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>The message ID that can be used to refer to this particular message.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.message_id', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.message_id" class="source">
- <pre><code>def message_id(self):
- """
- The message ID that can be used to refer to this particular message.
- """
- return self._message.message_id()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.partition_key">
- <p>def <span class="ident">partition_key</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the partitioning key for the message.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.partition_key', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.partition_key" class="source">
- <pre><code>def partition_key(self):
- """
- Get the partitioning key for the message.
- """
- return self._message.partition_key()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.properties">
- <p>def <span class="ident">properties</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Return the properties attached to the message. Properties are
-application-defined key/value pairs that will be attached to the
-message.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.properties', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.properties" class="source">
- <pre><code>def properties(self):
- """
- Return the properties attached to the message. Properties are
- application-defined key/value pairs that will be attached to the
- message.
- """
- return self._message.properties()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.publish_timestamp">
- <p>def <span class="ident">publish_timestamp</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the timestamp in milliseconds with the message publish time.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.publish_timestamp', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.publish_timestamp" class="source">
- <pre><code>def publish_timestamp(self):
- """
- Get the timestamp in milliseconds with the message publish time.
- """
- return self._message.publish_timestamp()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.redelivery_count">
- <p>def <span class="ident">redelivery_count</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the redelivery count for this message</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.redelivery_count', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.redelivery_count" class="source">
- <pre><code>def redelivery_count(self):
- """
- Get the redelivery count for this message
- """
- return self._message.redelivery_count()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.schema_version">
- <p>def <span class="ident">schema_version</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the schema version for this message</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.schema_version', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.schema_version" class="source">
- <pre><code>def schema_version(self):
- """
- Get the schema version for this message
- """
- return self._message.schema_version()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.topic_name">
- <p>def <span class="ident">topic_name</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the name of the topic from which this message originated</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.topic_name', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.topic_name" class="source">
- <pre><code>def topic_name(self):
- """
- Get the name of the topic from which this message originated
- """
- return self._message.topic_name()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Message.value">
- <p>def <span class="ident">value</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Returns object with the de-serialized version of the message content</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Message.value', this);">Show source ≡</a></p>
- <div id="source-pulsar.Message.value" class="source">
- <pre><code>def value(self):
- """
- Returns object with the de-serialized version of the message content
- """
- return self._schema.decode(self._message.data())
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.MessageBatch" class="name">class <span class="ident">MessageBatch</span></p>
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageBatch" class="source">
- <pre><code>class MessageBatch:
-
- def __init__(self):
- self._msg_batch = _pulsar.MessageBatch()
-
- def with_message_id(self, msg_id):
- if not isinstance(msg_id, _pulsar.MessageId):
- if isinstance(msg_id, MessageId):
- msg_id = msg_id._msg_id
- else:
- raise TypeError("unknown message id type")
- self._msg_batch.with_message_id(msg_id)
- return self
-
- def parse_from(self, data, size):
- self._msg_batch.parse_from(data, size)
- _msgs = self._msg_batch.messages()
- return list(map(Message._wrap, _msgs))
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.MessageBatch">MessageBatch</a></li>
- </ul>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.MessageBatch.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageBatch.__init__" class="source">
- <pre><code>def __init__(self):
- self._msg_batch = _pulsar.MessageBatch()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.MessageBatch.parse_from">
- <p>def <span class="ident">parse_from</span>(</p><p>self, data, size)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.parse_from', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageBatch.parse_from" class="source">
- <pre><code>def parse_from(self, data, size):
- self._msg_batch.parse_from(data, size)
- _msgs = self._msg_batch.messages()
- return list(map(Message._wrap, _msgs))
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.MessageBatch.with_message_id">
- <p>def <span class="ident">with_message_id</span>(</p><p>self, msg_id)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageBatch.with_message_id', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageBatch.with_message_id" class="source">
- <pre><code>def with_message_id(self, msg_id):
- if not isinstance(msg_id, _pulsar.MessageId):
- if isinstance(msg_id, MessageId):
- msg_id = msg_id._msg_id
- else:
- raise TypeError("unknown message id type")
- self._msg_batch.with_message_id(msg_id)
- return self
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.MessageId" class="name">class <span class="ident">MessageId</span></p>
-
-
- <div class="desc"><p>Represents a message id</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId" class="source">
- <pre><code>class MessageId:
- """
- Represents a message id
- """
-
- def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
- self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
-
- 'Represents the earliest message stored in a topic'
- earliest = _pulsar.MessageId.earliest
-
- 'Represents the latest message published on a topic'
- latest = _pulsar.MessageId.latest
-
- def ledger_id(self):
- return self._msg_id.ledger_id()
-
- def entry_id(self):
- return self._msg_id.entry_id()
-
- def batch_index(self):
- return self._msg_id.batch_index()
-
- def partition(self):
- return self._msg_id.partition()
-
- def serialize(self):
- """
- Returns a bytes representation of the message id.
- This bytes sequence can be stored and later deserialized.
- """
- return self._msg_id.serialize()
-
- @staticmethod
- def deserialize(message_id_bytes):
- """
- Deserialize a message id object from a previously
- serialized bytes sequence.
- """
- return _pulsar.MessageId.deserialize(message_id_bytes)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.MessageId">MessageId</a></li>
- </ul>
- <h3>Class variables</h3>
- <div class="item">
- <p id="pulsar.MessageId.earliest" class="name">var <span class="ident">earliest</span></p>
-
-
-
-
- <div class="desc"><p>Represents the earliest message stored in a topic</p></div>
- <div class="source_cont">
-</div>
-
- </div>
- <div class="item">
- <p id="pulsar.MessageId.latest" class="name">var <span class="ident">latest</span></p>
-
-
-
-
- <div class="source_cont">
-</div>
-
- </div>
- <h3>Static methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.MessageId.deserialize">
- <p>def <span class="ident">deserialize</span>(</p><p>message_id_bytes)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Deserialize a message id object from a previously
-serialized bytes sequence.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.deserialize', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId.deserialize" class="source">
- <pre><code>@staticmethod
-def deserialize(message_id_bytes):
- """
- Deserialize a message id object from a previously
- serialized bytes sequence.
- """
- return _pulsar.MessageId.deserialize(message_id_bytes)
-</code></pre>
- </div>
-</div>
-
- </div>
-
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.MessageId.__init__">
- <p>def <span class="ident">__init__</span>(</p><p>self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.__init__', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId.__init__" class="source">
- <pre><code>def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
- self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.MessageId.batch_index">
- <p>def <span class="ident">batch_index</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.batch_index', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId.batch_index" class="source">
- <pre><code>def batch_index(self):
- return self._msg_id.batch_index()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.MessageId.entry_id">
- <p>def <span class="ident">entry_id</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.entry_id', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId.entry_id" class="source">
- <pre><code>def entry_id(self):
- return self._msg_id.entry_id()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.MessageId.ledger_id">
- <p>def <span class="ident">ledger_id</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.ledger_id', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId.ledger_id" class="source">
- <pre><code>def ledger_id(self):
- return self._msg_id.ledger_id()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.MessageId.partition">
- <p>def <span class="ident">partition</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.partition', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId.partition" class="source">
- <pre><code>def partition(self):
- return self._msg_id.partition()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.MessageId.serialize">
- <p>def <span class="ident">serialize</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Returns a bytes representation of the message id.
-This bytes sequence can be stored and later deserialized.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.MessageId.serialize', this);">Show source ≡</a></p>
- <div id="source-pulsar.MessageId.serialize" class="source">
- <pre><code>def serialize(self):
- """
- Returns a bytes representation of the message id.
- This bytes sequence can be stored and later deserialized.
- """
- return self._msg_id.serialize()
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.Producer" class="name">class <span class="ident">Producer</span></p>
-
-
- <div class="desc"><p>The Pulsar message producer, used to publish messages on a topic.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer" class="source">
- <pre><code>class Producer:
- """
- The Pulsar message producer, used to publish messages on a topic.
- """
-
- def topic(self):
- """
- Return the topic which producer is publishing to
- """
- return self._producer.topic()
-
- def producer_name(self):
- """
- Return the producer name which could have been assigned by the
- system or specified by the client
- """
- return self._producer.producer_name()
-
- def last_sequence_id(self):
- """
- Get the last sequence id that was published by this producer.
-
- This represents either the automatically assigned or custom sequence id
- (set on the `MessageBuilder`) that was published and acknowledged by the broker.
-
- After recreating a producer with the same producer name, this will return the
- sequence id of the last message published in the previous producer session, or -1
- if no message was ever published.
- """
- return self._producer.last_sequence_id()
-
- def send(self, content,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Publish a message on the topic. Blocks until the message is acknowledged
-
- Returns a `MessageId` object that represents where the message is persisted.
-
- **Args**
-
- * `content`:
- A `bytes` object with the message payload.
-
- **Options**
-
- * `properties`:
- A dict of application-defined string properties.
- * `partition_key`:
- Sets the partition key for message routing. A hash of this key is used
- to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`:
- Override namespace replication clusters. Note that it is the caller's
- responsibility to provide valid cluster names and that all clusters
- have been previously configured as topics. Given an empty list,
- the message will replicate according to the namespace configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp, in milliseconds, of the event creation.
- * `deliver_at`:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC.
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
-
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- return MessageId.deserialize(self._producer.send(msg))
-
- def send_async(self, content, callback,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Send a message asynchronously.
-
- The `callback` will be invoked once the message has been acknowledged
- by the broker.
-
- Example:
-
- #!python
- def callback(res, msg_id):
- print('Message published: %s' % res)
-
- producer.send_async(msg, callback)
-
- When the producer queue is full, by default the message will be rejected
- and the callback invoked with an error code.
-
- **Args**
-
- * `content`:
- A `bytes` object with the message payload.
-
- **Options**
-
- * `properties`:
- A dict of application-defined string properties.
- * `partition_key`:
- Sets the partition key for the message routing. A hash of this key is
- used to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`: Override namespace replication clusters. Note
- that it is the caller's responsibility to provide valid cluster names
- and that all clusters have been previously configured as topics.
- Given an empty list, the message will replicate per the namespace
- configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp, in milliseconds, of the event creation.
- * `deliver_at`:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC.
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- self._producer.send_async(msg, callback)
-
-
- def flush(self):
- """
- Flush all the messages buffered in the client and wait until all messages have been
- successfully persisted
- """
- self._producer.flush()
-
-
- def close(self):
- """
- Close the producer.
- """
- self._producer.close()
-
- def _build_msg(self, content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after):
- data = self._schema.encode(content)
-
- _check_type(bytes, data, 'data')
- _check_type_or_none(dict, properties, 'properties')
- _check_type_or_none(str, partition_key, 'partition_key')
- _check_type_or_none(int, sequence_id, 'sequence_id')
- _check_type_or_none(list, replication_clusters, 'replication_clusters')
- _check_type(bool, disable_replication, 'disable_replication')
- _check_type_or_none(int, event_timestamp, 'event_timestamp')
- _check_type_or_none(int, deliver_at, 'deliver_at')
- _check_type_or_none(timedelta, deliver_after, 'deliver_after')
-
- mb = _pulsar.MessageBuilder()
- mb.content(data)
- if properties:
- for k, v in properties.items():
- mb.property(k, v)
- if partition_key:
- mb.partition_key(partition_key)
- if sequence_id:
- mb.sequence_id(sequence_id)
- if replication_clusters:
- mb.replication_clusters(replication_clusters)
- if disable_replication:
- mb.disable_replication(disable_replication)
- if event_timestamp:
- mb.event_timestamp(event_timestamp)
- if deliver_at:
- mb.deliver_at(deliver_at)
- if deliver_after:
- mb.deliver_after(deliver_after)
-
- return mb.build()
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.Producer">Producer</a></li>
- </ul>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.Producer.close">
- <p>def <span class="ident">close</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Close the producer.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.close', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer.close" class="source">
- <pre><code>def close(self):
- """
- Close the producer.
- """
- self._producer.close()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Producer.flush">
- <p>def <span class="ident">flush</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Flush all the messages buffered in the client and wait until all messages have been
-successfully persisted</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.flush', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer.flush" class="source">
- <pre><code>def flush(self):
- """
- Flush all the messages buffered in the client and wait until all messages have been
- successfully persisted
- """
- self._producer.flush()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Producer.last_sequence_id">
- <p>def <span class="ident">last_sequence_id</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Get the last sequence id that was published by this producer.</p>
-<p>This represents either the automatically assigned or custom sequence id
-(set on the <code>MessageBuilder</code>) that was published and acknowledged by the broker.</p>
-<p>After recreating a producer with the same producer name, this will return the
-sequence id of the last message published in the previous producer session, or -1
-if no message was ever published.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.last_sequence_id', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer.last_sequence_id" class="source">
- <pre><code>def last_sequence_id(self):
- """
- Get the last sequence id that was published by this producer.
- This represents either the automatically assigned or custom sequence id
- (set on the `MessageBuilder`) that was published and acknowledged by the broker.
- After recreating a producer with the same producer name, this will return the
- sequence id of the last message published in the previous producer session, or -1
- if no message was ever published.
- """
- return self._producer.last_sequence_id()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Producer.producer_name">
- <p>def <span class="ident">producer_name</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Return the producer name which could have been assigned by the
-system or specified by the client</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.producer_name', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer.producer_name" class="source">
- <pre><code>def producer_name(self):
- """
- Return the producer name which could have been assigned by the
- system or specified by the client
- """
- return self._producer.producer_name()
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Producer.send">
- <p>def <span class="ident">send</span>(</p><p>self, content, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Publish a message on the topic. Blocks until the message is acknowledged</p>
-<p>Returns a <code>MessageId</code> object that represents where the message is persisted.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>content</code>:
- A <code>bytes</code> object with the message payload.</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>properties</code>:
- A dict of application-defined string properties.</li>
-<li><code>partition_key</code>:
- Sets the partition key for message routing. A hash of this key is used
- to determine the message's topic partition.</li>
-<li><code>sequence_id</code>:
- Specify a custom sequence id for the message being published.</li>
-<li><code>replication_clusters</code>:
- Override namespace replication clusters. Note that it is the caller's
- responsibility to provide valid cluster names and that all clusters
- have been previously configured as topics. Given an empty list,
- the message will replicate according to the namespace configuration.</li>
-<li><code>disable_replication</code>:
- Do not replicate this message.</li>
-<li><code>event_timestamp</code>:
- Timestamp, in milliseconds, of the event creation.</li>
-<li><code>deliver_at</code>:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC.</li>
-<li><code>deliver_after</code>:
- Specify a delay in timedelta for the delivery of the messages.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer.send" class="source">
- <pre><code>def send(self, content,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Publish a message on the topic. Blocks until the message is acknowledged
- Returns a `MessageId` object that represents where the message is persisted.
- **Args**
- * `content`:
- A `bytes` object with the message payload.
- **Options**
- * `properties`:
- A dict of application-defined string properties.
- * `partition_key`:
- Sets the partition key for message routing. A hash of this key is used
- to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`:
- Override namespace replication clusters. Note that it is the caller's
- responsibility to provide valid cluster names and that all clusters
- have been previously configured as topics. Given an empty list,
- the message will replicate according to the namespace configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp, in milliseconds, of the event creation.
- * `deliver_at`:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC.
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- return MessageId.deserialize(self._producer.send(msg))
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Producer.send_async">
- <p>def <span class="ident">send_async</span>(</p><p>self, content, callback, properties=None, partition_key=None, sequence_id=None, replication_clusters=None, disable_replication=False, event_timestamp=None, deliver_at=None, deliver_after=None)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Send a message asynchronously.</p>
-<p>The <code>callback</code> will be invoked once the message has been acknowledged
-by the broker.</p>
-<p>Example:</p>
-<pre><code>#!python
-def callback(res, msg_id):
- print('Message published: %s' % res)
-
-producer.send_async(msg, callback)
-</code></pre>
-<p>When the producer queue is full, by default the message will be rejected
-and the callback invoked with an error code.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>content</code>:
- A <code>bytes</code> object with the message payload.</li>
-</ul>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>properties</code>:
- A dict of application-defined string properties.</li>
-<li><code>partition_key</code>:
- Sets the partition key for the message routing. A hash of this key is
- used to determine the message's topic partition.</li>
-<li><code>sequence_id</code>:
- Specify a custom sequence id for the message being published.</li>
-<li><code>replication_clusters</code>: Override namespace replication clusters. Note
- that it is the caller's responsibility to provide valid cluster names
- and that all clusters have been previously configured as topics.
- Given an empty list, the message will replicate per the namespace
- configuration.</li>
-<li><code>disable_replication</code>:
- Do not replicate this message.</li>
-<li><code>event_timestamp</code>:
- Timestamp, in milliseconds, of the event creation.</li>
-<li><code>deliver_at</code>:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC.</li>
-<li><code>deliver_after</code>:
- Specify a delay in timedelta for the delivery of the messages.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.send_async', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer.send_async" class="source">
- <pre><code>def send_async(self, content, callback,
- properties=None,
- partition_key=None,
- sequence_id=None,
- replication_clusters=None,
- disable_replication=False,
- event_timestamp=None,
- deliver_at=None,
- deliver_after=None,
- ):
- """
- Send a message asynchronously.
- The `callback` will be invoked once the message has been acknowledged
- by the broker.
- Example:
- #!python
- def callback(res, msg_id):
- print('Message published: %s' % res)
- producer.send_async(msg, callback)
- When the producer queue is full, by default the message will be rejected
- and the callback invoked with an error code.
- **Args**
- * `content`:
- A `bytes` object with the message payload.
- **Options**
- * `properties`:
- A dict of application-defined string properties.
- * `partition_key`:
- Sets the partition key for the message routing. A hash of this key is
- used to determine the message's topic partition.
- * `sequence_id`:
- Specify a custom sequence id for the message being published.
- * `replication_clusters`: Override namespace replication clusters. Note
- that it is the caller's responsibility to provide valid cluster names
- and that all clusters have been previously configured as topics.
- Given an empty list, the message will replicate per the namespace
- configuration.
- * `disable_replication`:
- Do not replicate this message.
- * `event_timestamp`:
- Timestamp, in milliseconds, of the event creation.
- * `deliver_at`:
- Specify that this message should not be delivered earlier than the
- specified timestamp.
- The timestamp is in milliseconds and based on UTC.
- * `deliver_after`:
- Specify a delay in timedelta for the delivery of the messages.
- """
- msg = self._build_msg(content, properties, partition_key, sequence_id,
- replication_clusters, disable_replication, event_timestamp,
- deliver_at, deliver_after)
- self._producer.send_async(msg, callback)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Producer.topic">
- <p>def <span class="ident">topic</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Return the topic which producer is publishing to</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Producer.topic', this);">Show source ≡</a></p>
- <div id="source-pulsar.Producer.topic" class="source">
- <pre><code>def topic(self):
- """
- Return the topic which producer is publishing to
- """
- return self._producer.topic()
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <div class="item">
- <p id="pulsar.Reader" class="name">class <span class="ident">Reader</span></p>
-
-
- <div class="desc"><p>Pulsar topic reader.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader', this);">Show source ≡</a></p>
- <div id="source-pulsar.Reader" class="source">
- <pre><code>class Reader:
- """
- Pulsar topic reader.
- """
-
- def topic(self):
- """
- Return the topic this reader is reading from.
- """
- return self._reader.topic()
-
- def read_next(self, timeout_millis=None):
- """
- Read a single message.
-
- If a message is not immediately available, this method will block until
- a new message is available.
-
- **Options**
-
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._reader.read_next()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._reader.read_next(timeout_millis)
-
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-
- def has_message_available(self):
- """
- Check if there is any message available to read from the current position.
- """
- return self._reader.has_message_available();
-
- def seek(self, messageid):
- """
- Reset this reader to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can
- instead perform the seek() on the individual partitions.
-
- **Args**
-
- * `messageid`:
- The message id for seek, OR an integer event time to seek to
- """
- self._reader.seek(messageid)
-
- def close(self):
- """
- Close the reader.
- """
- self._reader.close()
- self._client._consumers.remove(self)
-</code></pre>
- </div>
-</div>
-
-
- <div class="class">
- <h3>Ancestors (in MRO)</h3>
- <ul class="class_list">
- <li><a href="#pulsar.Reader">Reader</a></li>
- </ul>
- <h3>Methods</h3>
-
- <div class="item">
- <div class="name def" id="pulsar.Reader.close">
- <p>def <span class="ident">close</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Close the reader.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.close', this);">Show source ≡</a></p>
- <div id="source-pulsar.Reader.close" class="source">
- <pre><code>def close(self):
- """
- Close the reader.
- """
- self._reader.close()
- self._client._consumers.remove(self)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Reader.has_message_available">
- <p>def <span class="ident">has_message_available</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Check if there is any message available to read from the current position.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.has_message_available', this);">Show source ≡</a></p>
- <div id="source-pulsar.Reader.has_message_available" class="source">
- <pre><code>def has_message_available(self):
- """
- Check if there is any message available to read from the current position.
- """
- return self._reader.has_message_available();
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Reader.read_next">
- <p>def <span class="ident">read_next</span>(</p><p>self, timeout_millis=None)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Read a single message.</p>
-<p>If a message is not immediately available, this method will block until
-a new message is available.</p>
-<p><strong>Options</strong></p>
-<ul>
-<li><code>timeout_millis</code>:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.read_next', this);">Show source ≡</a></p>
- <div id="source-pulsar.Reader.read_next" class="source">
- <pre><code>def read_next(self, timeout_millis=None):
- """
- Read a single message.
- If a message is not immediately available, this method will block until
- a new message is available.
- **Options**
- * `timeout_millis`:
- If specified, the receive will raise an exception if a message is not
- available within the timeout.
- """
- if timeout_millis is None:
- msg = self._reader.read_next()
- else:
- _check_type(int, timeout_millis, 'timeout_millis')
- msg = self._reader.read_next(timeout_millis)
- m = Message()
- m._message = msg
- m._schema = self._schema
- return m
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Reader.seek">
- <p>def <span class="ident">seek</span>(</p><p>self, messageid)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Reset this reader to a specific message id or publish timestamp.
-The message id can either be a specific message or represent the first or last messages in the topic.
-Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can
-instead perform the seek() on the individual partitions.</p>
-<p><strong>Args</strong></p>
-<ul>
-<li><code>messageid</code>:
- The message id for seek, OR an integer event time to seek to</li>
-</ul></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.seek', this);">Show source ≡</a></p>
- <div id="source-pulsar.Reader.seek" class="source">
- <pre><code>def seek(self, messageid):
- """
- Reset this reader to a specific message id or publish timestamp.
- The message id can either be a specific message or represent the first or last messages in the topic.
- Note: this operation can only be done on non-partitioned topics. For partitioned topics, one can
- instead perform the seek() on the individual partitions.
- **Args**
- * `messageid`:
- The message id for seek, OR an integer event time to seek to
- """
- self._reader.seek(messageid)
-</code></pre>
- </div>
-</div>
-
- </div>
-
-
- <div class="item">
- <div class="name def" id="pulsar.Reader.topic">
- <p>def <span class="ident">topic</span>(</p><p>self)</p>
- </div>
-
-
-
-
- <div class="desc"><p>Return the topic this reader is reading from.</p></div>
- <div class="source_cont">
- <p class="source_link"><a href="javascript:void(0);" onclick="toggle('source-pulsar.Reader.topic', this);">Show source ≡</a></p>
- <div id="source-pulsar.Reader.topic" class="source">
- <pre><code>def topic(self):
- """
- Return the topic this reader is reading from.
- """
- return self._reader.topic()
-</code></pre>
- </div>
-</div>
-
- </div>
-
- </div>
- </div>
-
- <h2 class="section-title" id="header-submodules">Sub-modules</h2>
- <div class="item">
- <p class="name"><a href="functions/index.html">pulsar.functions</a></p>
-
-
-
- </div>
- <div class="item">
- <p class="name"><a href="schema/index.html">pulsar.schema</a></p>
-
-
-
- </div>
- </section>
-
- </article>
- <div class="clear"> </div>
- <footer id="footer">
- <p>
- Documentation generated by
- <a href="https://github.com/BurntSushi/pdoc">pdoc 0.3.2</a>
- </p>
-
- <p>pdoc is in the public domain with the
- <a href="http://unlicense.org">UNLICENSE</a></p>
-
- <p>Design by <a href="http://nadh.in">Kailash Nadh</a></p>
- </footer>
-</div>
-</body>
-</html>