You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/08/27 05:06:33 UTC

[GitHub] [camel-kafka-connector] aliadnani opened a new issue #398: How to have connector output JSON String?

aliadnani opened a new issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398


   Hi, I'm using the Camel Websocket Connecter to send data from a topic to a websocket. The data from the input topic is in **Json/JsonSchema** format.
   
   The connector works for sending data to the websocket, but the data is in the format of `Struct{key1=val1,key2=val2}`
   
   Is it possible to have the connector emit a JSON string like  `{"key1":"val1","key2": "val2"}` instead?
   
   The task config can be seen via CURL request below:
   
   ```
   curl -X POST -H "Content-Type: application/json" \
       --data '{
           "name": "camel-sink3",
           "config": {
             "topics": "xxx",
             "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
             "value.converter.schema.registry.url": "http:/xxx-cp-schema-registry:8081",
             "key.converter": "org.apache.kafka.connect.storage.StringConverter",
             "connector.class": "org.apache.camel.kafkaconnector.websocket.CamelWebsocketSinkConnector",  
             "camel.sink.path.host": "0.0.0.0",
             "camel.sink.path.port":"8084",
   	  "camel.component.websocket.host":"0.0.0.0",
             "camel.component.websocket.port": "8084",
   	  "camel.sink.path.resourceUri": "kstreams-windows-average",
             "camel.component.websocket.minThreads": "3",
             "camel.sink.endpoint.sendToAll": "true",
           }
         }' http://xxx:8083/connectors
   ``` 
   
   Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] aliadnani commented on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
aliadnani commented on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-681705905


   If I use `org.apache.kafka.connect.storage.StringConverter` I get the following for my topic data serialized with **JSON Schema**:
   
   ```
   "\u0000\u0000\u0000\u0000\u0001{\"name\":\"temperatureCelsius\",\"value\":23.0,\"unit\":\"C\",\"utc\":1592805755142}"
   ```
   The double quotes are escaped and I'm guessing the `\u0000\u0000\u0000\u0000\u0001` at the beginning is the [Magic Byte/Schema ID](https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format) thats encoded with the message.
   
   I think what's happening when the value converter is set to `JsonSchemaConverter` is that the message is serialized to a Java object (based on the [JsonConverter source](https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java) I'm guessing Jackson JsonNode?) and that object gets passed directly to the websocket output resulting in something like `Struct{key1=val1,key2=val2}`. Whereas, with `StringConverter` the message (inlcuding the schema ID bytes) are deserialized to a String object directly resulting in above ^^ funny looking JSON.
   
   I was poking around the camel connector codebase and saw transforms, would it be possible to write a transform class to take the JsonNode object and format it to a JSON string with the method `jsonNode.toPrettyString()` before sending it to the websocket output? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd commented on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
oscerd commented on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-702851756


   This can be close


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] aliadnani commented on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
aliadnani commented on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-682320665


   Thank you! It works!
   
   I was wrong that the `JsonSchemaConverter` deserializes the record into a JsonNode object, I found out its actually a [Kafka Connect Struct Object](https://kafka.apache.org/11/javadoc/org/apache/kafka/connect/data/Struct.html).
   
   Anyways, I installed a [transform library](https://github.com/jcustenborder/kafka-connect-transform-common/tree/master/src) and used the [ToJSON SMT](https://github.com/jcustenborder/kafka-connect-transform-common#tojson) to form a JSON string from the Struct before outputting to websocket.
   
   My CURL request looks like this now:
   
   ```
   curl -X POST -H "Content-Type: application/json" \
       --data '{
           "name": "camel-sink3",
           "config": {
             "topics": "xxx",
             "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
             "value.converter.schema.registry.url": "http:/xxx-cp-schema-registry:8081",
             "key.converter": "org.apache.kafka.connect.storage.StringConverter",
             "connector.class": "org.apache.camel.kafkaconnector.websocket.CamelWebsocketSinkConnector",  
             "camel.sink.path.host": "0.0.0.0",
             "camel.sink.path.port":"8084",
   	  "camel.component.websocket.host":"0.0.0.0",
             "camel.component.websocket.port": "8084",
   	  "camel.sink.path.resourceUri": "kstreams-windows-average",
             "camel.component.websocket.minThreads": "3",
             "camel.sink.endpoint.sendToAll": "true",
             "transforms": "toJson",
             "transforms.toJson.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value",
             "transforms.toJson.schemas.enable": "false"
           }
         }' http://xxx:8083/connectors
   ```
   
   And now the output is a proper JSON: `{"unit":"C","utc":1588264680439,"name":"temperature","value":422.0}`
   
   Thanks again for the help!
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] aliadnani edited a comment on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
aliadnani edited a comment on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-681705905


   If I use `org.apache.kafka.connect.storage.StringConverter` I get the following for my topic data serialized with **JSON Schema**:
   
   ```
   "\u0000\u0000\u0000\u0000\u0001{\"name\":\"temperatureCelsius\",\"value\":23.0,\"unit\":\"C\",\"utc\":1592805755142}"
   ```
   The double quotes are escaped and I'm guessing the `\u0000\u0000\u0000\u0000\u0001` at the beginning is the [Magic Byte/Schema ID](https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format) thats encoded with the message.
   
   I think what's happening when the value converter is set to `JsonSchemaConverter` is that the message is serialized to a Java object (based on the [JsonConverter source](https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java) I'm guessing Jackson JsonNode?) and that object gets passed directly to the websocket output resulting in something like `Struct{key1=val1,key2=val2}`. Whereas, with `StringConverter` the message (inlcuding the schema ID bytes) are deserialized to a String object directly resulting in above ^^ funny looking JSON.
   
   I was poking around the camel connector codebase and saw transforms, would it work to write a transform class to take the JsonNode object and format it to a JSON string with the method `jsonNode.toPrettyString()` before sending it to the websocket output? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] orpiske commented on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
