You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/17 07:25:02 UTC

[GitHub] [pulsar] RobertIndie commented on a diff in pull request #17125: [improve][client c++] Support KeyValue Schema.

RobertIndie commented on code in PR #17125:
URL: https://github.com/apache/pulsar/pull/17125#discussion_r947511251


##########
pulsar-client-cpp/examples/SampleKeyValueSchemaProducer.cc:
##########
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <iostream>
+#include <thread>
+#include <pulsar/Client.h>
+#include <lib/LogUtils.h>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    std::string jsonSchema =
+        "{\"type\":\"record\",\"name\":\"cpx\",\"fields\":[{\"name\":\"re\",\"type\":\"double\"},{\"name\":"
+        "\"im\",\"type\":\"double\"}]}";
+
+    SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+    SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+    SchemaInfo keyValueSchema(keySchema, valueSchema, SEPARATED);
+    std::cout << keyValueSchema.getSchema() << std::endl;
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setSchema(keyValueSchema);
+
+    Producer producer;
+    Result result =
+        client.createProducer("persistent://public/default/kv-schema", producerConfiguration, producer);
+    if (result != ResultOk) {
+        LOG_ERROR("Error creating producer: " << result);
+        return -1;
+    }
+
+    std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
+
+    KeyValue keyValue(jsonData, jsonData, SEPARATED);
+
+    Message msg = MessageBuilder().setContent(keyValue).setProperty("x", "1").build();
+    producer.send(msg);
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    LOG_ERROR("send message ok");

Review Comment:
   ```suggestion
       LOG_INFO("send message ok");
   ```



##########
pulsar-client-cpp/lib/KeyValue.cc:
##########
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/KeyValue.h>
+#include <pulsar/Schema.h>
+#include "SharedBuffer.h"
+
+using namespace pulsar;
+
+namespace pulsar {
+
+class PULSAR_PUBLIC KeyValueImpl {
+   public:
+    std::string keyContent_;
+    std::string valueContent_;
+    KeyValueEncodingType keyValueEncodingType;
+
+    KeyValueImpl(){};
+    KeyValueImpl(std::string key, std::string value, KeyValueEncodingType keyValueEncodingType)
+        : keyContent_(key), valueContent_(value), keyValueEncodingType(keyValueEncodingType){};
+};
+
+KeyValue::KeyValue() : impl_() {}
+
+KeyValue::KeyValue(const std::string &data, const KeyValueEncodingType &keyValueEncodingType) {
+    impl_ = std::make_shared<KeyValueImpl>();
+    impl_->keyValueEncodingType = keyValueEncodingType;
+    if (impl_->keyValueEncodingType == INLINE) {
+        SharedBuffer buffer = SharedBuffer::copy(data.c_str(), data.length());
+        int keySize = buffer.readUnsignedInt();

Review Comment:
   It seems we need to handle the case where `keySize` or `valueSize` is 0. The Java client returns null if `keySize` or `valueSize` is zero, but here we only return an empty string.



##########
pulsar-client-cpp/tests/MessageTest.cc:
##########
@@ -99,3 +99,24 @@ TEST(MessageTest, testMessageBuilder) {
         ASSERT_EQ(msg.getData(), originalAddress);
     }
 }
+
+TEST(MessageTest, testMessageBuilderSetKeyValueContent) {
+    std::string keyContent = "keyContent";
+    std::string valueContent = "valueContent";
+
+    // test inline encoding type.
+    {
+        KeyValue keyValue(keyContent, valueContent, INLINE);
+        const Message& message = MessageBuilder().setContent(keyValue).build();
+        ASSERT_EQ(message.getDataAsString(), keyValue.getContent());
+        ASSERT_EQ(message.getPartitionKey(), "");
+    }
+
+    // test inline encoding type.

Review Comment:
   ```suggestion
       // test separated encoding type.
   ```



##########
pulsar-client-cpp/include/pulsar/KeyValue.h:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef KEY_VALUE_HPP_
+#define KEY_VALUE_HPP_
+
+#include <map>
+#include <string>
+#include <memory>
+#include "defines.h"
+#include "Schema.h"
+
+namespace pulsar {
+
+class KeyValueImpl;
+
+class PULSAR_PUBLIC KeyValue {
+   public:
+    KeyValue();
+    KeyValue(const std::string &data, const KeyValueEncodingType &keyValueEncodingType);
+    KeyValue(const std::string &key, const std::string &value,
+             const KeyValueEncodingType &keyValueEncodingType);
+    std::string getContent();
+    std::string getKeyData();
+    std::string getValueData();

Review Comment:
   How should we handle data that cannot be parsed as a string?



##########
pulsar-client-cpp/include/pulsar/KeyValue.h:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef KEY_VALUE_HPP_
+#define KEY_VALUE_HPP_
+
+#include <map>
+#include <string>
+#include <memory>
+#include "defines.h"
+#include "Schema.h"
+
+namespace pulsar {
+
+class KeyValueImpl;
+
+class PULSAR_PUBLIC KeyValue {
+   public:
+    KeyValue();
+    KeyValue(const std::string &data, const KeyValueEncodingType &keyValueEncodingType);
+    KeyValue(const std::string &key, const std::string &value,
+             const KeyValueEncodingType &keyValueEncodingType);
+    std::string getContent();

Review Comment:
   Could `getContent` be achieved by `getValueData`?



##########
pulsar-client-cpp/include/pulsar/KeyValue.h:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef KEY_VALUE_HPP_
+#define KEY_VALUE_HPP_
+
+#include <map>
+#include <string>
+#include <memory>
+#include "defines.h"
+#include "Schema.h"
+
+namespace pulsar {
+
+class KeyValueImpl;
+
+class PULSAR_PUBLIC KeyValue {
+   public:
+    KeyValue();
+    KeyValue(const std::string &data, const KeyValueEncodingType &keyValueEncodingType);
+    KeyValue(const std::string &key, const std::string &value,
+             const KeyValueEncodingType &keyValueEncodingType);
+    std::string getContent();
+    std::string getKeyData();
+    std::string getValueData();
+    KeyValueEncodingType getEncodingType();

Review Comment:
   Whenever possible, we should declare these methods as `const` member functions.



##########
pulsar-client-cpp/lib/Schema.cc:
##########
@@ -90,6 +117,51 @@ SchemaInfo::SchemaInfo(SchemaType schemaType, const std::string &name, const std
                        const StringMap &properties)
     : impl_(std::make_shared<SchemaInfoImpl>(schemaType, name, schema, properties)) {}
 
+SchemaInfo::SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema,
+                       const KeyValueEncodingType &keyValueEncodingType) {
+    auto checkType = [](const SchemaInfo &schemaInfo) {
+        if (schemaInfo.getSchemaType() != JSON && schemaInfo.getSchemaType() != AVRO) {
+            throw std::invalid_argument("Key and value schema just support JSON or AVRO.");

Review Comment:
   It's better to add this information to the PR description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org