Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/26 12:15:40 UTC

[GitHub] [pulsar-client-cpp] shibd opened a new pull request, #157: [feat] Support auto download schema when create producer.

shibd opened a new pull request, #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157

   ### Motivation
   If a topic already has a schema but the producer does not know it, the producer should still be able to send data to the topic. (One such scenario is the DLQ sending messages; see the **Verifying this change** section of #139.)
   
   
   ### Modifications
   - When a producer is created with the `autoDownloadSchema` parameter, try to fetch the topic's schema and use it.
   - Support `getSchema` on `LookupService` (HTTP and binary).
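The producer-side flow described in the Modifications list can be sketched as follows. This is a minimal, self-contained illustration under stated assumptions, not the client's actual code: `SchemaInfo`, `FakeLookup`, and `resolveProducerSchema` are hypothetical stand-ins, and the broker's schema registry is simulated with a `std::map`.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for pulsar::SchemaInfo.
struct SchemaInfo {
    std::string type;        // e.g. "JSON", "BYTES"
    std::string definition;  // schema definition, e.g. an Avro-style JSON string
};

// Hypothetical lookup that simulates the broker's per-topic schema registry.
class FakeLookup {
   public:
    void registerSchema(const std::string& topic, SchemaInfo info) {
        schemas_[topic] = std::move(info);
    }

    // Returns std::nullopt when the topic has no schema, mirroring the
    // boost::none result that getSchema uses in this PR.
    std::optional<SchemaInfo> getSchema(const std::string& topic) const {
        auto it = schemas_.find(topic);
        if (it == schemas_.end()) {
            return std::nullopt;
        }
        return it->second;
    }

   private:
    std::map<std::string, SchemaInfo> schemas_;
};

// Hypothetical helper: choose the schema a new producer should use.
// With autoDownload enabled, the topic's existing schema is preferred;
// if the topic has no schema (or autoDownload is off), the configured one is kept.
SchemaInfo resolveProducerSchema(const FakeLookup& lookup, const std::string& topic,
                                 const SchemaInfo& configured, bool autoDownload) {
    if (autoDownload) {
        if (auto downloaded = lookup.getSchema(topic)) {
            return *downloaded;
        }
    }
    return configured;
}
```

In this sketch a producer configured with a `BYTES` schema ends up using the topic's `JSON` schema when auto download is on, which is the DLQ-style case from the Motivation.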
   
   
   ### Verifying this change
   
     - Add `LookupServiceTest` unit tests to cover the `getSchema` logic.
     - Add the `SchemaTest.testAutoDownloadSchema` unit test to verify that a producer is created successfully when the topic has a schema.
   
   ### Documentation
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1065716501


##########
include/pulsar/Result.h:
##########
@@ -94,6 +94,7 @@ enum Result
     ResultInterrupted,  /// Interrupted while waiting to dequeue
 
     ResultDisconnected,  /// Client connection has been disconnected
+    ResultNotFound       /// The generic was not found

Review Comment:
   It seems this should read "The schema was not found". If so, it would be better to name it `ResultSchemaNotFound`.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1065720801


##########
lib/ClientConnection.cc:
##########
@@ -1308,6 +1308,52 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
                     break;
                 }
 
+                case BaseCommand::GET_SCHEMA_RESPONSE: {
+                    const auto& response = incomingCmd.getschemaresponse();
+                    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: "
+                                         << response.request_id());
+                    Lock lock(mutex_);
+                    PendingGetSchemaMap::iterator it = pendingGetSchemaRequests_.find(response.request_id());
+                    if (it != pendingGetSchemaRequests_.end()) {
+                        Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second;
+                        pendingGetSchemaRequests_.erase(it);
+                        lock.unlock();
+
+                        if (response.has_error_code()) {
+                            if (response.error_code() == proto::TopicNotFound) {
+                                getSchemaPromise.setValue(boost::none);
+                            } else {
+                                Result result = getResult(response.error_code(), response.error_message());
+                                LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server "
+                                                    << result
+                                                    << (response.has_error_message()
+                                                            ? (" (" + response.error_message() + ")")
+                                                            : "")
+                                                    << " -- req_id: " << response.request_id());
+                                getSchemaPromise.setFailed(result);
+                            }
+                            return;
+                        }
+
+                        auto schema = response.schema();
+                        auto properMap = schema.properties();

Review Comment:
   ```suggestion
                           const auto& schema = response.schema();
                           const auto& properMap = schema.properties();
   ```
   
   Avoid unnecessary copies.
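The reason behind this suggestion can be shown with a small sketch. Protobuf-generated getters for message and repeated fields (such as `response.schema()` and `schema.properties()`) return const references, so `auto x = f()` deduces a value type and copies the object, while `const auto& x = f()` binds to the existing one. `Tracked` and `Response` below are hypothetical types that count copies; they are not the protobuf-generated classes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical type that counts how many times it has been copied.
struct Tracked {
    static int copies;
    std::vector<std::string> properties{"key1", "key2"};

    Tracked() = default;
    Tracked(const Tracked& other) : properties(other.properties) { ++copies; }
};
int Tracked::copies = 0;

// Hypothetical holder whose accessor returns a const reference,
// like a protobuf-generated getter for a message field.
struct Response {
    Tracked schema_;
    const Tracked& schema() const { return schema_; }
};

// `auto` deduces Tracked (a value), so the whole object is copied.
std::size_t countWithCopy(const Response& r) {
    auto schema = r.schema();
    return schema.properties.size();
}

// `const auto&` deduces const Tracked&, so nothing is copied.
std::size_t countWithoutCopy(const Response& r) {
    const auto& schema = r.schema();
    return schema.properties.size();
}
```

Both functions return the same size; only the copy counter differs, which is exactly the cost the suggestion avoids on every `GET_SCHEMA_RESPONSE`.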



##########
lib/ClientConnection.cc:
##########
@@ -1308,6 +1308,52 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
                     break;
                 }
 
+                case BaseCommand::GET_SCHEMA_RESPONSE: {
+                    const auto& response = incomingCmd.getschemaresponse();
+                    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: "
+                                         << response.request_id());
+                    Lock lock(mutex_);
+                    PendingGetSchemaMap::iterator it = pendingGetSchemaRequests_.find(response.request_id());

Review Comment:
   ```suggestion
                       auto it = pendingGetSchemaRequests_.find(response.request_id());
   ```
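The `GET_SCHEMA_RESPONSE` handler in this diff follows a common pattern for completing pending requests: find and erase the entry while holding the mutex, release the lock, and only then complete the request, so that any callbacks triggered by completion do not run while the connection's lock is held. A minimal sketch of that pattern, assuming a hypothetical `PendingRequests` class that stores plain callbacks instead of the client's own `Promise` type:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical registry of in-flight GetSchema requests, keyed by request id.
// Each entry holds the callback to invoke when the response arrives.
class PendingRequests {
   public:
    using Callback = std::function<void(std::optional<std::string>)>;

    void add(std::uint64_t requestId, Callback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_[requestId] = std::move(callback);
    }

    // Completes a request; returns false if the id is unknown (e.g. it already timed out).
    bool complete(std::uint64_t requestId, std::optional<std::string> schema) {
        std::unique_lock<std::mutex> lock(mutex_);
        auto it = pending_.find(requestId);
        if (it == pending_.end()) {
            return false;
        }
        Callback callback = std::move(it->second);  // take ownership before erasing
        pending_.erase(it);
        lock.unlock();  // release the lock before running user code

        // The callback may call add() again; that is safe because the lock is released.
        callback(std::move(schema));  // std::nullopt means "the topic has no schema"
        return true;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.size();
    }

   private:
    std::mutex mutex_;
    std::map<std::uint64_t, Callback> pending_;
};
```

Because `std::mutex` is not recursive, completing the callback under the lock would deadlock if the callback issued a new request on the same connection; unlocking first avoids that.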



##########
lib/LookupService.h:
##########
@@ -21,6 +21,7 @@
 
 #include <pulsar/Result.h>
 
+#include <boost/optional.hpp>
 #include <memory>

Review Comment:
   It's better to include `<pulsar/Schema.h>` here. It does not affect compilation, because the `.cc` files that include `LookupService.h` already include `Schema.h`, but some static code analysis checks that examine this header on its own might fail.
   
   ![image](https://user-images.githubusercontent.com/18204803/211551587-193a12cb-1e1a-4e2e-8354-8179e4b48eb7.png)
   





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1065715612


##########
include/pulsar/Result.h:
##########
@@ -94,6 +94,7 @@ enum Result
     ResultInterrupted,  /// Interrupted while waiting to dequeue
 
     ResultDisconnected,  /// Client connection has been disconnected
+    ResultNotFound       /// The generic was not found

Review Comment:
   What does "generic" mean?





[GitHub] [pulsar-client-cpp] BewareMyPower merged pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157




[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1073564343


##########
include/pulsar/Result.h:
##########
@@ -94,6 +94,7 @@ enum Result
     ResultInterrupted,  /// Interrupted while waiting to dequeue
 
     ResultDisconnected,  /// Client connection has been disconnected
+    ResultNotFound       /// The generic was not found

Review Comment:
   Could this `Result` be returned over the binary protocol? If the error only relates to the HTTP protocol, I don't think it's a good idea to expose it as a `Result`.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1065732705


##########
tests/LookupServiceTest.cc:
##########
@@ -274,3 +277,82 @@ TEST(LookupServiceTest, testTimeout) {
 
     ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
 }
+
+class LookupServiceTest : public ::testing::TestWithParam<std::string> {
+   public:
+    void TearDown() override { client_.close(); }
+
+   protected:
+    Client client_{GetParam()};
+};
+
+TEST_P(LookupServiceTest, testGetSchema) {
+    const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
+    std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+
+    StringMap properties;
+    properties.emplace("key1", "value1");
+    properties.emplace("key2", "value2");
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(SchemaInfo(SchemaType::JSON, "json", jsonSchema, properties));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_EQ(jsonSchema, schemaInfo->getSchema());
+    ASSERT_EQ(SchemaType::JSON, schemaInfo->getSchemaType());
+    ASSERT_EQ(properties, schemaInfo->getProperties());
+}
+
+TEST_P(LookupServiceTest, testGetSchemaNotFund) {
+    const std::string topic =
+        "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_TRUE(!schemaInfo);
+}
+
+TEST_P(LookupServiceTest, testGetKeyValueSchema) {
+    const std::string topic =
+        "testGetKeyValueSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4);
+    StringMap properties;
+    properties.emplace("key1", "value1");
+    properties.emplace("key2", "value2");
+    std::string jsonSchema =
+        R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema, properties);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema, properties);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(keyValueSchema);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer));
+
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client_);
+    auto lookup = clientImplPtr->getLookup();
+
+    boost::optional<SchemaInfo> schemaInfo;
+    auto future = lookup->getSchema(TopicName::get(topic));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo->getSchema());
+    ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo->getSchemaType());
+    ASSERT_TRUE(!schemaInfo->getProperties().empty());

Review Comment:
   Replace with `ASSERT_FALSE(schemaInfo->getProperties().empty())`.





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1073574213


##########
lib/HTTPLookupService.cc:
##########
@@ -409,4 +430,72 @@ void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std
     }
 }
 
+void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl) {
+    std::string responseData;
+    Result result = sendHTTPRequest(completeUrl, responseData);
+
+    if (result == ResultNotFound) {
+        promise.setValue(boost::none);

Review Comment:
   If `ResultNotFound` is only used internally, it should not be exposed to users. `ResultRetryable`, which I added, is a bad example, but I could not think of a better approach because the code is so tightly coupled. IMO, it would be acceptable to remove `ResultRetryable` from the enum if there were a better way to achieve the same goal.
   
   Since `sendHTTPRequest` is a private method of `HTTPLookupService`, it can return the `long` response code instead of a `Result`. We can then map the response code to a `Result` only when it is not 404.
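A minimal sketch of this suggestion, assuming the transport reports the raw HTTP status code: a 404 stays internal and becomes an empty optional, and only other failures are mapped to a `Result`. The two-value enum is a trimmed stand-in for `pulsar::Result`, mapping every non-2xx code to a connect error is an assumption modeled on the curl snippet later in this thread, and `GetSchemaOutcome`, `mapHttpStatus`, and `getSchemaFromHttp` are hypothetical names, not the client's API.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Trimmed stand-in for pulsar::Result (only the values used here).
enum class Result { Ok, ConnectError };

// Hypothetical outcome of a GetSchema lookup: a Result plus an optional schema.
struct GetSchemaOutcome {
    Result result;
    std::optional<std::string> schema;  // std::nullopt: the topic has no schema
};

// Hypothetical mapping of a non-404 HTTP status code to a Result.
Result mapHttpStatus(long responseCode) {
    return (responseCode >= 200 && responseCode < 300) ? Result::Ok : Result::ConnectError;
}

// Hypothetical handler: 404 never leaks out as a public Result value.
GetSchemaOutcome getSchemaFromHttp(long responseCode, const std::string& body) {
    if (responseCode == 404) {
        return {Result::Ok, std::nullopt};  // "not found" is a normal, successful answer
    }
    Result result = mapHttpStatus(responseCode);
    if (result != Result::Ok) {
        return {result, std::nullopt};
    }
    return {Result::Ok, body};  // a real implementation would parse the JSON body here
}
```

With this shape, callers see either a schema, an empty optional, or a genuine error, and no new public enum value is required.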





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1069423672


##########
include/pulsar/Result.h:
##########
@@ -94,6 +94,7 @@ enum Result
     ResultInterrupted,  /// Interrupted while waiting to dequeue
 
     ResultDisconnected,  /// Client connection has been disconnected
+    ResultNotFound       /// The generic was not found

Review Comment:
   This means the requested resource or data was not found. It is currently used for HTTP 404 responses, but it could also be used in other scenarios, which is why I described it as generic:
   
   ```c++
               case CURLE_HTTP_RETURNED_ERROR:
                   LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
                   if (response_code == 404) {
                       retResult = ResultNotFound;
                   } else {
                       retResult = ResultConnectError;
                   }
                   break;
   ```





[GitHub] [pulsar-client-cpp] BewareMyPower commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1073576266


##########
tests/ClientTest.cc:
##########
@@ -305,6 +305,7 @@ TEST(ClientTest, testCloseClient) {
         auto t0 = std::chrono::steady_clock::now();
         while ((std::chrono::steady_clock::now() - t0) < std::chrono::microseconds(i)) {
         }
+        sleep(1);

Review Comment:
   Please move this into a separate PR if you want to fix the flaky test. BTW, sleeping for 1 second is unacceptable here because it runs inside a loop that executes 1000 times.





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1080735293


##########
tests/ClientTest.cc:
##########
@@ -305,6 +305,7 @@ TEST(ClientTest, testCloseClient) {
         auto t0 = std::chrono::steady_clock::now();
         while ((std::chrono::steady_clock::now() - t0) < std::chrono::microseconds(i)) {
         }
+        sleep(1);

Review Comment:
   Oh, sorry. This was left over from my local testing, and I committed it by mistake.





[GitHub] [pulsar-client-cpp] shibd commented on a diff in pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#discussion_r1080761256


##########
lib/HTTPLookupService.cc:
##########
@@ -409,4 +430,72 @@ void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std
     }
 }
 
+void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl) {
+    std::string responseData;
+    Result result = sendHTTPRequest(completeUrl, responseData);
+
+    if (result == ResultNotFound) {
+        promise.setValue(boost::none);

Review Comment:
   Got it. I added an overload: `Result sendHTTPRequest(std::string completeUrl, std::string& responseData, long& responseCode);`. PTAL.
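The overload approach can be sketched like this: the existing two-argument signature forwards to the new three-argument one, so current callers are unchanged while the schema lookup can inspect the raw status code. `FakeHttpClient` and its canned responses are hypothetical; the real method performs a curl request, and this sketch takes the URL by const reference rather than by value.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Trimmed stand-in for pulsar::Result (only the values used here).
enum class Result { Ok, ConnectError };

// Hypothetical client with canned responses instead of real HTTP calls.
class FakeHttpClient {
   public:
    void addResponse(const std::string& url, long code, std::string body) {
        responses_[url] = std::make_pair(code, std::move(body));
    }

    // Existing signature: callers that only care about success or failure keep using it.
    Result sendHTTPRequest(const std::string& completeUrl, std::string& responseData) {
        long responseCode = 0;
        return sendHTTPRequest(completeUrl, responseData, responseCode);
    }

    // New overload: also reports the HTTP status code so callers can special-case 404.
    Result sendHTTPRequest(const std::string& completeUrl, std::string& responseData,
                           long& responseCode) {
        auto it = responses_.find(completeUrl);
        if (it == responses_.end()) {
            responseCode = 0;  // no HTTP response at all, e.g. a connection failure
            return Result::ConnectError;
        }
        responseCode = it->second.first;
        responseData = it->second.second;
        return (responseCode >= 200 && responseCode < 300) ? Result::Ok : Result::ConnectError;
    }

   private:
    std::map<std::string, std::pair<long, std::string>> responses_;
};
```

A GetSchema handler built on this would check `responseCode == 404` first and complete with an empty optional, and only otherwise propagate the returned `Result`.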





[GitHub] [pulsar-client-cpp] shibd commented on pull request #157: [feat] Support auto download schema when create producer.

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #157:
URL: https://github.com/apache/pulsar-client-cpp/pull/157#issuecomment-1385632278

   The job failure seems to be caused by a flaky test. Could you help review and rerun the job? @BewareMyPower 

