Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/29 18:43:38 UTC

[GitHub] [pulsar] cbornet commented on a change in pull request #14929: Add FULL_MESSAGE_IN_JSON_EXPAND_VALUE message format to Kinesis sink

cbornet commented on a change in pull request #14929:
URL: https://github.com/apache/pulsar/pull/14929#discussion_r837796876



##########
File path: pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
##########
@@ -68,25 +72,25 @@
  *      which accepts json-map of credentials in awsCredentialPluginParam
  *      eg: awsCredentialPluginParam = {"accessKey":"my-access-key","secretKey":"my-secret-key"}
  * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link AwsCredentialProviderPlugin}
- * 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
+ * 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON","FULL_MESSAGE_IN_FB"]
  *   a. ONLY_RAW_PAYLOAD:     publishes raw payload to stream
  *   b. FULL_MESSAGE_IN_JSON: publish full message (encryptionCtx + properties + payload) in json format
  *   json-schema:
  *   {"type":"object","properties":{"encryptionCtx":{"type":"object","properties":{"metadata":{"type":"object","additionalProperties":{"type":"string"}},"uncompressedMessageSize":{"type":"integer"},"keysMetadataMap":{"type":"object","additionalProperties":{"type":"object","additionalProperties":{"type":"string"}}},"keysMapBase64":{"type":"object","additionalProperties":{"type":"string"}},"encParamBase64":{"type":"string"},"compressionType":{"type":"string","enum":["NONE","LZ4","ZLIB"]},"batchSize":{"type":"integer"},"algorithm":{"type":"string"}}},"payloadBase64":{"type":"string"},"properties":{"type":"object","additionalProperties":{"type":"string"}}}}
  *   Example:
  *   {"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
+ *   c. FULL_MESSAGE_IN_FB: publish full message (encryptionCtx + properties + payload) in flatbuffer format
+ *   d. FULL_MESSAGE_IN_JSON_EXPAND_VALUE: publish full message (topic + key + value + properties + event time) in JSON format using the schema to expand the value in JSON.
  * </pre>
  *
- *
- *
  */
 @Connector(
     name = "kinesis",
     type = IOType.SINK,
     help = "A sink connector that copies messages from Pulsar to Kinesis",
     configClass = KinesisSinkConfig.class
 )
-public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
+public class KinesisSink extends AbstractAwsConnector implements Sink<GenericObject> {

Review comment:
       I don't think it will. The change is backward compatible. We did the same for Elastic Search.
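
A minimal sketch of the backward-compatibility argument, not the connector's actual code. It assumes that a sink typed on a generic wrapper can still recover the original byte[] payload when the topic carries raw bytes, so ONLY_RAW_PAYLOAD behaves as before. GenericValue and toRawPayload are hypothetical stand-ins introduced for illustration; Pulsar's real GenericObject interface is not used here.

```java
import java.nio.charset.StandardCharsets;

public class RawPayloadSketch {

    // Hypothetical stand-in for a generic value wrapper (assumption, not Pulsar's API).
    public interface GenericValue {
        Object getNativeObject();
    }

    // Returns the wrapped byte[] unchanged when the value is raw bytes;
    // otherwise falls back to the UTF-8 bytes of the value's string form.
    public static byte[] toRawPayload(GenericValue value) {
        Object nativeObject = value.getNativeObject();
        if (nativeObject instanceof byte[]) {
            return (byte[]) nativeObject;
        }
        return String.valueOf(nativeObject).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] original = "payload".getBytes(StandardCharsets.UTF_8);
        // A producer that used to hand the sink a byte[] now hands it a wrapper.
        GenericValue wrapped = () -> original;
        byte[] out = toRawPayload(wrapped);
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

Under this assumption, existing byte[] topics pass through unchanged, while values with a structured schema become available for formats that need to expand them.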