orpiske commented on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-681893274


   ...
   
   > I was poking around the camel connector codebase and saw transforms, would it work to write a transform class to take the JsonNode object and format it to a JSON string with the method `jsonNode.toPrettyString()` before sending it to the websocket output? - or do I misunderstand what transforms are to camel sweat_smile?
   
   I think this would do the trick. We do have a few cases where we do this, in the test code base, for example, so I assume it would be a pretty similar scenario. 
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] aliadnani edited a comment on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
aliadnani edited a comment on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-682320665


   Thank you! It works!
   
   I was wrong that the `JsonSchemaConverter` deserializes the record into a JsonNode object, I found out its actually a [Kafka Connect Struct Object](https://kafka.apache.org/11/javadoc/org/apache/kafka/connect/data/Struct.html).
   
   Anyways, I installed a [transform library](https://github.com/jcustenborder/kafka-connect-transform-common/tree/master/src) and used the [ToJSON SMT](https://github.com/jcustenborder/kafka-connect-transform-common#tojson) to form a JSON string from the Struct before outputting to websocket.
   
   My CURL request looks like this now:
   
   ```
   curl -X POST -H "Content-Type: application/json" \
       --data '{
           "name": "camel-sink3",
           "config": {
             "topics": "xxx",
             "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
             "value.converter.schema.registry.url": "http:/xxx-cp-schema-registry:8081",
             "key.converter": "org.apache.kafka.connect.storage.StringConverter",
             "connector.class": "org.apache.camel.kafkaconnector.websocket.CamelWebsocketSinkConnector",  
             "camel.sink.path.host": "0.0.0.0",
             "camel.sink.path.port":"8084",
   	  "camel.component.websocket.host":"0.0.0.0",
             "camel.component.websocket.port": "8084",
   	  "camel.sink.path.resourceUri": "kstreams-windows-average",
             "camel.component.websocket.minThreads": "3",
             "camel.sink.endpoint.sendToAll": "true",
             "transforms": "toJson",
             "transforms.toJson.type": "com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value",
             "transforms.toJson.schemas.enable": "false"
           }
         }' http://xxx:8083/connectors
   ```
   
   And now the output is a proper JSON: `{"unit":"C","utc":1588264680439,"name":"temperature","value":422.0}`
   
   I was thinking maybe this should be the default behavior for Sink Connectors dealing consuming non-string data (JSON in my case) and don't have to send data in a specific format.
   
   To me it makes more sense that non-string data (JSON, Avro) parsed by their converters to a Struct Object is sent to a websocket (or any other sink) as JSON rather than string-ed struct: `Struct{key1=val1,key2=val2}`
   
   Anyways, thanks again for the help!
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] aliadnani edited a comment on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
aliadnani edited a comment on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-681705905


   If I use `org.apache.kafka.connect.storage.StringConverter` I get the following for my topic data serialized with **JSON Schema**:
   
   ```
   "\u0000\u0000\u0000\u0000\u0001{\"name\":\"temperatureCelsius\",\"value\":23.0,\"unit\":\"C\",\"utc\":1592805755142}"
   ```
   The double quotes are escaped and I'm guessing the `\u0000\u0000\u0000\u0000\u0001` at the beginning is the [Magic Byte/Schema ID](https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format) thats encoded with the message.
   
   I think what's happening when the value converter is set to `JsonSchemaConverter` is that the message is serialized to a Java object (based on the [JsonConverter source](https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java) I'm guessing Jackson JsonNode?) and that object gets passed directly to the websocket output resulting in something like `Struct{key1=val1,key2=val2}`. Whereas, with `StringConverter` the message (inlcuding the schema ID bytes) are deserialized to a String object directly resulting in above ^^ funny looking JSON.
   
   I was poking around the camel connector codebase and saw transforms, would it work to write a transform class to take the JsonNode object and format it to a JSON string with the method `jsonNode.toPrettyString()` before sending it to the websocket output? -  or do I misunderstand what transforms are to camel 😅?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] oscerd closed issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
oscerd closed issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [camel-kafka-connector] orpiske commented on issue #398: How to have connector output JSON String?

Posted by GitBox <gi...@apache.org>.
orpiske commented on issue #398:
URL: https://github.com/apache/camel-kafka-connector/issues/398#issuecomment-681634068


   @aliadnani Out of curiosity: what happens if you use `org.apache.kafka.connect.storage.StringConverter` as the value converter? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org