You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/12/15 18:05:20 UTC

[GitHub] [carbondata] pratyakshsharma commented on a change in pull request #4243: [CARBONDATA-4308]: added docs for streamer tool configs

pratyakshsharma commented on a change in pull request #4243:
URL: https://github.com/apache/carbondata/pull/4243#discussion_r769867806



##########
File path: docs/scd-and-cdc-guide.md
##########
@@ -131,4 +131,88 @@ clauses can have at most one UPDATE and one DELETE action, These clauses have th
 
 * Please refer example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios using APIs.
 * Please refer example class [DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala) to understand and implement scd and cdc scenarios using sql. 
-* Please refer example class [DataUPSERTExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala) to understand and implement cdc using UPSERT APIs.
\ No newline at end of file
+* Please refer example class [DataUPSERTExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataUPSERTExample.scala) to understand and implement cdc using UPSERT APIs.
+
+### Streamer Tool
+
+Carbondata streamer tool is a very powerful tool for incrementally capturing change events from varied sources like kafka or DFS and merging them into target carbondata table. This essentially means one needs to integrate with external solutions like Debezium or Maxwell for moving the change events to kafka, if one wishes to capture changes from primary databases like mysql. The tool currently requires incoming data to be present in avro format and incoming schema to evolve in backwards compatible way.
+
+Below is a high level architecture of how the overall pipeline looks like -
+
+![Carbondata streamer tool pipeline](../docs/images/carbondata-streamer-tool-pipeline.png?raw=true)
+
+#### Configs
+
+Streamer tool exposes below configs for users to cater to their CDC use cases - 
+
+| Parameter                         | Default Value                                              | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+|-----------------------------------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| carbon.streamer.target.database   | (none)                                                     | The database name where the target table is present to merge the incoming data. If not given by user, system will take the current database in the spark session.                                                                                                                                                                                                                                                                                                                                          |
+| carbon.streamer.target.table      | (none)                                                     | The target carbondata table where the data has to be merged. If this is not configured by user, the operation will fail.                                                                                                                                                                                                                                                                                                                                                                                   |
+| carbon.streamer.source.type       | kafka                                                      | Streamer tool currently supports two types of data sources. One can ingest data from either kafka or DFS into target carbondata table using streamer tool.                                                                                                                                                                                                                                                                                                                                        |
+| carbon.streamer.dfs.input.path    | (none)                                                     | An absolute path on a given file system from where data needs to be read to ingest into the target carbondata table. Mandatory if the ingestion source type is DFS.                                                                                                                                                                                                                                                                                                                                        |
+| schema.registry.url               | (none)                                                     | Streamer tool supports 2 different ways to supply schema of incoming data. Schemas can be supplied using avro files (file based schema provider) or using schema registry. This property defines the url to connect to in case schema registry is used as the schema source.                                                                                                                                                                                                                               |
+| carbon.streamer.input.kafka.topic | (none)                                                     | This is a mandatory property to be set in case kafka is chosen as the source of data. This property defines the topics from where streamer tool will consume the data.                                                                                                                                                                                                                                                                                                                                     |
+| bootstrap.servers                 | (none)                                                     | This is another mandatory property in case kafka is chosen as the source of data. This defines the end points for kafka brokers.                                                                                                                                                                                                                                                                                                                                                                           |
+| auto.offset.reset | earliest                                                   | Streamer tool maintains checkpoints to keep a track of the incoming messages which are already consumed. In case of first ingestion using kafka source, this property defines the offset from where ingestion will start. This property can take only 2 valid values - `latest` and `earliest`                                                                                                                                                                                                             |
+| key.deserializer | `org.apache.kafka.common.serialization.StringDeserializer` | Any message in kafka is ultimately a key value pair in the form of serialized bytes. This property defines the deserializer to deserialize the key of a message.                                                                                                                                                                                                                                                                                                                                           |
+| value.deserializer | `io.confluent.kafka.serializers.KafkaAvroDeserializer`     | This property defines the class which will be used for deserializing the values present in kafka topic.                                                                                                                                                                                                                                                                                                                                                                                                    |
+| enable.auto.commit | false                                                      | Kafka maintains an internal topic for storing offsets corresponding to the consumer groups. This property determines if kafka should actually go forward and commit the offsets consumed in this internal topic. We recommend to keep it as false since we use spark streaming checkpointing to take care of the same.                                                                                                                                                                                     |
+| group.id | (none)                                                     | Streamer tool is ultimately a consumer for kafka. This property determines the consumer group id streamer tool belongs to.                                                                                                                                                                                                                                                                                                                                                                                 |
+| carbon.streamer.input.payload.format | avro                                                       | This determines the format of the incoming messages from source. Currently only avro is supported. We have plans to extend this support to json as well in near future. Avro is the most preferred format for CDC use cases since it helps in making the message size very compact and has good support for schema evolution use cases as well.                                                                                                                                                            |
+| carbon.streamer.schema.provider | SchemaRegistry                                             | As discussed earlier, streamer tool supports 2 ways of supplying schema for incoming messages - schema registry and avro files. Confluent schema registry is the preferred way when using avro as the input format.                                                                                                                                                                                                                                                                                        |
+| carbon.streamer.source.schema.path | (none)                                                     | This property defines the absolute path where files containing schemas for incoming messages are present.                                                                                                                                                                                                                                                                                                                                                                                                  |
+| carbon.streamer.merge.operation.type | upsert                                                     | This defines the operation that needs to be performed on the incoming batch of data while writing it to target data set.                                                                                                                                                                                                                                                                                                                                                                                   |
+| carbon.streamer.merge.operation.field | (none)                                                     | This property defines the field in incoming schema which contains the type of operation performed at source. For example, Debezium includes a field called `op` when reading change events from primary database. Do not confuse this property with `carbon.streamer.merge.operation.type` which defines the operation to be performed on the incoming batch of data. However this property is needed so that streamer tool is able to identify rows deleted at source when the operation type is `upsert`. |
+| carbon.streamer.record.key.field | (none)                                                     | This defines the record key for a particular incoming record. This is used by the streamer tool for performing deduplication. In case this is not defined, operation will fail.                                                                                                                                                                                                                                                                                                                            |
+| carbon.streamer.batch.interval | 10                                                         | Minimum batch interval time between 2 continuous ingestion in continuous mode. Should be specified in seconds.                                                                                                                                                                                                                                                                                                                                                                                             |
+| carbon.streamer.source.ordering.field | <none>                                                     | Name of the field from source schema whose value can be used for picking the latest updates for a particular record in the incoming batch in case of multiple updates for the same record key. Useful if the write operation type is UPDATE or UPSERT. This will be used only if `carbon.streamer.upsert.deduplicate` is enabled.                                                                                                                                                                          |
+| carbon.streamer.insert.deduplicate | false                                                      | This property specifies if the incoming batch needs to be deduplicated in case of INSERT operation type. If set to true, the incoming batch will be deduplicated against the existing data in the target carbondata table.                                                                                                                                                                                                                                                                                 |
+| carbon.streamer.upsert.deduplicate | true                                                       | This property specifies if the incoming batch needs to be deduplicated (when multiple updates for the same record key are present in the incoming batch) in case of UPSERT/UPDATE operation type. If set to true, the user needs to provide proper value for the source ordering field as well.                                                                                                                                                                                                            |
+| carbon.streamer.meta.columns | (none)                                                     | Generally when performing CDC operations on primary databases, few metadata columns are added along with the actual columns for book keeping purposes. This property enables users to list down all such metadata fields (comma separated) which should not be merged with the target carboondata table.                                                                                                                                                                                                   |
+| carbon.enable.schema.enforcement | true                                                       | This flag decides if table schema needs to change as per the incoming batch schema. If set to true, incoming schema will be validated with existing table schema. If the schema has evolved, the incoming batch cannot be ingested and job will simply fail.                                                                                                                                                                                                                                               |
+
+#### Commands
+
+1. For kafka source - 
+
+```
+bin/spark-submit --class org.apache.carbondata.streamer.CarbonDataStreamer \
+--master spark://root1-ThinkPad-T490s:7077 \
+jars/apache-carbondata-2.3.0-SNAPSHOT-bin-spark2.4.5-hadoop2.7.2.jar \

Review comment:
       my bad, missed it completely. Thank you for pointing this out. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org