You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/04/16 02:44:26 UTC

[pulsar-client-go] branch master updated: [#504] add http lookup service support (#510)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ec40d79  [#504] add http lookup service support (#510)
ec40d79 is described below

commit ec40d795841b1d61f7cbb07fad08e3c587db8801
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Apr 16 10:44:19 2021 +0800

    [#504] add http lookup service support (#510)
    
    Fixes #504
    
    Master Issue: #504
    
    ### Motivation
    
    The Java client already supports HTTP-based lookup: https://github.com/apache/pulsar/blob/12ef7c9b7ce698e6a8181700a2e8a2b030bcfa60/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L150
    
    This PR adds an HTTP lookup service to `pulsar-client-go`, allowing users to specify an http/https URL as the service URL.
    
    ### Modifications
    
    - [x] add `HTTPLookupService`
    - [x] add `HTTPClient`
    - [x] add tests
    - [x] regenerate the integration-tests certs with a Subject Alternative Name (DNS:localhost) so they work with Go 1.15's x509 Common Name matching change
    - [x] add auth support
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not documented)
      - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
---
 go.mod                                             |   3 +
 integration-tests/certs/broker-cert.pem            | 113 +++---
 integration-tests/certs/broker-key.pem             |  50 +--
 integration-tests/certs/cacert.pem                 | 114 +++---
 integration-tests/certs/client-cert.pem            | 113 +++---
 integration-tests/certs/client-key.pem             |  50 +--
 integration-tests/certs/openssl.cnf                |   4 +-
 pulsar-test-service-start.sh                       |   2 +-
 pulsar/client_impl.go                              |  35 +-
 pulsar/client_impl_test.go                         | 418 +++++++++++++++++++++
 pulsar/internal/auth/athenz.go                     |  26 +-
 pulsar/internal/auth/disabled.go                   |  17 +-
 pulsar/internal/auth/oauth2.go                     |  66 +++-
 pulsar/internal/auth/provider.go                   |  13 +
 pulsar/internal/auth/tls.go                        |  29 +-
 pulsar/internal/auth/token.go                      |  18 +
 pulsar/internal/http_client.go                     | 352 +++++++++++++++++
 .../{auth/disabled.go => http_client_go_1.11.go}   |  35 +-
 .../{auth/disabled.go => http_client_go_1.12.go}   |  31 +-
 pulsar/internal/lookup_service.go                  | 131 ++++++-
 pulsar/internal/lookup_service_test.go             |  83 +++-
 pulsar/internal/topic_name.go                      |  15 +
 pulsar/internal/topic_name_test.go                 |  70 ++++
 pulsar/test_helper.go                              |   3 +-
 24 files changed, 1497 insertions(+), 294 deletions(-)

diff --git a/go.mod b/go.mod
index 8eb3571..1ecd393 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,8 @@ require (
 	github.com/klauspost/compress v1.10.8
 	github.com/kr/pretty v0.2.0 // indirect
 	github.com/linkedin/goavro/v2 v2.9.8
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.1 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.7.1
@@ -25,6 +27,7 @@ require (
 	github.com/stretchr/testify v1.4.0
 	github.com/yahoo/athenz v1.8.55
 	go.uber.org/atomic v1.7.0
+	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
 )
 
 replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/integration-tests/certs/broker-cert.pem b/integration-tests/certs/broker-cert.pem
index 1c3ac16..504534e 100644
--- a/integration-tests/certs/broker-cert.pem
+++ b/integration-tests/certs/broker-cert.pem
@@ -1,78 +1,81 @@
 Certificate:
     Data:
         Version: 3 (0x2)
-        Serial Number: 11302484053501346331 (0x9cda7d5d1077021b)
+        Serial Number: 9806047019225563732 (0x881613cf2fb7c254)
     Signature Algorithm: sha1WithRSAEncryption
         Issuer: C=US, ST=CA, O=Apache, OU=Apache Pulsar, CN=localhost
         Validity
-            Not Before: Feb 16 19:12:19 2021 GMT
-            Not After : Feb 14 19:12:19 2031 GMT
+            Not Before: Apr 13 08:01:48 2021 GMT
+            Not After : Apr 11 08:01:48 2031 GMT
         Subject: C=US, ST=CA, O=Apache, OU=Apache Pulsar, CN=localhost
         Subject Public Key Info:
             Public Key Algorithm: rsaEncryption
                 Public-Key: (2048 bit)
                 Modulus:
-                    00:bb:c0:b5:9c:7a:6e:bd:9d:18:c0:34:13:cb:e4:
-                    af:d7:ef:47:ee:78:2c:84:48:6c:35:8b:74:6a:58:
-                    ab:dc:52:5c:b3:38:bf:dd:3d:dd:c1:0d:af:8d:9c:
-                    53:db:1f:a5:53:f1:4a:05:63:b3:3a:97:1c:e1:5b:
-                    c3:2d:11:09:6d:fc:58:29:36:c1:ad:2d:3a:35:39:
-                    ab:52:81:89:6d:be:3a:25:4e:a2:29:aa:86:83:3e:
-                    72:92:1f:1f:33:8a:0c:f4:28:de:62:16:25:0b:f5:
-                    45:f7:a9:71:c2:30:ab:52:dc:07:d5:a4:5e:c9:46:
-                    e8:03:0d:a7:57:74:a4:e5:ed:8a:c9:e2:0c:f1:e1:
-                    10:4d:bb:7f:bc:84:c5:c1:13:22:3d:c9:d8:d0:55:
-                    8a:bb:51:98:e5:11:36:8e:9b:da:1a:f2:0f:48:99:
-                    49:24:38:94:fa:ab:ef:50:c5:13:a7:df:f6:17:8c:
-                    a5:5e:31:87:ab:5a:27:c8:e9:fb:e7:db:94:92:ec:
-                    4d:b6:2f:56:8e:73:b5:ff:57:f2:ed:f8:09:2d:f9:
-                    e2:93:ed:8d:6a:18:54:3e:86:04:79:5a:7f:d1:db:
-                    ae:47:b0:03:21:6b:57:f4:6b:84:a5:ba:41:84:09:
-                    e2:4c:5c:bd:f8:42:c6:46:39:d1:64:8e:db:7c:41:
-                    d9:b5
+                    00:a2:78:b9:3f:af:e2:d3:15:e4:9a:d1:54:27:93:
+                    a2:6a:f1:db:e0:40:34:d5:72:96:b9:61:c7:c3:02:
+                    22:5c:03:9d:d9:f6:5c:37:d6:9e:93:e4:64:37:69:
+                    1b:56:e1:a8:58:09:17:3d:87:63:07:c3:c2:b5:33:
+                    55:e6:b6:cf:c3:d9:5b:7a:8a:90:5e:19:35:2b:1f:
+                    c1:38:ba:02:7f:49:4e:7f:49:9b:d3:e3:a3:a0:f1:
+                    55:75:83:e5:75:ec:fb:f8:88:a9:b1:10:2b:14:00:
+                    f8:f9:6c:13:be:9b:ca:64:b1:66:f4:ab:6e:b4:ca:
+                    10:85:15:84:6c:f1:1a:29:06:15:e3:2e:3e:d5:83:
+                    c5:8d:e9:b7:42:5c:16:e8:72:37:e8:bc:f0:57:59:
+                    37:3b:14:ad:3d:12:0b:41:12:1b:54:1e:41:de:5f:
+                    44:58:6b:25:e6:b9:6f:02:bd:f7:ed:43:44:2f:62:
+                    66:9c:05:1e:99:f9:56:1e:c7:de:09:96:55:ea:dd:
+                    a1:38:42:ae:d7:a1:88:b3:9e:21:ea:a1:7c:6d:76:
+                    6a:d4:fd:d7:fd:ba:03:7a:90:46:66:29:4f:40:b6:
+                    95:8b:c3:48:44:c5:00:1b:4c:5e:0c:c2:f5:7a:1f:
+                    2f:35:38:da:37:b7:05:ce:f9:20:ec:f2:28:1b:8d:
+                    60:ef
                 Exponent: 65537 (0x10001)
         X509v3 extensions:
             X509v3 Basic Constraints: 
                 CA:FALSE
             X509v3 Authority Key Identifier: 
-                keyid:AE:7E:50:17:5D:13:6E:22:E5:C6:97:43:B9:8F:3A:B6:FA:FB:48:D2
+                keyid:59:A0:A4:78:56:E1:56:61:4E:DC:E9:80:FF:EC:66:B6:2F:04:6D:AA
 
             X509v3 Subject Key Identifier: 
-                19:CF:49:FC:44:E7:14:CD:F8:D3:1F:F0:7F:1E:AB:BC:39:3B:4C:EE
+                9B:27:81:1A:7A:94:A7:C0:05:1C:6E:23:8D:80:B9:97:0D:46:95:F8
+            X509v3 Subject Alternative Name: 
+                DNS:localhost
     Signature Algorithm: sha1WithRSAEncryption
-         de:68:16:95:78:37:bc:67:6f:a4:b2:ba:d4:9b:e8:d0:9c:d3:
-         c3:88:25:1a:db:c1:ad:d1:a3:b9:a4:22:0a:34:ce:30:ea:cc:
-         ca:95:b4:86:18:31:d7:be:45:62:cc:5e:97:8b:f8:01:3d:03:
-         ac:1e:d0:67:7d:ad:c6:0d:39:ce:61:bf:0b:ab:6b:7d:8b:41:
-         96:15:b4:d7:5d:9b:02:d2:15:9e:3d:03:84:f2:f5:41:f7:a8:
-         f0:14:e1:22:d3:81:8b:d1:e6:b5:55:95:83:85:c8:41:73:d4:
-         66:fb:4d:52:3b:5a:20:e0:1a:f2:44:08:b0:aa:31:8b:57:bf:
-         b0:d5:51:3c:af:c7:0f:2f:27:23:fe:32:12:57:f4:7d:a0:74:
-         a9:3b:05:fd:c2:12:90:33:e6:75:3b:56:66:4b:4b:b9:af:05:
-         f7:84:0d:78:fb:1d:94:4b:bf:d8:bb:85:b7:0b:d2:64:27:05:
-         1a:a3:81:00:31:76:83:7d:ed:e2:98:0b:02:e6:0e:c0:c8:42:
-         d1:54:cf:a4:20:50:f1:99:47:b9:06:18:29:24:4f:64:a2:86:
-         69:0c:61:59:53:77:86:0b:ec:ca:e6:5a:60:a9:72:6c:88:72:
-         8e:b2:75:0a:a1:fa:1d:6e:cd:e9:ee:a2:05:c4:f4:4c:6b:42:
-         e2:c4:6d:a2
+         11:a0:ed:8a:27:16:7b:80:f5:12:e5:0a:ed:e4:73:11:55:bf:
+         4e:c5:b0:62:c7:0e:74:51:0a:31:34:e3:98:7e:68:32:79:75:
+         52:0a:b3:9d:12:f7:47:bd:a4:48:dc:00:d5:46:c5:6f:9c:67:
+         59:72:8b:fb:97:95:aa:88:e7:62:d2:3f:16:67:d9:c8:b5:8a:
+         eb:b2:71:0d:e5:dd:fe:05:bc:8e:79:59:95:d2:e5:d9:16:94:
+         e7:6f:44:8f:91:22:30:8e:bf:d3:9d:a4:7f:0e:84:77:d3:15:
+         a0:ed:ed:da:ca:74:99:6d:3e:6a:a3:63:f3:cb:cc:f4:d4:1c:
+         63:a7:5c:2c:4c:cb:8b:35:a7:f5:01:1b:df:8b:1c:e5:40:88:
+         47:32:f5:f2:f7:9a:35:29:37:0a:7d:77:a0:3d:db:fb:53:95:
+         af:74:f4:c0:bb:cb:e1:4a:9b:b4:29:32:21:31:50:fd:57:d0:
+         e1:f2:81:98:8c:06:93:1d:da:c6:89:c2:54:44:5f:85:8b:a9:
+         e8:23:7b:0a:dc:1e:ab:1a:73:3a:19:f1:89:ad:91:54:1d:c3:
+         1a:e9:e1:aa:fa:d9:37:35:19:dd:6f:f1:d2:d8:9e:c2:3e:4c:
+         86:26:38:d3:25:5e:03:f7:8e:96:a1:63:d0:a3:3b:a4:5e:ac:
+         bc:45:90:f2
 -----BEGIN CERTIFICATE-----
-MIIDfjCCAmagAwIBAgIJAJzafV0QdwIbMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+MIIDlDCCAnygAwIBAgIJAIgWE88vt8JUMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
 BAYTAlVTMQswCQYDVQQIDAJDQTEPMA0GA1UECgwGQXBhY2hlMRYwFAYDVQQLDA1B
-cGFjaGUgUHVsc2FyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwMjE2MTkxMjE5
-WhcNMzEwMjE0MTkxMjE5WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExDzAN
+cGFjaGUgUHVsc2FyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwNDEzMDgwMTQ4
+WhcNMzEwNDExMDgwMTQ4WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExDzAN
 BgNVBAoMBkFwYWNoZTEWMBQGA1UECwwNQXBhY2hlIFB1bHNhcjESMBAGA1UEAwwJ
-bG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAu8C1nHpu
-vZ0YwDQTy+Sv1+9H7ngshEhsNYt0alir3FJcszi/3T3dwQ2vjZxT2x+lU/FKBWOz
-Opcc4VvDLREJbfxYKTbBrS06NTmrUoGJbb46JU6iKaqGgz5ykh8fM4oM9CjeYhYl
-C/VF96lxwjCrUtwH1aReyUboAw2nV3Sk5e2KyeIM8eEQTbt/vITFwRMiPcnY0FWK
-u1GY5RE2jpvaGvIPSJlJJDiU+qvvUMUTp9/2F4ylXjGHq1onyOn759uUkuxNti9W
-jnO1/1fy7fgJLfnik+2NahhUPoYEeVp/0duuR7ADIWtX9GuEpbpBhAniTFy9+ELG
-RjnRZI7bfEHZtQIDAQABo00wSzAJBgNVHRMEAjAAMB8GA1UdIwQYMBaAFK5+UBdd
-E24i5caXQ7mPOrb6+0jSMB0GA1UdDgQWBBQZz0n8ROcUzfjTH/B/Hqu8OTtM7jAN
-BgkqhkiG9w0BAQUFAAOCAQEA3mgWlXg3vGdvpLK61Jvo0JzTw4glGtvBrdGjuaQi
-CjTOMOrMypW0hhgx175FYsxel4v4AT0DrB7QZ32txg05zmG/C6trfYtBlhW0112b
-AtIVnj0DhPL1Qfeo8BThItOBi9HmtVWVg4XIQXPUZvtNUjtaIOAa8kQIsKoxi1e/
-sNVRPK/HDy8nI/4yElf0faB0qTsF/cISkDPmdTtWZktLua8F94QNePsdlEu/2LuF
-twvSZCcFGqOBADF2g33t4pgLAuYOwMhC0VTPpCBQ8ZlHuQYYKSRPZKKGaQxhWVN3
-hgvsyuZaYKlybIhyjrJ1CqH6HW7N6e6iBcT0TGtC4sRtog==
+bG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoni5P6/i
+0xXkmtFUJ5OiavHb4EA01XKWuWHHwwIiXAOd2fZcN9aek+RkN2kbVuGoWAkXPYdj
+B8PCtTNV5rbPw9lbeoqQXhk1Kx/BOLoCf0lOf0mb0+OjoPFVdYPldez7+IipsRAr
+FAD4+WwTvpvKZLFm9KtutMoQhRWEbPEaKQYV4y4+1YPFjem3QlwW6HI36LzwV1k3
+OxStPRILQRIbVB5B3l9EWGsl5rlvAr337UNEL2JmnAUemflWHsfeCZZV6t2hOEKu
+16GIs54h6qF8bXZq1P3X/boDepBGZilPQLaVi8NIRMUAG0xeDML1eh8vNTjaN7cF
+zvkg7PIoG41g7wIDAQABo2MwYTAJBgNVHRMEAjAAMB8GA1UdIwQYMBaAFFmgpHhW
+4VZhTtzpgP/sZrYvBG2qMB0GA1UdDgQWBBSbJ4EaepSnwAUcbiONgLmXDUaV+DAU
+BgNVHREEDTALgglsb2NhbGhvc3QwDQYJKoZIhvcNAQEFBQADggEBABGg7YonFnuA
+9RLlCu3kcxFVv07FsGLHDnRRCjE045h+aDJ5dVIKs50S90e9pEjcANVGxW+cZ1ly
+i/uXlaqI52LSPxZn2ci1iuuycQ3l3f4FvI55WZXS5dkWlOdvRI+RIjCOv9OdpH8O
+hHfTFaDt7drKdJltPmqjY/PLzPTUHGOnXCxMy4s1p/UBG9+LHOVAiEcy9fL3mjUp
+Nwp9d6A92/tTla909MC7y+FKm7QpMiExUP1X0OHygZiMBpMd2saJwlREX4WLqegj
+ewrcHqsaczoZ8YmtkVQdwxrp4ar62Tc1Gd1v8dLYnsI+TIYmONMlXgP3jpahY9Cj
+O6RerLxFkPI=
 -----END CERTIFICATE-----
diff --git a/integration-tests/certs/broker-key.pem b/integration-tests/certs/broker-key.pem
index 9906b35..ef7049e 100644
--- a/integration-tests/certs/broker-key.pem
+++ b/integration-tests/certs/broker-key.pem
@@ -1,27 +1,27 @@
 -----BEGIN RSA PRIVATE KEY-----
-MIIEpAIBAAKCAQEAu8C1nHpuvZ0YwDQTy+Sv1+9H7ngshEhsNYt0alir3FJcszi/
-3T3dwQ2vjZxT2x+lU/FKBWOzOpcc4VvDLREJbfxYKTbBrS06NTmrUoGJbb46JU6i
-KaqGgz5ykh8fM4oM9CjeYhYlC/VF96lxwjCrUtwH1aReyUboAw2nV3Sk5e2KyeIM
-8eEQTbt/vITFwRMiPcnY0FWKu1GY5RE2jpvaGvIPSJlJJDiU+qvvUMUTp9/2F4yl
-XjGHq1onyOn759uUkuxNti9WjnO1/1fy7fgJLfnik+2NahhUPoYEeVp/0duuR7AD
-IWtX9GuEpbpBhAniTFy9+ELGRjnRZI7bfEHZtQIDAQABAoIBACDirSy+SR1PIeGw
-fXMuGBqzX58GT43P/ZwXVAm4hEmv52rA2uJOjsPcuM4N/qYX0eZ1EekFwIPXqghT
-AUmofUVVeL+nRp8ll9dwxXBLHh4pvpeAQWkY5DyiuICdkOJ/IKu3aNQ3Wr/QczPg
-6vr/eVKnFXw3MR8CG2tFffUUijl3Z/EswoezMhWNLJE8Gp1HqgPxSuEisw8aUBvT
-WqSg313FX+BKLKJKfddxmf6D4CkzKZBfxxVt0oCct0UmSMqNxPNpxGFuhi2+RBUp
-/tsRF8DXk2PZPh7FzeiAF4ebeUxXeh723i9ugROMMSOIoKjhAgKpRNaosXtDj6Mb
-sjw886ECgYEA6QdrzjqGIh7802+cLa63BWD1a6LA9lo8b9JmDT7/SPbjQ8hWVgUK
-h6zD8QnA3aK8oNX4/TR4xgBIaVgLgV2M8R3C4F5GFEwN9+rc4JUgfG3xOa3AG81I
-bQkOPr6weSwd1bsW4AddJEJ3jmLMgTBIG57p/vV3f931UOAx846hD40CgYEAzkK5
-mjWcUtAivNbOgSjPsgCM0e1WwVf5rh8Da65QoVFE1u1K+t9T311OhrdJG50BMc6V
-B4UdoM7s54BKFc68/3Kaen81bW3l3nmBVoWJGMiF/VLRF/W1PNFg2H3TSTjH8C3W
-xn+2i5tbAJfbv+Rj44sNjXsN/45blhzDuyA0NMkCgYEAwZFnjLRyjU9eRqgUfKH/
-59jlpFZaoYjNYTnNib1Fw69JkxmMFgI0AAWT8dpfOfwfYh67qGD43ciIXfFEBV7X
-X8aQBV5qKhwPElwmw+s8Q//zfC0HQlDgk6oNCjPOVak+0zaIxul3BlKsCzuokruv
-ejokN6RhcnbGezrwY1ecFFUCgYA73eYdOl5LMxpCEV8S4hOWqM42yFqb97sCrQHD
-KW9Bw2Llty5CLrwbYuu98kkH+MQ1/q4G5r44xJbCgtv+BGmbA1qPH4kUYAeOUVoX
-EN9AEEPDyVvjRm/6XRNtAuhmolJ2BfgG2V7Ump0JSQqBYfsUOnIOlJ71hA5VPbga
-Ajjk8QKBgQDoZUeFa5y6B3YWdgI24nRustuCOCBLPndL2Hx5S2EZN/ydoEd8I667
-1OhztTiLFBuHYdagOJp9XGoFIurbE/oMrOsWwS+L+QfXuQR9PKCdvxHIsj+bNzNg
-QdvUrlX+6kpriBN5dtniqCUVA+dlKcHSUllwVSvxTqrjs9U+N+fSnA==
+MIIEogIBAAKCAQEAoni5P6/i0xXkmtFUJ5OiavHb4EA01XKWuWHHwwIiXAOd2fZc
+N9aek+RkN2kbVuGoWAkXPYdjB8PCtTNV5rbPw9lbeoqQXhk1Kx/BOLoCf0lOf0mb
+0+OjoPFVdYPldez7+IipsRArFAD4+WwTvpvKZLFm9KtutMoQhRWEbPEaKQYV4y4+
+1YPFjem3QlwW6HI36LzwV1k3OxStPRILQRIbVB5B3l9EWGsl5rlvAr337UNEL2Jm
+nAUemflWHsfeCZZV6t2hOEKu16GIs54h6qF8bXZq1P3X/boDepBGZilPQLaVi8NI
+RMUAG0xeDML1eh8vNTjaN7cFzvkg7PIoG41g7wIDAQABAoIBAGfai2+uWmtbvzlH
+PWfe8x8xYr9By4P1L2tomWyJ3LS8ixZcaC45PjyC7ZyNzzAGJrm/dd+avy6jb76s
+BejoSn6CXftCv22m51mdfsFoBVif31F2F+mlxV6HZX0sxmxPA/GmSfIox6CNzpiB
+AU4B/6nFzG3xd16RG2Y2juOkanh1qn8fY+pDspV1LG1v4MBRrFfPPRdJ5m4+Xp5a
+dt+EGqjYX8qdsbKJ6ZqwvvrOZMk4vDBCgYwtPEPGLYCJUNpcNcaFH013eRxW9pQ7
+X9azRrFBfbiIGvucgoqfA6N4mnA/hiHZFAO/37n3+R5aKpy+SGVPlPfj4nE9jEQ0
+bi4R2kECgYEA0vV263GC2PApybf0+UPi1B+i4MtoV4/bUT2CKByEKt++uKgH0bIT
+ytNNobnEoMSyrwkRGNlsFD0xRZn7yQBFuAZ0H0pk1QZsHVhy8Ax1S9tC2ec3qloH
+OxvWwkTbjlCkdWTEa3Hgp9R+ITChs4vLUVYkEEIeH+w4uv4uUI/iLYcCgYEAxSkN
+3No9nPJgK4OhR1e+K9MaFGVApdV7hgNFWcGpOs6NY4Br9Exdr5MNY8ONIVXmFBSO
+D1z3yC/Wv1lFHpJKoJB6lSaPLMfYoyRODR27X3/9WyPbbEQmj1LPn7yFOrxu+Rtw
+PvwtoWz22C+iY2a1OCtYlQRt75uFuUy/R+YDS1kCgYAY7jtUjcCZ5J/7n8rKnnQy
+/14fVMqK9oxDqJI2LVCNRI3kgq8R4AqowJn/D9Yl7bj0KI0ls6QrLCQjhpQT5/1V
+RyX4ALGz3yNtwGMs7Tv7b/kRiVqxvhv2p06U/NA5nFwjJ3py/C1BiEIzp+ykLFQl
+05lFdprayO7XYmyUxrxDBwKBgHR4+FOMKfHZza5nKJO45gOFxlZeKn7NSxe0Pvfz
+TEINW8F4UE4V/xKF0Ncq4ujakf0y12mEBm0gtcB8wDR7P9LzBzete898kCJwhBuD
+gPm95rI80Jd4+z6YMUewWxSD1Rscdob++wXuFTVJjqkiN9Ri5wXBhCCUw77f8/BJ
+lZChAoGAZdXu+to1Y86TxO+kdZmIk40suEBrmdyAS7M33Zwby/8I+YA9wuU2qTF7
+VyKUz6LGmAH1WdPgFeDFFjwbzahLTJ6wsTYnoCT7RB881v1gKliIp71M6vKzEPd+
+73l0v89CEMXeLNMWkTp32QTVXp/wu6Nh6B9Rh2gPGWrZLi0eRIg=
 -----END RSA PRIVATE KEY-----
diff --git a/integration-tests/certs/cacert.pem b/integration-tests/certs/cacert.pem
index 432b6a7..fd9f36e 100644
--- a/integration-tests/certs/cacert.pem
+++ b/integration-tests/certs/cacert.pem
@@ -1,81 +1,83 @@
 Certificate:
     Data:
         Version: 3 (0x2)
-        Serial Number: 15605511759691510625 (0xd891ea438e97f361)
+        Serial Number: 12259044356400093384 (0xaa20dfe414fc18c8)
     Signature Algorithm: sha256WithRSAEncryption
         Issuer: C=US, ST=CA, O=Apache, OU=Apache Pulsar, CN=localhost
         Validity
-            Not Before: Feb 16 19:12:19 2021 GMT
-            Not After : Feb 16 19:12:19 2024 GMT
+            Not Before: Apr 13 08:01:48 2021 GMT
+            Not After : Apr 12 08:01:48 2024 GMT
         Subject: C=US, ST=CA, O=Apache, OU=Apache Pulsar, CN=localhost
         Subject Public Key Info:
             Public Key Algorithm: rsaEncryption
                 Public-Key: (2048 bit)
                 Modulus:
-                    00:e3:31:32:9c:cc:a4:1d:2e:39:ed:7c:ee:14:3b:
-                    29:71:e0:90:56:f0:ca:00:86:66:4b:32:1e:e3:a5:
-                    3e:e6:bc:0c:98:ea:ff:0c:da:3e:00:7a:82:f5:a4:
-                    59:1c:f2:4c:35:2d:52:83:d6:f8:6d:33:2c:f7:ec:
-                    48:30:da:13:dd:a9:8f:98:88:21:80:ee:45:b5:44:
-                    73:62:69:2b:c4:47:1c:fb:9e:8f:0b:01:b7:94:28:
-                    62:f1:44:b3:64:44:fc:02:62:61:26:05:7a:26:45:
-                    4a:6f:6d:27:13:e4:07:6c:7c:01:d3:1e:ab:53:92:
-                    dc:2d:d0:bf:ce:eb:eb:52:cf:ed:e3:2a:c7:fd:dc:
-                    72:8b:09:6e:3f:23:1a:b8:ee:cc:bb:7f:be:02:16:
-                    8b:e0:e7:d3:10:88:54:68:01:64:c4:06:0a:c7:40:
-                    a0:65:f9:66:25:9b:8d:a2:37:e2:12:5d:fa:94:f4:
-                    27:98:b4:48:95:ef:f0:9c:88:32:9c:a9:7e:6e:8f:
-                    83:d6:d9:44:b4:8a:d1:68:cf:ad:aa:e7:64:b8:04:
-                    2f:02:1d:c0:85:77:b8:39:da:e7:21:28:4a:67:7c:
-                    56:f4:ae:62:e0:9f:74:dc:f4:b5:08:1f:99:50:73:
-                    08:55:90:01:0d:15:dc:ab:a3:cb:75:8c:73:7a:43:
-                    34:0f
+                    00:c6:d2:63:e1:12:01:3d:7f:6f:4a:50:fb:25:f4:
+                    eb:2e:73:e7:a8:4e:cc:0a:8d:a4:cb:30:b7:89:7a:
+                    69:14:c4:0c:94:a1:4a:b8:39:24:1b:fc:1c:b6:c6:
+                    39:2a:94:26:69:a7:d4:13:59:e8:35:b5:34:d9:a4:
+                    36:b6:03:c4:ad:4f:a4:78:94:3c:6b:94:99:70:b5:
+                    4f:6c:ee:5b:ca:ce:15:2d:ab:38:04:8f:52:0d:48:
+                    d5:ce:74:08:00:f2:69:ae:59:08:b6:75:3d:13:c9:
+                    33:32:dc:47:de:fd:1e:bb:c7:50:a1:b1:ca:84:43:
+                    c4:7d:d9:fd:85:dc:93:d1:b3:59:58:e2:85:c6:25:
+                    d2:bf:3a:7c:6f:8a:04:ae:c8:45:be:74:d7:80:d2:
+                    b7:03:3c:11:71:92:9e:e3:f8:37:a1:94:11:12:ee:
+                    2c:88:5a:f8:55:27:1d:fd:6f:22:41:33:9f:e0:d5:
+                    e5:13:30:64:1f:63:a6:1c:f2:7f:e1:1c:e5:a8:3a:
+                    63:45:7b:1b:5c:4e:6a:1b:c9:0e:23:ae:08:3c:b7:
+                    17:d4:07:3c:0f:89:54:4b:58:f2:36:d7:53:10:9a:
+                    d2:5f:d9:82:ac:fd:6e:80:6b:30:4c:96:bd:39:bd:
+                    9f:28:49:31:1d:08:23:5c:18:a0:c3:ce:25:f3:49:
+                    0c:0d
                 Exponent: 65537 (0x10001)
         X509v3 extensions:
             X509v3 Basic Constraints: 
                 CA:TRUE
             X509v3 Authority Key Identifier: 
                 DirName:/C=US/ST=CA/O=Apache/OU=Apache Pulsar/CN=localhost
-                serial:D8:91:EA:43:8E:97:F3:61
+                serial:AA:20:DF:E4:14:FC:18:C8
 
             X509v3 Subject Key Identifier: 
-                AE:7E:50:17:5D:13:6E:22:E5:C6:97:43:B9:8F:3A:B6:FA:FB:48:D2
+                59:A0:A4:78:56:E1:56:61:4E:DC:E9:80:FF:EC:66:B6:2F:04:6D:AA
+            X509v3 Subject Alternative Name: 
+                DNS:localhost
     Signature Algorithm: sha256WithRSAEncryption
-         bd:b5:15:07:c1:be:04:dd:3a:09:5d:16:14:ff:67:ad:df:c3:
-         23:f0:92:1c:ce:35:c7:9d:38:91:bb:00:d3:51:6a:65:32:0e:
-         c9:5b:a7:cd:eb:51:ff:79:22:09:9b:f1:b3:a2:ae:b6:47:59:
-         5e:ef:2a:7a:47:a7:ca:06:9d:ba:e5:af:b1:06:28:25:91:92:
-         c0:0c:ab:22:22:cb:e5:38:7e:5e:f1:8b:b0:82:29:1a:36:db:
-         95:bc:0e:17:f7:9d:7d:9f:ac:52:b9:c3:64:a2:8f:a6:a4:f4:
-         a5:27:97:87:66:62:44:6c:f3:3a:63:aa:5e:ea:ae:58:74:d5:
-         c3:9c:a0:0c:d0:a3:fe:00:c6:7c:21:bd:ed:a5:8f:a6:ee:89:
-         77:e1:b0:4a:81:97:61:f9:59:df:50:b1:05:73:15:f5:cf:8e:
-         31:ad:4e:e5:00:60:ba:75:5c:92:71:68:bb:ca:04:f5:33:34:
-         a0:49:98:d0:17:cb:e4:49:09:6c:f3:7a:88:ad:f6:fd:f1:ed:
-         8d:ee:0f:45:bd:37:6c:26:0a:ab:b3:bd:c8:f3:cd:83:be:f4:
-         f2:94:6e:ca:d1:89:e0:f7:df:67:ec:6a:ee:2e:8d:40:33:45:
-         9f:f2:6a:8f:61:26:a6:02:4a:92:0f:0f:65:1b:6e:a6:35:9a:
-         04:ee:ba:e4
+         1d:d1:d0:86:49:a8:ca:a7:ec:13:5d:0c:60:9a:36:ba:55:17:
+         04:2a:4b:29:4b:7e:11:68:e1:a9:f5:53:72:a1:04:1e:ee:63:
+         36:2c:0a:fd:4f:e0:c3:64:e2:b4:d3:03:76:80:83:4e:88:3f:
+         f1:18:25:62:e1:82:cf:37:ca:31:95:48:db:a0:58:9f:ae:54:
+         7b:ea:fa:5f:f1:b5:fa:8e:29:b9:d6:7e:de:ee:78:82:f3:bf:
+         20:37:f2:dc:f2:88:46:ae:16:59:a4:7e:67:50:a4:4d:91:bc:
+         39:c2:65:0e:a6:97:5e:ed:18:2e:45:99:67:d3:66:21:93:28:
+         ba:7d:19:a9:47:88:74:2b:ef:27:5d:32:41:ab:96:c5:cc:12:
+         07:ec:f9:72:9c:a9:e5:3e:88:40:f6:11:82:b3:0b:6a:86:95:
+         db:f5:32:a9:e9:9f:fd:1d:67:53:27:08:19:d3:8e:77:2f:74:
+         42:0b:e9:31:13:20:1e:75:5c:c3:24:1f:68:60:a8:90:bc:22:
+         28:3a:92:35:e6:1d:f0:e6:fb:11:23:13:50:bc:fd:46:42:90:
+         d1:00:9c:a6:1b:d8:d6:58:e7:a6:9a:a9:4e:5b:93:37:e2:9b:
+         fa:fb:89:a6:d6:13:f8:a0:fb:84:b2:5f:74:30:7b:5e:51:5d:
+         e9:4e:74:17
 -----BEGIN CERTIFICATE-----
-MIID1TCCAr2gAwIBAgIJANiR6kOOl/NhMA0GCSqGSIb3DQEBCwUAMFcxCzAJBgNV
+MIID6zCCAtOgAwIBAgIJAKog3+QU/BjIMA0GCSqGSIb3DQEBCwUAMFcxCzAJBgNV
 BAYTAlVTMQswCQYDVQQIDAJDQTEPMA0GA1UECgwGQXBhY2hlMRYwFAYDVQQLDA1B
-cGFjaGUgUHVsc2FyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwMjE2MTkxMjE5
-WhcNMjQwMjE2MTkxMjE5WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExDzAN
+cGFjaGUgUHVsc2FyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwNDEzMDgwMTQ4
+WhcNMjQwNDEyMDgwMTQ4WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExDzAN
 BgNVBAoMBkFwYWNoZTEWMBQGA1UECwwNQXBhY2hlIFB1bHNhcjESMBAGA1UEAwwJ
-bG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4zEynMyk
-HS457XzuFDspceCQVvDKAIZmSzIe46U+5rwMmOr/DNo+AHqC9aRZHPJMNS1Sg9b4
-bTMs9+xIMNoT3amPmIghgO5FtURzYmkrxEcc+56PCwG3lChi8USzZET8AmJhJgV6
-JkVKb20nE+QHbHwB0x6rU5LcLdC/zuvrUs/t4yrH/dxyiwluPyMauO7Mu3++AhaL
-4OfTEIhUaAFkxAYKx0CgZflmJZuNojfiEl36lPQnmLRIle/wnIgynKl+bo+D1tlE
-tIrRaM+tqudkuAQvAh3AhXe4OdrnIShKZ3xW9K5i4J903PS1CB+ZUHMIVZABDRXc
-q6PLdYxzekM0DwIDAQABo4GjMIGgMAwGA1UdEwQFMAMBAf8wcQYDVR0jBGowaKFb
+bG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxtJj4RIB
+PX9vSlD7JfTrLnPnqE7MCo2kyzC3iXppFMQMlKFKuDkkG/wctsY5KpQmaafUE1no
+NbU02aQ2tgPErU+keJQ8a5SZcLVPbO5bys4VLas4BI9SDUjVznQIAPJprlkItnU9
+E8kzMtxH3v0eu8dQobHKhEPEfdn9hdyT0bNZWOKFxiXSvzp8b4oErshFvnTXgNK3
+AzwRcZKe4/g3oZQREu4siFr4VScd/W8iQTOf4NXlEzBkH2OmHPJ/4RzlqDpjRXsb
+XE5qG8kOI64IPLcX1Ac8D4lUS1jyNtdTEJrSX9mCrP1ugGswTJa9Ob2fKEkxHQgj
+XBigw84l80kMDQIDAQABo4G5MIG2MAwGA1UdEwQFMAMBAf8wcQYDVR0jBGowaKFb
 pFkwVzELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMQ8wDQYDVQQKDAZBcGFjaGUx
-FjAUBgNVBAsMDUFwYWNoZSBQdWxzYXIxEjAQBgNVBAMMCWxvY2FsaG9zdIIJANiR
-6kOOl/NhMB0GA1UdDgQWBBSuflAXXRNuIuXGl0O5jzq2+vtI0jANBgkqhkiG9w0B
-AQsFAAOCAQEAvbUVB8G+BN06CV0WFP9nrd/DI/CSHM41x504kbsA01FqZTIOyVun
-zetR/3kiCZvxs6KutkdZXu8qekenygaduuWvsQYoJZGSwAyrIiLL5Th+XvGLsIIp
-GjbblbwOF/edfZ+sUrnDZKKPpqT0pSeXh2ZiRGzzOmOqXuquWHTVw5ygDNCj/gDG
-fCG97aWPpu6Jd+GwSoGXYflZ31CxBXMV9c+OMa1O5QBgunVcknFou8oE9TM0oEmY
-0BfL5EkJbPN6iK32/fHtje4PRb03bCYKq7O9yPPNg7708pRuytGJ4PffZ+xq7i6N
-QDNFn/Jqj2EmpgJKkg8PZRtupjWaBO665A==
+FjAUBgNVBAsMDUFwYWNoZSBQdWxzYXIxEjAQBgNVBAMMCWxvY2FsaG9zdIIJAKog
+3+QU/BjIMB0GA1UdDgQWBBRZoKR4VuFWYU7c6YD/7Ga2LwRtqjAUBgNVHREEDTAL
+gglsb2NhbGhvc3QwDQYJKoZIhvcNAQELBQADggEBAB3R0IZJqMqn7BNdDGCaNrpV
+FwQqSylLfhFo4an1U3KhBB7uYzYsCv1P4MNk4rTTA3aAg06IP/EYJWLhgs83yjGV
+SNugWJ+uVHvq+l/xtfqOKbnWft7ueILzvyA38tzyiEauFlmkfmdQpE2RvDnCZQ6m
+l17tGC5FmWfTZiGTKLp9GalHiHQr7yddMkGrlsXMEgfs+XKcqeU+iED2EYKzC2qG
+ldv1Mqnpn/0dZ1MnCBnTjncvdEIL6TETIB51XMMkH2hgqJC8Iig6kjXmHfDm+xEj
+E1C8/UZCkNEAnKYb2NZY56aaqU5bkzfim/r7iabWE/ig+4SyX3Qwe15RXelOdBc=
 -----END CERTIFICATE-----
diff --git a/integration-tests/certs/client-cert.pem b/integration-tests/certs/client-cert.pem
index dcc387e..d4c4811 100644
--- a/integration-tests/certs/client-cert.pem
+++ b/integration-tests/certs/client-cert.pem
@@ -1,78 +1,81 @@
 Certificate:
     Data:
         Version: 3 (0x2)
-        Serial Number: 11302484053501346330 (0x9cda7d5d1077021a)
+        Serial Number: 9806047019225563731 (0x881613cf2fb7c253)
     Signature Algorithm: sha1WithRSAEncryption
         Issuer: C=US, ST=CA, O=Apache, OU=Apache Pulsar, CN=localhost
         Validity
-            Not Before: Feb 16 19:12:19 2021 GMT
-            Not After : Feb 14 19:12:19 2031 GMT
+            Not Before: Apr 13 08:01:48 2021 GMT
+            Not After : Apr 11 08:01:48 2031 GMT
         Subject: C=US, ST=CA, O=Apache, OU=Apache Pulsar, CN=localhost
         Subject Public Key Info:
             Public Key Algorithm: rsaEncryption
                 Public-Key: (2048 bit)
                 Modulus:
-                    00:be:80:9c:b6:0e:88:4b:61:3f:c9:fc:60:45:63:
-                    34:e0:1c:49:4a:0c:f0:0a:6a:e6:93:1d:38:db:03:
-                    14:68:56:54:41:be:8f:e9:48:3a:21:68:b3:f5:e4:
-                    60:e5:a7:90:af:be:37:0d:08:09:3e:bc:d4:9e:8c:
-                    7a:ae:ca:44:c6:cc:b5:96:9b:3f:8e:a0:50:da:7d:
-                    49:98:23:98:d5:b1:64:e6:f7:42:e0:21:ac:0a:75:
-                    58:72:bd:97:1a:72:bd:77:a6:00:38:d9:a3:e0:73:
-                    11:5b:a9:f1:c7:b8:5f:96:e9:d6:87:ae:82:37:1a:
-                    1d:81:78:74:46:a7:0a:ec:1d:4d:ec:95:b9:13:e0:
-                    e3:1f:af:7a:cd:61:2b:d2:af:9e:a4:4c:ca:00:8d:
-                    6d:24:39:7f:12:bc:c9:26:7a:68:f3:7b:c7:fa:2e:
-                    61:d4:88:30:37:b5:2a:ca:39:8d:10:ca:6b:00:b9:
-                    dd:6b:98:1f:84:bb:70:fc:fa:35:43:fe:9a:5d:da:
-                    7f:eb:8a:3a:22:0c:bf:70:d6:14:4d:50:89:7e:a7:
-                    92:1b:a9:2c:e4:14:a3:7f:e4:c3:aa:83:a1:c2:d8:
-                    54:52:07:5b:2b:dd:f4:db:88:0b:37:58:13:a9:2a:
-                    00:36:c1:ea:a5:84:90:19:ce:2c:71:47:9d:c2:94:
-                    71:8f
+                    00:a8:90:05:5f:14:7a:de:41:98:14:82:fe:f2:c3:
+                    4a:81:1b:47:91:36:83:5e:9d:b2:78:7b:26:50:1c:
+                    b9:45:8f:de:b5:61:2e:3a:6d:86:e3:e4:64:2b:13:
+                    4b:52:34:89:20:2d:0b:40:b8:76:aa:4f:cc:b5:dc:
+                    4d:c7:dc:11:32:e9:84:7f:51:b0:ae:1d:c8:70:7e:
+                    16:e4:cf:da:c3:56:4a:c8:9f:93:4a:7f:fb:fd:32:
+                    20:0a:bd:78:3a:b5:65:cf:fb:12:a2:ca:c2:71:a0:
+                    ac:9d:c2:7b:94:b3:b8:18:c0:2b:f4:ab:ce:72:d4:
+                    7a:00:3f:b3:4a:be:97:ea:7c:8c:8e:7a:50:58:22:
+                    0c:00:3a:26:8c:47:8e:64:dd:b5:86:8f:c9:bf:61:
+                    48:00:55:81:7e:f4:5b:12:54:00:b2:7e:15:c0:b7:
+                    76:0c:88:9a:7a:89:8b:2b:be:f8:04:af:71:5a:43:
+                    ca:e3:9e:d1:71:ff:10:65:69:db:c3:c7:58:cb:ac:
+                    b2:34:34:be:ef:26:67:de:48:71:3d:ba:a4:6e:6e:
+                    fa:27:8b:97:37:ab:e2:f7:d1:ee:3f:58:e0:65:f5:
+                    04:d8:05:ac:49:a5:c0:88:6b:91:5f:c8:79:aa:28:
+                    c9:36:e2:12:b0:c1:71:16:c2:38:b0:6f:b8:6f:f8:
+                    fa:d9
                 Exponent: 65537 (0x10001)
         X509v3 extensions:
             X509v3 Basic Constraints: 
                 CA:FALSE
             X509v3 Authority Key Identifier: 
-                keyid:AE:7E:50:17:5D:13:6E:22:E5:C6:97:43:B9:8F:3A:B6:FA:FB:48:D2
+                keyid:59:A0:A4:78:56:E1:56:61:4E:DC:E9:80:FF:EC:66:B6:2F:04:6D:AA
 
             X509v3 Subject Key Identifier: 
-                11:32:C6:4D:77:0F:D5:45:6C:73:CF:4C:26:97:30:21:EB:09:BB:1B
+                E0:D9:A8:7D:49:EC:BF:DF:1D:88:45:9B:D7:E5:BE:5A:90:C6:98:59
+            X509v3 Subject Alternative Name: 
+                DNS:localhost
     Signature Algorithm: sha1WithRSAEncryption
-         22:3d:be:97:7a:1e:fc:4a:1e:4f:e6:33:52:18:bc:df:aa:b5:
-         50:45:46:03:0c:fb:89:d3:08:ca:a0:f6:87:bd:27:24:96:87:
-         43:85:b7:33:ea:d6:e9:77:18:a5:2a:c9:78:bb:9d:da:2e:0d:
-         f1:1c:e2:dd:73:39:e3:1d:8d:a6:df:d5:29:06:28:72:de:75:
-         d5:06:0a:4c:42:83:83:a1:6c:ad:3d:37:47:d1:c8:5f:8b:2f:
-         1c:f4:0a:44:25:c1:e8:51:0a:1f:ab:29:bc:29:4d:5d:a4:5f:
-         50:30:cd:18:d6:66:c0:3c:d4:1d:41:72:5d:4d:42:36:7d:2b:
-         07:71:3b:51:a6:19:0a:24:46:5b:f2:4c:97:0d:ff:56:a1:4e:
-         49:0c:53:c3:9d:6e:97:9e:09:de:16:20:e9:ef:b5:2a:66:f5:
-         ad:a5:38:c8:ed:4d:63:19:cd:2d:34:c5:57:ee:13:e4:ac:b8:
-         4b:a0:f7:05:ab:75:5e:d9:40:af:cf:0d:a9:55:a2:c9:74:6a:
-         f1:44:a0:41:0b:80:30:ed:32:bf:e1:ae:81:0f:5e:78:c7:01:
-         cd:b0:05:c5:f0:d3:eb:fa:7a:12:5b:13:92:4d:1d:09:3c:d3:
-         be:70:5e:d3:17:a3:70:1b:b3:f2:88:1a:55:cd:a1:a8:2f:4c:
-         77:c2:f0:d1
+         73:02:b5:ce:ba:8d:2c:34:1b:04:1d:40:00:1f:71:01:aa:b5:
+         2f:db:b3:31:35:a0:74:f2:ca:24:e5:df:18:f2:85:a3:be:cc:
+         62:4d:67:b5:ff:db:4e:33:38:4b:58:69:34:8b:cc:91:22:a4:
+         aa:cf:c1:fe:61:46:c6:f7:f3:cf:65:b4:d3:47:b8:07:be:bb:
+         dc:8d:ad:c4:c9:cb:23:6b:64:14:78:1e:16:a5:af:ca:28:d4:
+         8c:b5:ec:31:76:74:41:91:a0:50:6b:66:99:8d:d5:a6:3a:4e:
+         7b:5b:00:74:85:22:e7:6f:82:d8:ba:42:0f:71:44:3c:48:27:
+         a9:d5:af:86:e7:4e:a3:11:83:92:12:a5:da:46:d1:9b:c3:a8:
+         70:f7:18:cb:1a:91:55:6c:04:bb:4b:7c:b3:49:89:71:39:cd:
+         e0:f4:41:58:bb:db:66:97:4a:c2:2f:ca:65:b6:e5:d2:dc:00:
+         9f:7e:35:68:ef:08:40:c8:f1:a8:45:f3:5e:4b:93:9d:ee:82:
+         19:3a:27:a8:a1:32:95:89:0c:a9:00:ee:ae:a9:02:69:f3:0b:
+         1f:42:3a:ed:67:64:f4:88:fe:03:7e:93:90:f3:a3:d2:45:73:
+         77:8d:2a:42:65:b9:d7:a7:29:9a:df:da:23:c9:2a:c4:f9:8f:
+         a4:66:13:72
 -----BEGIN CERTIFICATE-----
-MIIDfjCCAmagAwIBAgIJAJzafV0QdwIaMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
+MIIDlDCCAnygAwIBAgIJAIgWE88vt8JTMA0GCSqGSIb3DQEBBQUAMFcxCzAJBgNV
 BAYTAlVTMQswCQYDVQQIDAJDQTEPMA0GA1UECgwGQXBhY2hlMRYwFAYDVQQLDA1B
-cGFjaGUgUHVsc2FyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwMjE2MTkxMjE5
-WhcNMzEwMjE0MTkxMjE5WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExDzAN
+cGFjaGUgUHVsc2FyMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwNDEzMDgwMTQ4
+WhcNMzEwNDExMDgwMTQ4WjBXMQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0ExDzAN
 BgNVBAoMBkFwYWNoZTEWMBQGA1UECwwNQXBhY2hlIFB1bHNhcjESMBAGA1UEAwwJ
-bG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvoCctg6I
-S2E/yfxgRWM04BxJSgzwCmrmkx042wMUaFZUQb6P6Ug6IWiz9eRg5aeQr743DQgJ
-PrzUnox6rspExsy1lps/jqBQ2n1JmCOY1bFk5vdC4CGsCnVYcr2XGnK9d6YAONmj
-4HMRW6nxx7hflunWh66CNxodgXh0RqcK7B1N7JW5E+DjH696zWEr0q+epEzKAI1t
-JDl/ErzJJnpo83vH+i5h1IgwN7UqyjmNEMprALnda5gfhLtw/Po1Q/6aXdp/64o6
-Igy/cNYUTVCJfqeSG6ks5BSjf+TDqoOhwthUUgdbK93024gLN1gTqSoANsHqpYSQ
-Gc4scUedwpRxjwIDAQABo00wSzAJBgNVHRMEAjAAMB8GA1UdIwQYMBaAFK5+UBdd
-E24i5caXQ7mPOrb6+0jSMB0GA1UdDgQWBBQRMsZNdw/VRWxzz0wmlzAh6wm7GzAN
-BgkqhkiG9w0BAQUFAAOCAQEAIj2+l3oe/EoeT+YzUhi836q1UEVGAwz7idMIyqD2
-h70nJJaHQ4W3M+rW6XcYpSrJeLud2i4N8Rzi3XM54x2Npt/VKQYoct511QYKTEKD
-g6FsrT03R9HIX4svHPQKRCXB6FEKH6spvClNXaRfUDDNGNZmwDzUHUFyXU1CNn0r
-B3E7UaYZCiRGW/JMlw3/VqFOSQxTw51ul54J3hYg6e+1Kmb1raU4yO1NYxnNLTTF
-V+4T5Ky4S6D3Bat1XtlAr88NqVWiyXRq8USgQQuAMO0yv+GugQ9eeMcBzbAFxfDT
-6/p6ElsTkk0dCTzTvnBe0xejcBuz8ogaVc2hqC9Md8Lw0Q==
+bG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqJAFXxR6
+3kGYFIL+8sNKgRtHkTaDXp2yeHsmUBy5RY/etWEuOm2G4+RkKxNLUjSJIC0LQLh2
+qk/MtdxNx9wRMumEf1Gwrh3IcH4W5M/aw1ZKyJ+TSn/7/TIgCr14OrVlz/sSosrC
+caCsncJ7lLO4GMAr9KvOctR6AD+zSr6X6nyMjnpQWCIMADomjEeOZN21ho/Jv2FI
+AFWBfvRbElQAsn4VwLd2DIiaeomLK774BK9xWkPK457Rcf8QZWnbw8dYy6yyNDS+
+7yZn3khxPbqkbm76J4uXN6vi99HuP1jgZfUE2AWsSaXAiGuRX8h5qijJNuISsMFx
+FsI4sG+4b/j62QIDAQABo2MwYTAJBgNVHRMEAjAAMB8GA1UdIwQYMBaAFFmgpHhW
+4VZhTtzpgP/sZrYvBG2qMB0GA1UdDgQWBBTg2ah9Sey/3x2IRZvX5b5akMaYWTAU
+BgNVHREEDTALgglsb2NhbGhvc3QwDQYJKoZIhvcNAQEFBQADggEBAHMCtc66jSw0
+GwQdQAAfcQGqtS/bszE1oHTyyiTl3xjyhaO+zGJNZ7X/204zOEtYaTSLzJEipKrP
+wf5hRsb3889ltNNHuAe+u9yNrcTJyyNrZBR4Hhalr8oo1Iy17DF2dEGRoFBrZpmN
+1aY6TntbAHSFIudvgti6Qg9xRDxIJ6nVr4bnTqMRg5ISpdpG0ZvDqHD3GMsakVVs
+BLtLfLNJiXE5zeD0QVi722aXSsIvymW25dLcAJ9+NWjvCEDI8ahF815Lk53ughk6
+J6ihMpWJDKkA7q6pAmnzCx9COu1nZPSI/gN+k5Dzo9JFc3eNKkJludenKZrf2iPJ
+KsT5j6RmE3I=
 -----END CERTIFICATE-----
diff --git a/integration-tests/certs/client-key.pem b/integration-tests/certs/client-key.pem
index 92e3794..d4ad41e 100644
--- a/integration-tests/certs/client-key.pem
+++ b/integration-tests/certs/client-key.pem
@@ -1,27 +1,27 @@
 -----BEGIN RSA PRIVATE KEY-----
-MIIEpAIBAAKCAQEAvoCctg6IS2E/yfxgRWM04BxJSgzwCmrmkx042wMUaFZUQb6P
-6Ug6IWiz9eRg5aeQr743DQgJPrzUnox6rspExsy1lps/jqBQ2n1JmCOY1bFk5vdC
-4CGsCnVYcr2XGnK9d6YAONmj4HMRW6nxx7hflunWh66CNxodgXh0RqcK7B1N7JW5
-E+DjH696zWEr0q+epEzKAI1tJDl/ErzJJnpo83vH+i5h1IgwN7UqyjmNEMprALnd
-a5gfhLtw/Po1Q/6aXdp/64o6Igy/cNYUTVCJfqeSG6ks5BSjf+TDqoOhwthUUgdb
-K93024gLN1gTqSoANsHqpYSQGc4scUedwpRxjwIDAQABAoIBADkDiOM7yjK1MkAj
-VjSuEj8YeB6/bH/RqpxQ3Jk/fLYVUDY08CEjSrKBAYsQBKf21GikhxlRVGMAZKnm
-5Ti2tIrC7X3bjNQEyu35HTN5Q96ArvApBEzMBw9CO2xrrXNu76GFfv+Es3UTOxix
-7fiWB5XE+j21TqdrG3WKNVKH3872GMentMRlealvV5joKAtlrEL9kmwPYYoZONwD
-IKXz8NxXXxiU9v3ALcpq2BLv8zR2vgHJqAlBtBTMTFXF4CRMXVCdkfS0+qBIgQRT
-khevQzwF2YvgCdq3k7jaZd5p0dLATL9RmRpT05H/uDGf6EZrIPhCGL41VjVysOpq
-NMAcVkECgYEA/fQNoOeZUkX4dGsxB4/4mTfJzve0jp3tOV2np+Ho2noi5lksPt9O
-nxMLl+Dvn5lOdQDwqpA32ixE2sohvem/SC8U27xDncAiXbdYpy3749uAX14XSNMY
-rvyzwnWMvySCIKpEThAiPrbMDI+A52564XwIX9ALqygDpa6kHWmsYx8CgYEAwAmm
-PSJ2G69IfOrsSRKiqigEdpAD5L59TxAAk+AjuisIYnOqTYYcn7OL3IAxzksbwyRc
-m/Bd7iBPGlx6d7CKN8dqX/93sdl0yxGVJi6VbLbY6yQweZrskwJXa4sPDeQi/h8T
-8P7ZrhRnE5l0vJQ6VvBaqwQRBZcRk4K40B3LE5ECgYAqKtYAe7RHpvqkShAwE0XS
-GyNVTGgcrjhvqA9onuhNLq6Q1Qm1vtqNK1pnc6AWPhx67RcJGz1K6sLcGWgU4rUy
-RS8PdWk8zmRmb0U3LTmAEjh0nWj8XKDinW8zwNUIAAXjaqJA3AjzpOKoy0M6KmwY
-+MFjN2ZYmyJP5nm1sJBBPQKBgQCFFyrrO7L3ROfMC1Ru4Edz/f/5252XLp3R6Zk8
-4x5fq+tHsDnbOJMl612HGCFsg4leGRlFybSPkDzCXE1UHjnspnwGxBrEW50iD7F6
-oNZWXfLAJHikDsbeq8TCd8GbMv09eXRslXNvvhwh7TyGZYR5NGLiGxNg+G3p+bL0
-KXHQwQKBgQDbWF9IHCJOm8VI1PIVaPAqQ+ZmYaH9og6pJtiNgrCMTDSuvCONhd4K
-SqNr9+ap4YgkCg4JIbjJw3WNhoOQRe4GPiU9E1zi2XBVTyfg0bs7edNCYn9+Dgnp
-x5E+KfZWh0dLWwq6zt4LtGKEJk5msgdlqZaoaDCbeMmTCvnSmhrTqQ==
+MIIEpAIBAAKCAQEAqJAFXxR63kGYFIL+8sNKgRtHkTaDXp2yeHsmUBy5RY/etWEu
+Om2G4+RkKxNLUjSJIC0LQLh2qk/MtdxNx9wRMumEf1Gwrh3IcH4W5M/aw1ZKyJ+T
+Sn/7/TIgCr14OrVlz/sSosrCcaCsncJ7lLO4GMAr9KvOctR6AD+zSr6X6nyMjnpQ
+WCIMADomjEeOZN21ho/Jv2FIAFWBfvRbElQAsn4VwLd2DIiaeomLK774BK9xWkPK
+457Rcf8QZWnbw8dYy6yyNDS+7yZn3khxPbqkbm76J4uXN6vi99HuP1jgZfUE2AWs
+SaXAiGuRX8h5qijJNuISsMFxFsI4sG+4b/j62QIDAQABAoIBAEhOp482yNIojyVV
+YCohLged/SBEoyvmN9XI9hFtkOhPI/jofB0iOLfvlCyKNUhJMv1zdR1JQ9YhLy5g
+F5mBRHtzgpvfHgxCyGXBIO78XtpmXOQc1+Egb6cYZnhU+Isun0MNb9wXYDu/8ewm
+rj6GrZUocnKddIz7qm4K5U1N3vr9ygjpzY4ltpLagZFD+kY8l4eh9ayUwx7qIRv1
+036TRajcDabN/T8dGwQ/87lUQtSPcmTPRNXNRePRLCFC96J8cLE2cGG8iQdEpSrJ
+ehzpkXWBShGY5NjJEhgIdyVGMOMfealsRjQH9mX3wyLV7JSE/2NSvBIJPBc4arwW
+if+dvvUCgYEA28ErIxPL+rNl7qdzjGAQFFtESjeBUhrUYulbJk1TOz914D87oRCo
+GDEGnQdMNgXDe4ddPiDKk9ukSsCxR7Ow+7Xtag/9BHLZers1DfAniEdLEo51fr2H
+CrAazeEaUsvg0ThNRrZWd17a2RLX4Npqa4JQFddsb+TJnUqmSQmIBocCgYEAxF1X
+kYHmcDeObzTZAJGmDCVHyggxsx2BMS2gIbmasuPQVqBBq7kilzywuZ/5BqFRSMmI
+kaZ/vRvCLCD0amFLgVbrHPoX9ZqsHhsQyyeWPTUX1XgBEMkO5nUYV3QPPIABPE/j
+PD1yW82elmueJXzaxNHrCrXlvGmh04OSCGZJ658CgYEAyr2hfaAzDV1bil2k5fpu
+e0lkbJBe2A/2qDagqGqogj3fE39jFQrX+lGox1DHSAPzQNE2/i2kl9sI7QpvxDYM
+73z9x19u0B7hdDCpFu3cmwcjRMB3t/FFWF/2qsCl14NPK9ckvDtW9JRnleOjlUgM
+7JjyJ+Ryn5zVQS9w0GthupsCgYEAkzF9AL+k4x1iEY5F1B1ZoOUZKSJ3TtKhEyIm
+VOFySwJmG2lJkNXYThHaT2rRGt01tyAK1VM/kZwLM1UIoe2I0c07tJ7r8tkch/y1
+2/MytXLlCMlJ0zFDkBDaTEPdX/Z79p/TsEZHQPWjGRJWm7c5rivEymSLb26nNYv8
+nXs/kfUCgYBRl7KEAHwtjc6d8Hqe+bfSNRe6tiWxXfXeZD9OATzTDDnkvl9jChwD
+B8Vj1hR/lzvG3SRs9uS+ehYfnRxQ/ZQJjCFtUy9Q/bfUZ2ajNj4PL5v0IV6pmeE9
+Ek3E8iazpIcNmNvg+S6MB0Cf2BrLssc5RHkDnbAAuKEtqfqpn/pAVA==
 -----END RSA PRIVATE KEY-----
diff --git a/integration-tests/certs/openssl.cnf b/integration-tests/certs/openssl.cnf
index 53b095e..82dbbd9 100644
--- a/integration-tests/certs/openssl.cnf
+++ b/integration-tests/certs/openssl.cnf
@@ -11,9 +11,9 @@ CN = localhost
 basicConstraints = CA:FALSE
 authorityKeyIdentifier=keyid,issuer
 subjectKeyIdentifier=hash
+subjectAltName=DNS:localhost
 [v3_ca]
 basicConstraints = CA:TRUE
 authorityKeyIdentifier=keyid,issuer
 subjectKeyIdentifier=hash
-
-
+subjectAltName=DNS:localhost
\ No newline at end of file
diff --git a/pulsar-test-service-start.sh b/pulsar-test-service-start.sh
index 47735ec..c0bdadd 100755
--- a/pulsar-test-service-start.sh
+++ b/pulsar-test-service-start.sh
@@ -36,7 +36,7 @@ else
     docker run -d --rm --name pulsar-client-go-test \
                 -p 8080:8080 \
                 -p 6650:6650 \
-                -p 8443:8843 \
+                -p 8443:8443 \
                 -p 6651:6651 \
                 ${IMAGE_NAME} \
                 /pulsar/bin/pulsar standalone \
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index dc70bb6..ee879c8 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -67,9 +67,9 @@ func newClient(options ClientOptions) (Client, error) {
 
 	var tlsConfig *internal.TLSOptions
 	switch url.Scheme {
-	case "pulsar":
+	case "pulsar", "http":
 		tlsConfig = nil
-	case "pulsar+ssl":
+	case "pulsar+ssl", "https":
 		tlsConfig = &internal.TLSOptions{
 			AllowInsecureConnection: options.TLSAllowInsecureConnection,
 			TrustCertsFilePath:      options.TLSTrustCertsFilePath,
@@ -126,8 +126,24 @@ func newClient(options ClientOptions) (Client, error) {
 	serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
 
 	c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, c.cnxPool, operationTimeout, logger, metrics)
-	c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver,
-		tlsConfig != nil, options.ListenerName, logger, metrics)
+
+	switch url.Scheme {
+	case "pulsar", "pulsar+ssl":
+		c.lookupService = internal.NewLookupService(c.rpcClient, url, serviceNameResolver,
+			tlsConfig != nil, options.ListenerName, logger, metrics)
+	case "http", "https":
+		httpClient, err := internal.NewHTTPClient(url, serviceNameResolver, tlsConfig,
+			operationTimeout, logger, metrics, authProvider)
+		if err != nil {
+			return nil, newError(InvalidConfiguration, fmt.Sprintf("Failed to init http client with err: '%s'",
+				err.Error()))
+		}
+		c.lookupService = internal.NewHTTPLookupService(httpClient, url, serviceNameResolver,
+			tlsConfig != nil, logger, metrics)
+	default:
+		return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
+	}
+
 	c.handlers = internal.NewClientHandlers()
 
 	return c, nil
@@ -170,13 +186,9 @@ func (c *client) TopicPartitions(topic string) ([]string, error) {
 		return nil, err
 	}
 	if r != nil {
-		if r.Error != nil {
-			return nil, newError(LookupError, r.GetError().String())
-		}
-
-		if r.GetPartitions() > 0 {
-			partitions := make([]string, r.GetPartitions())
-			for i := 0; i < int(r.GetPartitions()); i++ {
+		if r.Partitions > 0 {
+			partitions := make([]string, r.Partitions)
+			for i := 0; i < r.Partitions; i++ {
 				partitions[i] = fmt.Sprintf("%s-partition-%d", topic, i)
 			}
 			return partitions, nil
@@ -190,6 +202,7 @@ func (c *client) TopicPartitions(topic string) ([]string, error) {
 func (c *client) Close() {
 	c.handlers.Close()
 	c.cnxPool.Close()
+	c.lookupService.Close()
 }
 
 func (c *client) namespaceTopics(namespace string) ([]string, error) {
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 65732ea..3fb7248 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -514,3 +514,421 @@ func TestRetryWithMultipleHosts(t *testing.T) {
 	assert.Nil(t, err)
 
 }
+
+// TestHTTPSConnectionCAError connects over HTTPS without any trust
+// configuration and expects producer creation to be rejected.
+func TestHTTPSConnectionCAError(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:              webServiceURLTLS,
+		OperationTimeout: 5 * time.Second,
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	// The broker certificate is not trusted by this client, so creating the
+	// producer must fail.
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newTopicName()})
+	assert.Error(t, err)
+	assert.Nil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPSInsecureConnection connects over HTTPS with
+// TLSAllowInsecureConnection enabled and expects producer creation to succeed.
+func TestHTTPSInsecureConnection(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:                        webServiceURLTLS,
+		TLSAllowInsecureConnection: true,
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPSConnection connects over HTTPS trusting the CA bundle at
+// caCertsPath and expects producer creation to succeed.
+func TestHTTPSConnection(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:                   webServiceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPSConnectionHostNameVerification connects over HTTPS with hostname
+// validation enabled and expects producer creation to succeed.
+func TestHTTPSConnectionHostNameVerification(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:                   webServiceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+		TLSValidateHostname:   true,
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPSConnectionHostNameVerificationError addresses the broker by IP
+// literal with hostname validation enabled and expects producer creation to
+// fail.
+// NOTE(review): presumably fails because the test certificate only names
+// "localhost" - confirm against the integration-test certs.
+func TestHTTPSConnectionHostNameVerificationError(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:                   "https://127.0.0.1:8443",
+		OperationTimeout:      5 * time.Second,
+		TLSTrustCertsFilePath: caCertsPath,
+		TLSValidateHostname:   true,
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newTopicName()})
+	assert.Error(t, err)
+	assert.Nil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPTopicPartitions checks TopicPartitions through the HTTP service URL,
+// first for a topic created with 5 partitions and then for a topic that was
+// never partitioned.
+func TestHTTPTopicPartitions(t *testing.T) {
+	const numPartitions = 5
+
+	cli, err := NewClient(ClientOptions{URL: webServiceURL})
+	assert.Nil(t, err)
+	defer cli.Close()
+
+	// Create the partitioned topic through the admin endpoint.
+	err = httpPut("admin/v2/persistent/public/default/TestHTTPTopicPartitions/partitions", numPartitions)
+	assert.Nil(t, err)
+
+	partitionedTopic := "persistent://public/default/TestHTTPTopicPartitions"
+	names, err := cli.TopicPartitions(partitionedTopic)
+	assert.Nil(t, err)
+	assert.Equal(t, len(names), numPartitions)
+	for idx := 0; idx < numPartitions; idx++ {
+		want := fmt.Sprintf("%s-partition-%d", partitionedTopic, idx)
+		assert.Equal(t, names[idx], want)
+	}
+
+	// A non-partitioned topic is reported as a single entry: the topic itself.
+	plainTopic := "persistent://public/default/TestHTTPTopicPartitions-nopartitions"
+	names, err = cli.TopicPartitions(plainTopic)
+	assert.Nil(t, err)
+	assert.Equal(t, len(names), 1)
+	assert.Equal(t, names[0], plainTopic)
+}
+
+// TestRetryWithMultipleHttpHosts produces and consumes ten messages through a
+// multi-host HTTP service URL whose first host is not reachable, to exercise
+// the retry logic.
+func TestRetryWithMultipleHttpHosts(t *testing.T) {
+	const numMessages = 10
+
+	// The host list contains an unreachable port followed by the real one.
+	cli, err := NewClient(ClientOptions{URL: "http://localhost:8081,localhost:8080"})
+	assert.Nil(t, err)
+	defer cli.Close()
+
+	topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: topic})
+	assert.Nil(t, err)
+	defer prod.Close()
+
+	ctx := context.Background()
+	var sentIDs [][]byte
+
+	for n := 0; n < numMessages; n++ {
+		id, sendErr := prod.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", n)),
+		})
+		if sendErr != nil {
+			assert.Nil(t, sendErr)
+			continue
+		}
+		assert.NotNil(t, id)
+		sentIDs = append(sentIDs, id.Serialize())
+	}
+
+	assert.Equal(t, numMessages, len(sentIDs))
+
+	subscription := ConsumerOptions{
+		Topic:                       topic,
+		SubscriptionName:            "retry-multi-hosts-sub",
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	}
+	cons, err := cli.Subscribe(subscription)
+	assert.Nil(t, err)
+	defer cons.Close()
+
+	// Every received message must be one of the messages sent above.
+	for n := 0; n < numMessages; n++ {
+		received, recvErr := cons.Receive(context.Background())
+		assert.Nil(t, recvErr)
+		assert.Contains(t, sentIDs, received.ID().Serialize())
+		cons.Ack(received)
+	}
+
+	err = cons.Unsubscribe()
+	assert.Nil(t, err)
+
+}
+
+// TestHTTPSAuthError creates a producer on an auth topic without configuring
+// any Authentication and expects the attempt to fail.
+func TestHTTPSAuthError(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:                   webServiceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	assert.Error(t, err)
+	assert.Nil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPSAuth authenticates over HTTPS with a TLS client certificate loaded
+// from files and expects producer creation on an auth topic to succeed.
+func TestHTTPSAuth(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:                   webServiceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+		Authentication:        NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath),
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+	t.Logf("TestHTTPSAuth client %v", cli)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	t.Logf("TestHTTPSAuth err %v", err)
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPSAuthWithCertSupplier authenticates over HTTPS with a TLS client
+// certificate produced by a supplier callback and expects producer creation
+// on an auth topic to succeed.
+func TestHTTPSAuthWithCertSupplier(t *testing.T) {
+	loadCert := func() (*tls.Certificate, error) {
+		pair, loadErr := tls.LoadX509KeyPair(tlsClientCertPath, tlsClientKeyPath)
+		return &pair, loadErr
+	}
+	clientOpts := ClientOptions{
+		URL:                   webServiceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+		Authentication:        NewAuthenticationFromTLSCertSupplier(loadCert),
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+	t.Logf("TestHTTPSAuthWithCertSupplier client %v", cli)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	t.Logf("TestHTTPSAuthWithCertSupplier err %v", err)
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPTokenAuth authenticates over HTTP with a token read from
+// tokenFilePath and expects producer creation on an auth topic to succeed.
+func TestHTTPTokenAuth(t *testing.T) {
+	rawToken, err := ioutil.ReadFile(tokenFilePath)
+	assert.NoError(t, err)
+
+	clientOpts := ClientOptions{
+		URL:            webServiceURL,
+		Authentication: NewAuthenticationToken(string(rawToken)),
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPTokenAuthWithSupplier authenticates over HTTP with a token returned
+// by a supplier callback and expects producer creation on an auth topic to
+// succeed.
+func TestHTTPTokenAuthWithSupplier(t *testing.T) {
+	// The supplier re-reads the token file every time it is invoked.
+	readToken := func() (string, error) {
+		raw, readErr := ioutil.ReadFile(tokenFilePath)
+		if readErr != nil {
+			return "", readErr
+		}
+		return string(raw), nil
+	}
+
+	cli, err := NewClient(ClientOptions{
+		URL:            webServiceURL,
+		Authentication: NewAuthenticationTokenFromSupplier(readToken),
+	})
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPTokenAuthFromFile authenticates over HTTP with a token loaded from
+// tokenFilePath by the provider itself and expects producer creation on an
+// auth topic to succeed.
+func TestHTTPTokenAuthFromFile(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		// Use the web service URL like the sibling HTTP token tests do; with
+		// serviceURL the HTTP lookup path would not be exercised at all.
+		URL:            webServiceURL,
+		Authentication: NewAuthenticationTokenFromFile(tokenFilePath),
+	})
+	assert.NoError(t, err)
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newAuthTopicName(),
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+
+	client.Close()
+}
+
+// TestHTTPSTokenAuthFromFile authenticates over HTTPS, with hostname
+// validation enabled, using a token loaded from tokenFilePath and expects
+// producer creation on an auth topic to succeed.
+func TestHTTPSTokenAuthFromFile(t *testing.T) {
+	clientOpts := ClientOptions{
+		URL:                   webServiceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+		TLSValidateHostname:   true,
+		Authentication:        NewAuthenticationTokenFromFile(tokenFilePath),
+	}
+	cli, err := NewClient(clientOpts)
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPOAuth2Auth authenticates over HTTP with OAuth2 client credentials
+// issued by a mock server and expects producer creation on an auth topic to
+// succeed.
+func TestHTTPOAuth2Auth(t *testing.T) {
+	srv := mockOAuthServer()
+	defer srv.Close()
+	keyFile, err := mockKeyFile(srv.URL)
+	defer os.Remove(keyFile)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	authParams := map[string]string{
+		auth.ConfigParamType:      auth.ConfigParamTypeClientCredentials,
+		auth.ConfigParamIssuerURL: srv.URL,
+		auth.ConfigParamClientID:  "client-id",
+		auth.ConfigParamAudience:  "audience",
+		auth.ConfigParamKeyFile:   keyFile,
+	}
+
+	cli, err := NewClient(ClientOptions{
+		URL:            webServiceURL,
+		Authentication: NewAuthenticationOAuth2(authParams),
+	})
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPSOAuth2Auth authenticates over HTTPS, with hostname validation
+// enabled, using OAuth2 client credentials issued by a mock server and
+// expects producer creation on an auth topic to succeed.
+func TestHTTPSOAuth2Auth(t *testing.T) {
+	srv := mockOAuthServer()
+	defer srv.Close()
+	keyFile, err := mockKeyFile(srv.URL)
+	defer os.Remove(keyFile)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	authParams := map[string]string{
+		auth.ConfigParamType:      auth.ConfigParamTypeClientCredentials,
+		auth.ConfigParamIssuerURL: srv.URL,
+		auth.ConfigParamClientID:  "client-id",
+		auth.ConfigParamAudience:  "audience",
+		auth.ConfigParamKeyFile:   keyFile,
+	}
+
+	cli, err := NewClient(ClientOptions{
+		URL:                   webServiceURLTLS,
+		TLSTrustCertsFilePath: caCertsPath,
+		TLSValidateHostname:   true,
+		Authentication:        NewAuthenticationOAuth2(authParams),
+	})
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, prod)
+
+	cli.Close()
+}
+
+// TestHTTPOAuth2AuthFailed configures OAuth2 with an unusable issuer URL and
+// no key file and expects producer creation on an auth topic to fail.
+func TestHTTPOAuth2AuthFailed(t *testing.T) {
+	srv := mockOAuthServer()
+	defer srv.Close()
+	keyFile, err := mockKeyFile(srv.URL)
+	defer os.Remove(keyFile)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// The key file is deliberately left out and the issuer URL is invalid.
+	authParams := map[string]string{
+		auth.ConfigParamType:      auth.ConfigParamTypeClientCredentials,
+		auth.ConfigParamIssuerURL: "error-url",
+		auth.ConfigParamClientID:  "client-id",
+		auth.ConfigParamAudience:  "audience",
+	}
+
+	cli, err := NewClient(ClientOptions{
+		URL:            webServiceURL,
+		Authentication: NewAuthenticationOAuth2(authParams),
+	})
+	assert.NoError(t, err)
+
+	prod, err := cli.CreateProducer(ProducerOptions{Topic: newAuthTopicName()})
+	assert.Error(t, err)
+	assert.Nil(t, prod)
+
+	cli.Close()
+}
diff --git a/pulsar/internal/auth/athenz.go b/pulsar/internal/auth/athenz.go
index 3d650f7..003428c 100644
--- a/pulsar/internal/auth/athenz.go
+++ b/pulsar/internal/auth/athenz.go
@@ -22,6 +22,7 @@ import (
 	"encoding/base64"
 	"errors"
 	"io/ioutil"
+	"net/http"
 	"regexp"
 	"strings"
 	"time"
@@ -31,8 +32,9 @@ import (
 )
 
 const (
-	minExpire = 2 * time.Hour
-	maxExpire = 24 * time.Hour
+	minExpire            = 2 * time.Hour
+	maxExpire            = 24 * time.Hour
+	AthenzRoleAuthHeader = "Athenz-Role-Auth"
 )
 
 type athenzAuthProvider struct {
@@ -47,6 +49,8 @@ type athenzAuthProvider struct {
 	roleToken          zts.RoleToken
 	zmsNewTokenBuilder func(domain, name string, privateKeyPEM []byte, keyVersion string) (zms.TokenBuilder, error)
 	ztsNewRoleToken    func(tok zms.Token, domain string, opts zts.RoleTokenOptions) zts.RoleToken
+
+	T http.RoundTripper
 }
 
 type privateKeyURI struct {
@@ -177,3 +181,21 @@ func parseURI(uri string) privateKeyURI {
 
 	return uriSt
 }
+
+// RoundTrip implements http.RoundTripper. It adds the current Athenz role
+// token in the Athenz-Role-Auth header and forwards the request to the
+// transport installed by WithTransport.
+//
+// An error from RoleTokenValue is returned as-is and the request is not sent.
+func (p *athenzAuthProvider) RoundTrip(req *http.Request) (*http.Response, error) {
+	tok, err := p.roleToken.RoleTokenValue()
+	if err != nil {
+		// The request never reaches the wrapped transport on this path, and
+		// http.RoundTripper requires the body to be closed even on errors.
+		if req.Body != nil {
+			_ = req.Body.Close()
+		}
+		return nil, err
+	}
+
+	// http.RoundTripper must not modify the caller's request, so the header
+	// is added to a copy.
+	authReq := req.Clone(req.Context())
+	if authReq.Header == nil {
+		authReq.Header = make(http.Header)
+	}
+	authReq.Header.Add(AthenzRoleAuthHeader, tok)
+	return p.T.RoundTrip(authReq)
+}
+
+// Transport returns the round tripper previously stored by WithTransport.
+func (p *athenzAuthProvider) Transport() http.RoundTripper {
+	return p.T
+}
+
+// WithTransport stores tripper as the transport RoundTrip forwards to.
+// It always returns nil.
+func (p *athenzAuthProvider) WithTransport(tripper http.RoundTripper) error {
+	p.T = tripper
+
+	return nil
+}
diff --git a/pulsar/internal/auth/disabled.go b/pulsar/internal/auth/disabled.go
index e23bb35..0389a39 100644
--- a/pulsar/internal/auth/disabled.go
+++ b/pulsar/internal/auth/disabled.go
@@ -17,7 +17,10 @@
 
 package auth
 
-import "crypto/tls"
+import (
+	"crypto/tls"
+	"net/http"
+)
 
 type disabled struct{}
 
@@ -45,3 +48,15 @@ func (disabled) GetTLSCertificate() (*tls.Certificate, error) {
 func (disabled) Close() error {
 	return nil
 }
+
+// RoundTrip is a no-op for the disabled provider: the request is not sent and
+// both results are nil.
+// NOTE(review): a nil response with a nil error is not a valid
+// http.RoundTripper result; this presumably relies on callers never
+// installing the disabled provider as a transport - confirm.
+func (disabled) RoundTrip(req *http.Request) (*http.Response, error) {
+	return nil, nil
+}
+
+// Transport always returns nil; the disabled provider keeps no transport.
+func (disabled) Transport() http.RoundTripper {
+	return nil
+}
+
+// WithTransport ignores tripper and always returns nil.
+func (disabled) WithTransport(tripper http.RoundTripper) error {
+	return nil
+}
diff --git a/pulsar/internal/auth/oauth2.go b/pulsar/internal/auth/oauth2.go
index 9ee63ab..ec65ba8 100644
--- a/pulsar/internal/auth/oauth2.go
+++ b/pulsar/internal/auth/oauth2.go
@@ -20,6 +20,9 @@ package auth
 import (
 	"crypto/tls"
 	"fmt"
+	"net/http"
+
+	xoauth2 "golang.org/x/oauth2"
 
 	"github.com/apache/pulsar-client-go/oauth2"
 	"github.com/apache/pulsar-client-go/oauth2/cache"
@@ -37,10 +40,12 @@ const (
 )
 
 type oauth2AuthProvider struct {
-	clock  clock.Clock
-	issuer oauth2.Issuer
-	store  store.Store
-	source cache.CachingTokenSource
+	clock            clock.Clock
+	issuer           oauth2.Issuer
+	store            store.Store
+	source           cache.CachingTokenSource
+	defaultTransport http.RoundTripper
+	tokenTransport   *transport
 }
 
 // NewAuthenticationOAuth2WithParams return a interface of Provider with string map.
@@ -143,3 +148,56 @@ func (p *oauth2AuthProvider) getRefresher(t oauth2.AuthorizationGrantType) (oaut
 		return nil, store.ErrUnsupportedAuthData
 	}
 }
+
+// transport is an http.RoundTripper that authenticates requests through an
+// oauth2 transport and invalidates the cached token when the server answers
+// 401.
+type transport struct {
+	// source is the token cache that is invalidated on a 401 response.
+	source cache.CachingTokenSource
+	// wrapped adds the token to outgoing requests; wrapped.Base performs the
+	// actual round trip.
+	wrapped *xoauth2.Transport
+}
+
+// RoundTrip sends req. A request that already carries an Authorization header
+// goes straight to the base transport; any other request goes through the
+// oauth2 transport. On a 401 response the cached token is invalidated; the
+// response is still returned unless the invalidation itself fails.
+func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
+	if len(req.Header.Get("Authorization")) != 0 {
+		return t.wrapped.Base.RoundTrip(req)
+	}
+
+	res, err := t.wrapped.RoundTrip(req)
+	if err != nil {
+		return nil, err
+	}
+
+	if res.StatusCode == http.StatusUnauthorized {
+		if err := t.source.InvalidateToken(); err != nil {
+			// The response is dropped on this path, so its body has to be
+			// closed here; otherwise the underlying connection leaks.
+			_ = res.Body.Close()
+			return nil, err
+		}
+	}
+
+	return res, nil
+}
+
+// WrappedRoundTripper returns the base transport underneath the oauth2 layer.
+func (t *transport) WrappedRoundTripper() http.RoundTripper { return t.wrapped.Base }
+
+// RoundTrip implements http.RoundTripper by delegating to the token transport
+// built in WithTransport.
+func (p *oauth2AuthProvider) RoundTrip(req *http.Request) (*http.Response, error) {
+	return p.tokenTransport.RoundTrip(req)
+}
+
+// Transport returns a newly built token transport over the provider's token
+// source and the base transport stored by WithTransport.
+func (p *oauth2AuthProvider) Transport() http.RoundTripper {
+	oauthLayer := &xoauth2.Transport{
+		Source: p.source,
+		Base:   p.defaultTransport,
+	}
+	return &transport{source: p.source, wrapped: oauthLayer}
+}
+
+// WithTransport records tripper as the base transport and rebuilds the token
+// transport used by RoundTrip on top of it. It always returns nil.
+func (p *oauth2AuthProvider) WithTransport(tripper http.RoundTripper) error {
+	p.defaultTransport = tripper
+	oauthLayer := &xoauth2.Transport{
+		Source: p.source,
+		Base:   tripper,
+	}
+	p.tokenTransport = &transport{source: p.source, wrapped: oauthLayer}
+	return nil
+}
diff --git a/pulsar/internal/auth/provider.go b/pulsar/internal/auth/provider.go
index 669056d..de71839 100644
--- a/pulsar/internal/auth/provider.go
+++ b/pulsar/internal/auth/provider.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"net/http"
 
 	"github.com/pkg/errors"
 )
@@ -41,6 +42,18 @@ type Provider interface {
 	GetData() ([]byte, error)
 
 	io.Closer
+
+	HTTPAuthProvider
+}
+
+// HTTPAuthProvider is the HTTP-facing part of an auth Provider. Its RoundTrip
+// method has the same signature as http.RoundTripper's, so a provider can be
+// installed as an http.Client transport.
+type HTTPAuthProvider interface {
+	// RoundTrip sends req and returns the response.
+	RoundTrip(req *http.Request) (*http.Response, error)
+	// Transport returns the provider's round tripper.
+	Transport() http.RoundTripper
+	// WithTransport hands the provider the transport it should use.
+	WithTransport(tripper http.RoundTripper) error
+}
+
+// HTTPTransport holds a single http.RoundTripper.
+// NOTE(review): no use of this type is visible in this change - confirm it is
+// needed.
+type HTTPTransport struct {
+	T http.RoundTripper
 }
 
 // NewProvider get/create an authentication data provider which provides the data
diff --git a/pulsar/internal/auth/tls.go b/pulsar/internal/auth/tls.go
index 8dc6ee7..4b5b9cd 100644
--- a/pulsar/internal/auth/tls.go
+++ b/pulsar/internal/auth/tls.go
@@ -17,12 +17,16 @@
 
 package auth
 
-import "crypto/tls"
+import (
+	"crypto/tls"
+	"errors"
+	"net/http"
+)
 
 type tlsAuthProvider struct {
 	certificatePath string
 	privateKeyPath  string
 	tlsCertSupplier func() (*tls.Certificate, error)
+	T               http.RoundTripper
 }
 
 // NewAuthenticationTLSWithParams initialize the authentication provider with map param.
@@ -72,3 +76,26 @@ func (p *tlsAuthProvider) GetData() ([]byte, error) {
 func (tlsAuthProvider) Close() error {
 	return nil
 }
+
+// RoundTrip implements http.RoundTripper by forwarding req unchanged to the
+// transport stored by WithTransport.
+func (p *tlsAuthProvider) RoundTrip(req *http.Request) (*http.Response, error) {
+	next := p.T
+	return next.RoundTrip(req)
+}
+
+// Transport returns the round tripper previously stored by WithTransport.
+func (p *tlsAuthProvider) Transport() http.RoundTripper {
+	return p.T
+}
+
+// WithTransport stores tripper and then configures the client certificate on
+// it, returning any error from that configuration step.
+func (p *tlsAuthProvider) WithTransport(tripper http.RoundTripper) error {
+	p.T = tripper
+	err := p.configTLS()
+	return err
+}
+
+// configTLS installs the provider's client certificate on the stored
+// transport so it is presented during TLS handshakes.
+//
+// It returns the error from GetTLSCertificate, or an error when the stored
+// transport is not an *http.Transport and therefore has no TLS configuration
+// to set. When no certificate is available the transport is left untouched.
+func (p *tlsAuthProvider) configTLS() error {
+	cert, err := p.GetTLSCertificate()
+	if err != nil {
+		return err
+	}
+	if cert == nil {
+		// Nothing to install; dereferencing a nil certificate would panic.
+		return nil
+	}
+
+	transport, ok := p.T.(*http.Transport)
+	if !ok {
+		return errors.New("tls auth requires an *http.Transport to configure the client certificate")
+	}
+	// TLSClientConfig is nil unless TLS options were applied to the transport
+	// earlier (for example with a plain http:// service URL).
+	if transport.TLSClientConfig == nil {
+		transport.TLSClientConfig = &tls.Config{}
+	}
+	transport.TLSClientConfig.Certificates = []tls.Certificate{*cert}
+	return nil
+}
diff --git a/pulsar/internal/auth/token.go b/pulsar/internal/auth/token.go
index d8d0cd3..b5af86b 100644
--- a/pulsar/internal/auth/token.go
+++ b/pulsar/internal/auth/token.go
@@ -19,7 +19,9 @@ package auth
 
 import (
 	"crypto/tls"
+	"fmt"
 	"io/ioutil"
+	"net/http"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -27,6 +29,7 @@ import (
 
 type tokenAuthProvider struct {
 	tokenSupplier func() (string, error)
+	T             http.RoundTripper
 }
 
 // NewAuthenticationTokenWithParams return a interface of Provider with string map.
@@ -105,3 +108,18 @@ func (p *tokenAuthProvider) GetData() ([]byte, error) {
 func (p *tokenAuthProvider) Close() error {
 	return nil
 }
+
+// RoundTrip implements http.RoundTripper. It adds an
+// "Authorization: Bearer <token>" header using the token returned by the
+// supplier and forwards the request to the transport installed by
+// WithTransport.
+//
+// A supplier error is returned as-is and the request is not sent.
+func (p *tokenAuthProvider) RoundTrip(req *http.Request) (*http.Response, error) {
+	token, err := p.tokenSupplier()
+	if err != nil {
+		// The request never reaches the wrapped transport on this path, and
+		// http.RoundTripper requires the body to be closed even on errors.
+		if req.Body != nil {
+			_ = req.Body.Close()
+		}
+		return nil, err
+	}
+
+	// http.RoundTripper must not modify the caller's request, so the header
+	// is added to a copy.
+	authReq := req.Clone(req.Context())
+	if authReq.Header == nil {
+		authReq.Header = make(http.Header)
+	}
+	authReq.Header.Add("Authorization", strings.TrimSpace(fmt.Sprintf("Bearer %s", token)))
+	return p.T.RoundTrip(authReq)
+}
+
+// Transport returns the round tripper previously stored by WithTransport.
+func (p *tokenAuthProvider) Transport() http.RoundTripper {
+	return p.T
+}
+
+// WithTransport stores tripper as the transport RoundTrip forwards to.
+// It always returns nil.
+func (p *tokenAuthProvider) WithTransport(tripper http.RoundTripper) error {
+	p.T = tripper
+
+	return nil
+}
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
new file mode 100644
index 0000000..b60aa89
--- /dev/null
+++ b/pulsar/internal/http_client.go
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"bytes"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar/internal/auth"
+
+	"github.com/apache/pulsar-client-go/pulsar/log"
+
+	"github.com/pkg/errors"
+)
+
+// httpClient is a base client used to make HTTP requests against the service
+// URL; the target host of each request comes from ServiceNameResolver.
+type httpClient struct {
+	// ServiceNameResolver supplies the base URL for every request.
+	ServiceNameResolver ServiceNameResolver
+	// HTTPClient performs the requests; http.DefaultClient is used when nil.
+	HTTPClient *http.Client
+	// requestTimeout bounds how long Get keeps retrying after url errors.
+	requestTimeout time.Duration
+	log            log.Logger
+	metrics        *Metrics
+}
+
+// Close releases the idle connections of the underlying HTTP client, if one
+// is set.
+func (c *httpClient) Close() {
+	if c.HTTPClient == nil {
+		return
+	}
+	CloseIdleConnections(c.HTTPClient)
+}
+
+// HTTPClient is the interface returned by NewHTTPClient: a JSON GET operation
+// plus Closable.
+type HTTPClient interface {
+	// Get issues a GET request for endpoint and decodes the response into obj.
+	Get(endpoint string, obj interface{}) error
+	Closable
+}
+
+// NewHTTPClient builds an HTTPClient whose requests time out after
+// requestTimeout and whose transport honors tlsConfig.
+//
+// When authProvider reports a non-empty name, it is handed the transport via
+// WithTransport and then installed as the client's transport itself. Errors
+// from building the transport or from WithTransport are returned.
+func NewHTTPClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsConfig *TLSOptions,
+	requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
+	authProvider auth.Provider) (HTTPClient, error) {
+	wrapper := &httpClient{
+		ServiceNameResolver: serviceNameResolver,
+		requestTimeout:      requestTimeout,
+		log:                 logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
+		metrics:             metrics,
+	}
+
+	base, err := getDefaultTransport(tlsConfig)
+	if err != nil {
+		return nil, err
+	}
+	cli := &http.Client{Timeout: requestTimeout, Transport: base}
+
+	// An empty provider name means no authentication: keep the plain transport.
+	if authProvider.Name() != "" {
+		if err = authProvider.WithTransport(base); err != nil {
+			return nil, err
+		}
+		cli.Transport = authProvider
+	}
+
+	wrapper.HTTPClient = cli
+	return wrapper, nil
+}
+
+// newRequest creates an httpRequest for method whose URL combines the host
+// resolved by ServiceNameResolver with the path component of path.
+func (c *httpClient) newRequest(method, path string) (*httpRequest, error) {
+	host, err := c.ServiceNameResolver.ResolveHost()
+	if err != nil {
+		return nil, err
+	}
+
+	parsed, err := url.Parse(path)
+	if err != nil {
+		return nil, err
+	}
+
+	// Only the path of the parsed endpoint is used; scheme, credentials and
+	// host all come from the resolved base URL.
+	target := &url.URL{
+		Scheme: host.Scheme,
+		User:   host.User,
+		Host:   host.Host,
+		Path:   endpoint(host.Path, parsed.Path),
+	}
+	return &httpRequest{
+		method: method,
+		url:    target,
+		params: make(url.Values),
+	}, nil
+}
+
+// doRequest converts r into an *http.Request, sets the Content-Type, Accept
+// and User-Agent headers, and executes it with the configured HTTP client
+// (http.DefaultClient when none is set).
+func (c *httpClient) doRequest(r *httpRequest) (*http.Response, error) {
+	req, err := r.toHTTP()
+	if err != nil {
+		return nil, err
+	}
+
+	// An explicit content type wins; otherwise a request with a body is
+	// declared as JSON.
+	switch {
+	case r.contentType != "":
+		req.Header.Set("Content-Type", r.contentType)
+	case req.Body != nil:
+		req.Header.Set("Content-Type", "application/json")
+	}
+	req.Header.Set("Accept", "application/json")
+	req.Header.Set("User-Agent", c.useragent())
+
+	if c.HTTPClient != nil {
+		return c.HTTPClient.Do(req)
+	}
+	return http.DefaultClient.Do(req)
+}
+
+// MakeRequest issues a bare request with the given method and endpoint and
+// returns the raw response for the caller to handle. A response outside the
+// success range is turned into an error.
+func (c *httpClient) MakeRequest(method, endpoint string) (*http.Response, error) {
+	req, err := c.newRequest(method, endpoint)
+	if err != nil {
+		return nil, err
+	}
+
+	// checkSuccessful already yields (nil, err) on failure and (resp, nil)
+	// on success.
+	return checkSuccessful(c.doRequest(req))
+}
+
+// Get issues a GET request for endpoint and decodes the JSON response into
+// obj. When an attempt fails with a *url.Error, it is retried with backoff
+// until requestTimeout has elapsed; the error of the last attempt (nil on
+// success) is returned.
+func (c *httpClient) Get(endpoint string, obj interface{}) error {
+	_, err := c.GetWithQueryParams(endpoint, obj, nil, true)
+	if _, connFailure := err.(*url.Error); !connFailure {
+		return err
+	}
+
+	// We can retry this kind of requests over a connection error because
+	// they're not specific to a particular broker.
+	backoff := Backoff{100 * time.Millisecond}
+	began := time.Now()
+	for time.Since(began) < c.requestTimeout {
+		delay := backoff.Next()
+		c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", delay, c.requestTimeout)
+		time.Sleep(delay)
+
+		_, err = c.GetWithQueryParams(endpoint, obj, nil, true)
+		if _, retryable := err.(*url.Error); !retryable {
+			// Either the request succeeded or it failed with a
+			// non-connection error.
+			break
+		}
+	}
+	return err
+}
+
+// GetWithQueryParams is GetWithOptions without a destination writer.
+func (c *httpClient) GetWithQueryParams(endpoint string, obj interface{}, params map[string]string,
+	decode bool) ([]byte, error) {
+	var noFile io.Writer
+	return c.GetWithOptions(endpoint, obj, params, decode, noFile)
+}
+
+// GetWithOptions issues a GET request for endpoint with the given query
+// parameters and handles the response body as follows:
+//   - obj != nil: the body is JSON-decoded into obj and (nil, nil) is
+//     returned; an io.EOF from the decoder is not treated as an error.
+//   - obj == nil and decode is false: the body is copied to file when file is
+//     non-nil, otherwise it is read fully and returned.
+//   - obj == nil and decode is true: the body is ignored.
+func (c *httpClient) GetWithOptions(endpoint string, obj interface{}, params map[string]string,
+	decode bool, file io.Writer) ([]byte, error) {
+
+	req, err := c.newRequest(http.MethodGet, endpoint)
+	if err != nil {
+		return nil, err
+	}
+
+	if params != nil {
+		values := req.url.Query()
+		for name, value := range params {
+			values.Add(name, value)
+		}
+		req.params = values
+	}
+
+	resp, err := checkSuccessful(c.doRequest(req))
+	if err != nil {
+		return nil, err
+	}
+	defer safeRespClose(resp)
+
+	if obj != nil {
+		switch decodeErr := decodeJSONBody(resp, &obj); decodeErr {
+		case nil, io.EOF:
+			return nil, nil
+		default:
+			return nil, decodeErr
+		}
+	}
+
+	if decode {
+		return nil, nil
+	}
+
+	if file != nil {
+		if _, copyErr := io.Copy(file, resp.Body); copyErr != nil {
+			return nil, copyErr
+		}
+		return nil, nil
+	}
+
+	payload, readErr := ioutil.ReadAll(resp.Body)
+	if readErr != nil {
+		return nil, readErr
+	}
+	return payload, nil
+}
+
+// useragent returns the fixed User-Agent header value sent with every request.
+func (c *httpClient) useragent() string {
+	const agent = "Pulsar-httpClient-Go-v2"
+	return agent
+}
+
+// httpRequest describes a request before it is converted into an
+// *http.Request by toHTTP.
+type httpRequest struct {
+	method      string
+	contentType string // explicit Content-Type; empty means "decide from the body"
+	url         *url.URL
+	params      url.Values // encoded into the URL's query string by toHTTP
+
+	obj  interface{} // JSON-encoded into body by toHTTP when body is nil
+	body io.Reader
+}
+
+// toHTTP converts r into an *http.Request. The query parameters are encoded
+// into the URL, obj is JSON-encoded as the body when no body was set, and the
+// request's scheme and host are taken from r.url.
+func (r *httpRequest) toHTTP() (*http.Request, error) {
+	r.url.RawQuery = r.params.Encode()
+
+	// Fall back to the JSON form of obj when no explicit body was provided.
+	if r.body == nil && r.obj != nil {
+		encoded, err := encodeJSONBody(r.obj)
+		if err != nil {
+			return nil, err
+		}
+		r.body = encoded
+	}
+
+	httpReq, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
+	if err != nil {
+		return nil, err
+	}
+
+	// The request was created from the request URI only, so scheme and host
+	// have to be filled in afterwards.
+	host, scheme := r.url.Host, r.url.Scheme
+	httpReq.URL.Scheme = scheme
+	httpReq.URL.Host = host
+	httpReq.Host = host
+	return httpReq, nil
+}
+
+// respIsOk reports whether the response status lies in the inclusive range
+// http.StatusOK..http.StatusNoContent.
+func respIsOk(resp *http.Response) bool {
+	status := resp.StatusCode
+	if status < http.StatusOK {
+		return false
+	}
+	return status <= http.StatusNoContent
+}
+
+// checkSuccessful takes the result pair of an HTTP call. It returns the
+// response unchanged when the call succeeded with an OK status; otherwise the
+// response is closed and an error is returned with a nil response.
+func checkSuccessful(resp *http.Response, err error) (*http.Response, error) {
+	if err != nil {
+		safeRespClose(resp)
+		return nil, err
+	}
+
+	if respIsOk(resp) {
+		return resp, nil
+	}
+
+	// Close only after responseError has read the body.
+	defer safeRespClose(resp)
+	return nil, responseError(resp)
+}
+
+// endpoint joins the given path segments with path.Join.
+func endpoint(parts ...string) string {
+	joined := path.Join(parts...)
+	return joined
+}
+
+// encodeJSONBody JSON-encodes obj and returns a reader over the encoded bytes.
+func encodeJSONBody(obj interface{}) (io.Reader, error) {
+	var encoded bytes.Buffer
+	if err := json.NewEncoder(&encoded).Encode(obj); err != nil {
+		return nil, err
+	}
+	return &encoded, nil
+}
+
+// decodeJSONBody JSON-decodes the response body into out. A response whose
+// ContentLength is 0 is left untouched and nil is returned.
+func decodeJSONBody(resp *http.Response, out interface{}) error {
+	if resp.ContentLength != 0 {
+		return json.NewDecoder(resp.Body).Decode(out)
+	}
+	return nil
+}
+
+// safeRespClose closes the body of resp. It is a no-op when resp is nil or
+// carries no body, so it can be called on any result of an HTTP call.
+func safeRespClose(resp *http.Response) {
+	if resp == nil || resp.Body == nil {
+		return
+	}
+	// ignore error since it is closing a response body
+	_ = resp.Body.Close()
+}
+
+// responseError turns a non-successful response into an error of the form
+// "Code: <status>, Reason: <reason>". The reason is the read error when the
+// body cannot be read, otherwise the raw body text, or "Unknown error" when
+// no reason text is available.
+func responseError(resp *http.Response) error {
+	const format = "Code: %d, Reason: %s"
+	status := resp.StatusCode
+
+	raw, readErr := ioutil.ReadAll(resp.Body)
+	if readErr != nil {
+		return errors.Errorf(format, status, readErr.Error())
+	}
+
+	// NOTE(review): the decode target is a nil error interface, which
+	// encoding/json can only fill from a JSON null; in practice the raw body
+	// is always what ends up as the reason. Presumably a struct with a
+	// reason field was intended - confirm.
+	var decoded error
+	message := ""
+	if jsonErr := json.Unmarshal(raw, &decoded); jsonErr != nil {
+		message = string(raw)
+	}
+	if message == "" {
+		message = "Unknown error"
+	}
+
+	return errors.Errorf(format, status, message)
+}
+
+// getDefaultTransport returns a transport for the HTTP lookup client. It
+// starts from a private copy of http.DefaultTransport so that the TLS and
+// connection settings applied here never leak into the process-wide default
+// transport or into other clients.
+//
+// With a non-nil tlsConfig, certificate verification follows
+// AllowInsecureConnection and, when TrustCertsFilePath is set, the PEM bundle
+// at that path becomes the root CA pool. An unreadable bundle, or one that
+// contains no parsable certificate, is reported as an error.
+func getDefaultTransport(tlsConfig *TLSOptions) (http.RoundTripper, error) {
+	var transport *http.Transport
+	if base, ok := http.DefaultTransport.(*http.Transport); ok {
+		transport = base.Clone()
+	} else {
+		// http.DefaultTransport was replaced by something that cannot be
+		// copied; fall back to a plain transport instead of panicking.
+		transport = &http.Transport{}
+	}
+
+	if tlsConfig != nil {
+		cfg := &tls.Config{
+			InsecureSkipVerify: tlsConfig.AllowInsecureConnection,
+		}
+		if len(tlsConfig.TrustCertsFilePath) > 0 {
+			rootCA, err := ioutil.ReadFile(tlsConfig.TrustCertsFilePath)
+			if err != nil {
+				return nil, err
+			}
+			cfg.RootCAs = x509.NewCertPool()
+			// An empty pool would only surface later as an opaque handshake
+			// failure, so reject an unparsable bundle right away.
+			if !cfg.RootCAs.AppendCertsFromPEM(rootCA) {
+				return nil, errors.New("failed to parse root CAs certificates")
+			}
+		}
+		transport.TLSClientConfig = cfg
+	}
+	transport.MaxIdleConnsPerHost = 10
+	return transport, nil
+}
diff --git a/pulsar/internal/auth/disabled.go b/pulsar/internal/http_client_go_1.11.go
similarity index 64%
copy from pulsar/internal/auth/disabled.go
copy to pulsar/internal/http_client_go_1.11.go
index e23bb35..d9e8762 100644
--- a/pulsar/internal/auth/disabled.go
+++ b/pulsar/internal/http_client_go_1.11.go
@@ -15,33 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package auth
+// +build !go1.12
 
-import "crypto/tls"
+package internal
 
-type disabled struct{}
+import "net/http"
 
-// NewAuthDisabled return a interface of Provider
-func NewAuthDisabled() Provider {
-	return &disabled{}
-}
-
-func (disabled) Init() error {
-	return nil
-}
-
-func (disabled) GetData() ([]byte, error) {
-	return nil, nil
-}
-
-func (disabled) Name() string {
-	return ""
-}
-
-func (disabled) GetTLSCertificate() (*tls.Certificate, error) {
-	return nil, nil
-}
+// CloseIdleConnections closes the idle connections held by c's Transport when
+// that Transport provides a CloseIdleConnections method; otherwise it does
+// nothing. This variant is compiled for Go versions before 1.12 (see the
+// build tag at the top of the file).
+func CloseIdleConnections(c *http.Client) {
+	type closeIdler interface {
+		CloseIdleConnections()
+	}
 
-func (disabled) Close() error {
-	return nil
+	if tr, ok := c.Transport.(closeIdler); ok {
+		tr.CloseIdleConnections()
+	}
 }
diff --git a/pulsar/internal/auth/disabled.go b/pulsar/internal/http_client_go_1.12.go
similarity index 64%
copy from pulsar/internal/auth/disabled.go
copy to pulsar/internal/http_client_go_1.12.go
index e23bb35..59ce0b4 100644
--- a/pulsar/internal/auth/disabled.go
+++ b/pulsar/internal/http_client_go_1.12.go
@@ -15,33 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package auth
+// +build go1.12
 
-import "crypto/tls"
+package internal
 
-type disabled struct{}
+import "net/http"
 
-// NewAuthDisabled return a interface of Provider
-func NewAuthDisabled() Provider {
-	return &disabled{}
-}
-
-func (disabled) Init() error {
-	return nil
-}
-
-func (disabled) GetData() ([]byte, error) {
-	return nil, nil
-}
-
-func (disabled) Name() string {
-	return ""
-}
-
-func (disabled) GetTLSCertificate() (*tls.Certificate, error) {
-	return nil, nil
-}
-
-func (disabled) Close() error {
-	return nil
+// CloseIdleConnections closes the idle connections of c by delegating to
+// (*http.Client).CloseIdleConnections. This variant is compiled with Go 1.12
+// or newer (see the build tag at the top of the file).
+func CloseIdleConnections(c *http.Client) {
+	c.CloseIdleConnections()
 }
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index e5efa28..eb63077 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -19,6 +19,7 @@ package internal
 
 import (
 	"errors"
+	"fmt"
 	"net/url"
 
 	"github.com/gogo/protobuf/proto"
@@ -33,6 +34,11 @@ type LookupResult struct {
 	PhysicalAddr *url.URL
 }
 
+// PartitionedTopicMetadata encapsulates the metadata of a partitioned topic.
+// It is the result type of LookupService.GetPartitionedTopicMetadata and is
+// decoded from JSON through the "partitions" key.
+type PartitionedTopicMetadata struct {
+	Partitions int `json:"partitions"` // Number of partitions for the topic
+}
+
 // LookupService is a interface of lookup service.
 type LookupService interface {
 	// Lookup perform a lookup for the given topic, confirm the location of the broker
@@ -41,7 +47,10 @@ type LookupService interface {
 
 	// GetPartitionedTopicMetadata perform a CommandPartitionedTopicMetadata request for
 	// the given topic, returns the CommandPartitionedTopicMetadataResponse as the result.
-	GetPartitionedTopicMetadata(topic string) (*pb.CommandPartitionedTopicMetadataResponse, error)
+	GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata, error)
+
+	// Closable allows the lookup service's internal client to be closed.
+	Closable
 }
 
 type lookupService struct {
@@ -162,7 +171,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
 	return nil, errors.New("exceeded max number of redirection during topic lookup")
 }
 
-func (ls *lookupService) GetPartitionedTopicMetadata(topic string) (*pb.CommandPartitionedTopicMetadataResponse,
+func (ls *lookupService) GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata,
 	error) {
 	ls.metrics.PartitionedTopicMetadataRequestsCount.Inc()
 	topicName, err := ParseTopicName(topic)
@@ -181,5 +190,121 @@ func (ls *lookupService) GetPartitionedTopicMetadata(topic string) (*pb.CommandP
 	}
 	ls.log.Debugf("Got topic{%s} partitioned metadata response: %+v", topic, res)
 
-	return res.Response.PartitionMetadataResponse, nil
+	if res.Response.PartitionMetadataResponse.Error != nil {
+		return nil, errors.New(res.Response.PartitionMetadataResponse.GetError().String())
+	}
+
+	return &PartitionedTopicMetadata{Partitions: int(res.Response.PartitionMetadataResponse.GetPartitions())}, nil
+}
+
+// Close is part of the LookupService interface (via Closable). For this lookup
+// service it does nothing.
+func (ls *lookupService) Close() {}
+
+// REST paths used by the HTTP lookup service. The V1 variants are used for
+// legacy topic names (those for which IsV2TopicName reports false); the V2
+// variants are used for everything else.
+const (
+	// HTTPLookupServiceBasePathV1 is the lookup path prefix for V1 topic names.
+	HTTPLookupServiceBasePathV1 string = "/lookup/v2/destination/"
+	// HTTPLookupServiceBasePathV2 is the lookup path prefix for V2 topic names.
+	HTTPLookupServiceBasePathV2 string = "/lookup/v2/topic/"
+	// HTTPAdminServiceV1Format is the partitions path format for V1 topic names;
+	// %s is replaced by the topic REST path.
+	HTTPAdminServiceV1Format string = "/admin/%s/partitions"
+	// HTTPAdminServiceV2Format is the partitions path format for V2 topic names;
+	// %s is replaced by the topic REST path.
+	HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
+)
+
+// httpLookupData is the JSON document decoded from an HTTP lookup response.
+// getBrokerAddress reads BrokerURLTLS when TLS is enabled and BrokerURL
+// otherwise; HTTPURL and HTTPURLTLS are decoded but not read by the code in
+// this file.
+type httpLookupData struct {
+	BrokerURL    string `json:"brokerUrl"`
+	BrokerURLTLS string `json:"brokerUrlTls"`
+	HTTPURL      string `json:"httpUrl"`
+	HTTPURLTLS   string `json:"httpUrlTls"`
+}
+
+// httpLookupService is the LookupService implementation that resolves topics
+// through HTTP requests issued with httpClient. tlsEnabled selects which broker
+// URL of a lookup response is used (see getBrokerAddress).
+type httpLookupService struct {
+	httpClient          HTTPClient
+	serviceNameResolver ServiceNameResolver
+	tlsEnabled          bool
+	log                 log.Logger
+	metrics             *Metrics
+}
+
+// getBrokerAddress parses the broker URL carried by ld, taking BrokerURLTLS
+// when TLS is enabled and BrokerURL otherwise. The same parsed URL is returned
+// as both the logical and the physical address; a parse failure is returned as
+// err with both addresses nil.
+func (h *httpLookupService) getBrokerAddress(ld *httpLookupData) (logicalAddress *url.URL,
+	physicalAddress *url.URL, err error) {
+	rawURL := ld.BrokerURL
+	if h.tlsEnabled {
+		rawURL = ld.BrokerURLTLS
+	}
+
+	parsed, parseErr := url.ParseRequestURI(rawURL)
+	if parseErr != nil {
+		return nil, nil, parseErr
+	}
+	return parsed, parsed, nil
+}
+
+// Lookup resolves the broker address for topic with an HTTP GET against the
+// lookup endpoint. The V2 base path is used unless the parsed topic name is a
+// V1 (legacy) name. Errors from topic parsing, the HTTP request, or broker URL
+// parsing are returned unchanged.
+func (h *httpLookupService) Lookup(topic string) (*LookupResult, error) {
+	topicName, err := ParseTopicName(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	var basePath string
+	if IsV2TopicName(topicName) {
+		basePath = HTTPLookupServiceBasePathV2
+	} else {
+		basePath = HTTPLookupServiceBasePathV1
+	}
+
+	lookupData := &httpLookupData{}
+	endpoint := basePath + GetTopicRestPath(topicName)
+	if err = h.httpClient.Get(endpoint, lookupData); err != nil {
+		return nil, err
+	}
+
+	h.log.Debugf("Successfully looked up topic{%s} on http broker. %+v",
+		topic, lookupData)
+
+	logicalAddress, physicalAddress, err := h.getBrokerAddress(lookupData)
+	if err != nil {
+		return nil, err
+	}
+
+	result := &LookupResult{
+		LogicalAddr:  logicalAddress,
+		PhysicalAddr: physicalAddress,
+	}
+	return result, nil
+}
+
+// GetPartitionedTopicMetadata fetches the partition metadata of topic with an
+// HTTP GET against the admin "partitions" endpoint. The V2 path format is used
+// unless the parsed topic name is a V1 (legacy) name.
+func (h *httpLookupService) GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata,
+	error) {
+	topicName, err := ParseTopicName(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	var format string
+	if IsV2TopicName(topicName) {
+		format = HTTPAdminServiceV2Format
+	} else {
+		format = HTTPAdminServiceV1Format
+	}
+
+	tMetadata := &PartitionedTopicMetadata{}
+	path := fmt.Sprintf(format, GetTopicRestPath(topicName))
+	if err = h.httpClient.Get(path, tMetadata); err != nil {
+		return nil, err
+	}
+
+	h.log.Debugf("Got topic{%s} partitioned metadata response: %+v", topic, tMetadata)
+
+	return tMetadata, nil
+}
+
+// Close closes the underlying HTTP client.
+func (h *httpLookupService) Close() {
+	h.httpClient.Close()
+}
+
+// NewHTTPLookupService creates an HTTP-based lookup service and returns it as a
+// LookupService. serviceURL is only attached to the logger as the "serviceURL"
+// field; requests are issued through httpClient.
+func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
+	tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
+	subLogger := logger.SubLogger(log.Fields{"serviceURL": serviceURL})
+
+	return &httpLookupService{
+		httpClient:          httpClient,
+		serviceNameResolver: serviceNameResolver,
+		tlsEnabled:          tlsEnabled,
+		log:                 subLogger,
+		metrics:             metrics,
+	}
 }
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index de5677e..92255a6 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -18,10 +18,15 @@
 package internal
 
 import (
+	"bytes"
+	"encoding/json"
+	"io/ioutil"
 	"net/url"
+	"strings"
 	"testing"
 
 	"github.com/gogo/protobuf/proto"
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -483,7 +488,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
 	metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
 	assert.NoError(t, err)
 	assert.NotNil(t, metadata)
-	assert.Equal(t, metadata.GetPartitions(), uint32(1))
+	assert.Equal(t, metadata.Partitions, 1)
 }
 
 func TestLookupSuccessWithMultipleHosts(t *testing.T) {
@@ -519,3 +524,79 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
 	assert.Equal(t, "pulsar://broker-1:6650", lr.LogicalAddr.String())
 	assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
 }
+
+// MockHTTPClient is a test double for HTTPClient that answers Get calls with
+// canned JSON instead of performing network requests.
+type MockHTTPClient struct {
+	ServiceNameResolver ServiceNameResolver
+}
+
+// Close does nothing for the mock client.
+func (c *MockHTTPClient) Close() {}
+
+// Get decodes a canned response into obj, chosen from endpoint: a lookup result
+// when endpoint contains one of the lookup base paths, a partitioned-topic
+// metadata result when it contains "partitions", and an error otherwise.
+func (c *MockHTTPClient) Get(endpoint string, obj interface{}) error {
+	isLookup := strings.Contains(endpoint, HTTPLookupServiceBasePathV1) ||
+		strings.Contains(endpoint, HTTPLookupServiceBasePathV2)
+
+	switch {
+	case isLookup:
+		return mockHTTPGetLookupResult(obj)
+	case strings.Contains(endpoint, "partitions"):
+		return mockHTTPGetPartitionedTopicMetadataResult(obj)
+	default:
+		return errors.New("not supported request")
+	}
+}
+
+// mockHTTPGetLookupResult decodes a fixed lookup response (broker-1, TLS URLs
+// empty) into obj.
+func mockHTTPGetLookupResult(obj interface{}) error {
+	const jsonResponse = `{
+   		"brokerUrl": "pulsar://broker-1:6650",
+   		"brokerUrlTls": "",
+		"httpUrl": "http://broker-1:8080",
+		"httpUrlTls": ""
+  	}`
+	body := ioutil.NopCloser(bytes.NewReader([]byte(jsonResponse)))
+	return json.NewDecoder(body).Decode(obj)
+}
+
+// mockHTTPGetPartitionedTopicMetadataResult decodes a fixed metadata response
+// reporting a single partition into obj.
+func mockHTTPGetPartitionedTopicMetadataResult(obj interface{}) error {
+	const jsonResponse = `{
+   		"partitions": 1
+  	}`
+	body := ioutil.NopCloser(bytes.NewReader([]byte(jsonResponse)))
+	return json.NewDecoder(body).Decode(obj)
+}
+
+// NewMockHTTPClient returns a MockHTTPClient holding serviceNameResolver, typed
+// as HTTPClient.
+func NewMockHTTPClient(serviceNameResolver ServiceNameResolver) HTTPClient {
+	return &MockHTTPClient{ServiceNameResolver: serviceNameResolver}
+}
+
+// TestHttpLookupSuccess checks that an HTTP lookup through the mock client
+// yields the broker URL of the canned response as both logical and physical
+// address.
+func TestHttpLookupSuccess(t *testing.T) {
+	serviceURL, err := url.Parse("http://broker-1:8080")
+	assert.NoError(t, err)
+
+	resolver := NewPulsarServiceNameResolver(serviceURL)
+	ls := NewHTTPLookupService(NewMockHTTPClient(resolver), serviceURL, resolver, false,
+		log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+
+	lr, err := ls.Lookup("my-topic")
+	assert.NoError(t, err)
+	assert.NotNil(t, lr)
+
+	const wantBroker = "pulsar://broker-1:6650"
+	assert.Equal(t, wantBroker, lr.LogicalAddr.String())
+	assert.Equal(t, wantBroker, lr.PhysicalAddr.String())
+}
+
+// TestHttpGetPartitionedTopicMetadataSuccess checks that the HTTP lookup
+// service reports the single partition of the canned metadata response.
+func TestHttpGetPartitionedTopicMetadataSuccess(t *testing.T) {
+	serviceURL, err := url.Parse("http://broker-1:8080")
+	assert.NoError(t, err)
+
+	resolver := NewPulsarServiceNameResolver(serviceURL)
+	ls := NewHTTPLookupService(NewMockHTTPClient(resolver), serviceURL, resolver, false,
+		log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
+
+	tMetadata, err := ls.GetPartitionedTopicMetadata("my-topic")
+	assert.NoError(t, err)
+	assert.NotNil(t, tMetadata)
+
+	assert.Equal(t, 1, tMetadata.Partitions)
+}
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index fb57fd9..86a1ebe 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -20,6 +20,7 @@ package internal
 import (
 	"errors"
 	"fmt"
+	"net/url"
 	"strconv"
 	"strings"
 )
@@ -29,6 +30,7 @@ type TopicName struct {
 	Domain    string
 	Tenant    string
 	Namespace string
+	Topic     string
 	Name      string
 	Partition int
 }
@@ -86,9 +88,11 @@ func ParseTopicName(topic string) (*TopicName, error) {
 	if len(parts) == 3 {
 		// New topic name without cluster name
 		tn.Namespace = parts[0] + "/" + parts[1]
+		tn.Topic = parts[2]
 	} else if len(parts) == 4 {
 		// Legacy topic name that includes cluster name
 		tn.Namespace = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[2])
+		tn.Topic = parts[3]
 	} else {
 		return nil, errors.New("Invalid topic name: " + topic)
 	}
@@ -120,3 +124,14 @@ func getPartitionIndex(topic string) (int, error) {
 	}
 	return -1, nil
 }
+
+// IsV2TopicName reports whether tn is a V2 topic name. A legacy (V1) name
+// carries the cluster in its namespace, i.e. "tenant/cluster/namespace", so a
+// namespace with exactly three "/"-separated parts is V1; anything else is V2.
+func IsV2TopicName(tn *TopicName) bool {
+	// Two separators <=> three parts.
+	return strings.Count(tn.Namespace, "/") != 2
+}
+
+// GetTopicRestPath returns the REST path of tn in the form
+// "<domain>/<namespace>/<topic>", with the topic part escaped by
+// url.QueryEscape.
+func GetTopicRestPath(tn *TopicName) string {
+	escapedTopic := url.QueryEscape(tn.Topic)
+	return fmt.Sprintf("%s/%s/%s", tn.Domain, tn.Namespace, escapedTopic)
+}
diff --git a/pulsar/internal/topic_name_test.go b/pulsar/internal/topic_name_test.go
index 7ade7e0..f08fcd0 100644
--- a/pulsar/internal/topic_name_test.go
+++ b/pulsar/internal/topic_name_test.go
@@ -37,6 +37,7 @@ func TestParseTopicName(t *testing.T) {
 	assert.Equal(t, "persistent://public/default/my-topic", topic.Name)
 	assert.Equal(t, "public", topic.Tenant)
 	assert.Equal(t, "public/default", topic.Namespace)
+	assert.Equal(t, "my-topic", topic.Topic)
 	assert.Equal(t, -1, topic.Partition)
 
 	topic, err = ParseTopicName("my-tenant/my-namespace/my-topic")
@@ -44,6 +45,7 @@ func TestParseTopicName(t *testing.T) {
 	assert.Equal(t, "persistent://my-tenant/my-namespace/my-topic", topic.Name)
 	assert.Equal(t, "my-tenant", topic.Tenant)
 	assert.Equal(t, "my-tenant/my-namespace", topic.Namespace)
+	assert.Equal(t, "my-topic", topic.Topic)
 	assert.Equal(t, -1, topic.Partition)
 
 	topic, err = ParseTopicName("non-persistent://my-tenant/my-namespace/my-topic")
@@ -51,6 +53,7 @@ func TestParseTopicName(t *testing.T) {
 	assert.Equal(t, "non-persistent://my-tenant/my-namespace/my-topic", topic.Name)
 	assert.Equal(t, "my-tenant", topic.Tenant)
 	assert.Equal(t, "my-tenant/my-namespace", topic.Namespace)
+	assert.Equal(t, "my-topic", topic.Topic)
 	assert.Equal(t, -1, topic.Partition)
 
 	topic, err = ParseTopicName("my-topic-partition-5")
@@ -58,6 +61,7 @@ func TestParseTopicName(t *testing.T) {
 	assert.Equal(t, "persistent://public/default/my-topic-partition-5", topic.Name)
 	assert.Equal(t, "public", topic.Tenant)
 	assert.Equal(t, "public/default", topic.Namespace)
+	assert.Equal(t, "my-topic-partition-5", topic.Topic)
 	assert.Equal(t, 5, topic.Partition)
 
 	// V1 topic name
@@ -66,6 +70,7 @@ func TestParseTopicName(t *testing.T) {
 	assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", topic.Name)
 	assert.Equal(t, "my-tenant", topic.Tenant)
 	assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
+	assert.Equal(t, "my-topic", topic.Topic)
 	assert.Equal(t, -1, topic.Partition)
 
 	topic, err = ParseTopicName("my-tenant/my-cluster/my-ns/my-topic")
@@ -73,6 +78,7 @@ func TestParseTopicName(t *testing.T) {
 	assert.Equal(t, "persistent://my-tenant/my-cluster/my-ns/my-topic", topic.Name)
 	assert.Equal(t, "my-tenant", topic.Tenant)
 	assert.Equal(t, "my-tenant/my-cluster/my-ns", topic.Namespace)
+	assert.Equal(t, "my-topic", topic.Topic)
 	assert.Equal(t, -1, topic.Partition)
 }
 
@@ -114,3 +120,67 @@ func TestTopicNameWithoutPartitionPart(t *testing.T) {
 		assert.Equal(t, test.expected, TopicNameWithoutPartitionPart(&test.tn))
 	}
 }
+
+// TestIsV2TopicName checks IsV2TopicName against V2 names (with and without
+// domain, tenant and namespace) and against legacy V1 names that include a
+// cluster.
+func TestIsV2TopicName(t *testing.T) {
+	cases := []struct {
+		topic string
+		isV2  bool
+	}{
+		{"persistent://my-tenant/my-ns/my-topic", true},
+		{"my-topic", true},
+		{"my-tenant/my-namespace/my-topic", true},
+		{"non-persistent://my-tenant/my-namespace/my-topic", true},
+		{"my-topic-partition-5", true},
+		// V1 topic name
+		{"persistent://my-tenant/my-cluster/my-ns/my-topic", false},
+		{"my-tenant/my-cluster/my-ns/my-topic", false},
+	}
+
+	for _, tc := range cases {
+		topic, err := ParseTopicName(tc.topic)
+		assert.Nil(t, err)
+		assert.Equal(t, tc.isV2, IsV2TopicName(topic))
+	}
+}
+
+// TestGetTopicRestPath checks the REST path produced for V2 and legacy V1 topic
+// names, including names that rely on the default domain, tenant and
+// namespace.
+func TestGetTopicRestPath(t *testing.T) {
+	cases := []struct {
+		topic    string
+		restPath string
+	}{
+		{"persistent://my-tenant/my-ns/my-topic", "persistent/my-tenant/my-ns/my-topic"},
+		{"my-topic", "persistent/public/default/my-topic"},
+		{"my-tenant/my-namespace/my-topic", "persistent/my-tenant/my-namespace/my-topic"},
+		{"non-persistent://my-tenant/my-namespace/my-topic", "non-persistent/my-tenant/my-namespace/my-topic"},
+		{"my-topic-partition-5", "persistent/public/default/my-topic-partition-5"},
+		// V1 topic name
+		{"persistent://my-tenant/my-cluster/my-ns/my-topic", "persistent/my-tenant/my-cluster/my-ns/my-topic"},
+		{"my-tenant/my-cluster/my-ns/my-topic", "persistent/my-tenant/my-cluster/my-ns/my-topic"},
+	}
+
+	for _, tc := range cases {
+		topic, err := ParseTopicName(tc.topic)
+		assert.Nil(t, err)
+		assert.Equal(t, tc.restPath, GetTopicRestPath(topic))
+	}
+}
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 3471bca..273169a 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -38,7 +38,8 @@ const (
 	serviceURL    = "pulsar://localhost:6650"
 	serviceURLTLS = "pulsar+ssl://localhost:6651"
 
-	webServiceURL = "http://localhost:8080"
+	webServiceURL    = "http://localhost:8080"
+	webServiceURLTLS = "https://localhost:8443"
 
 	caCertsPath       = "../integration-tests/certs/cacert.pem"
 	tlsClientCertPath = "../integration-tests/certs/client-cert.pem"