Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2021/01/04 09:01:11 UTC

[GitHub] [camel-kafka-connector] orpiske commented on issue #824: Add JDBC examples

orpiske commented on issue #824:
URL: https://github.com/apache/camel-kafka-connector/issues/824#issuecomment-753850103


   Providing these, taken from the test code, as a starting point. Maybe we can leverage them ...
   
   Base configuration using Postgres; it should work the same way for other DBs:
   
   ```
   connector.class=org.apache.camel.kafkaconnector.jdbc.CamelJdbcSinkConnector
   tasks.max=1
   camel.sink.path.dataSourceName=someName
   topics=org.apache.camel.kafkaconnector.jdbc.sink.CamelSinkJDBCITCase
   name=CamelJDBCSinkConnector
   value.converter=org.apache.kafka.connect.storage.StringConverter
   camel.component.jdbc.dataSource=#class:org.apache.camel.kafkaconnector.jdbc.services.TestDataSource
   key.converter=org.apache.kafka.connect.storage.StringConverter
   camel.sink.endpoint.useHeadersAsParameters=true
   ```
   
   
   You need to provide a data source. It should set up the JDBC URL, username and password. Here's an example from the test code:
   
   ```
   import org.postgresql.ds.PGSimpleDataSource;

   // JDBCProperties is a constants class from the connector's test code
   public class TestDataSource extends PGSimpleDataSource {
       // JDBC URL, read once from a system property set by the test setup
       private static final String URL = System.getProperty(JDBCProperties.JDBC_CONNECTION_URL);

       public TestDataSource() {
           super();
           setUrl(URL);

           // Test-only credentials
           setUser("ckc");
           setPassword("ckcDevel123");
       }
   }
   ```
   
   The SQL insert statement is carried in the body of the message, and its named parameters (the `:?name` placeholders) are resolved from Camel headers, such as:
   
   ```
   String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)"; 
   
   ... 
   
   
   Map<String, String> jdbcParameters = new HashMap<>();
   
   // The prefix 'CamelHeader' is removed by the SinkTask
   jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestName", "SomeName1");
   jdbcParameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "TestData", "test data ");
   
   
   // Send the data
   kafkaClient.produce("topic-name", body, jdbcParameters);
   ```
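
   To make the header handling above a bit more concrete, here is a minimal, standalone sketch (not the connector's actual code) of how the prefixed Kafka headers could line up with the `:?name` placeholders in the body. `HeaderPrefixSketch`, `stripPrefix` and `missingParameters` are hypothetical names, and the prefix value `CamelHeader.` is an assumption; check `CamelSinkTask.HEADER_CAMEL_PREFIX` for the real one. The sketch also simply ignores headers without the prefix, which may differ from what the sink task does.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   public class HeaderPrefixSketch {
       // Assumed value of CamelSinkTask.HEADER_CAMEL_PREFIX (verify against the connector source)
       public static final String ASSUMED_PREFIX = "CamelHeader.";

       // Matches named parameters written as :?Name in the SQL body
       private static final Pattern NAMED_PARAM = Pattern.compile(":\\?(\\w+)");

       // Keeps only the prefixed headers and removes the prefix from their keys
       public static Map<String, String> stripPrefix(Map<String, String> kafkaHeaders) {
           Map<String, String> camelHeaders = new HashMap<>();
           for (Map.Entry<String, String> entry : kafkaHeaders.entrySet()) {
               String key = entry.getKey();
               if (key.startsWith(ASSUMED_PREFIX)) {
                   camelHeaders.put(key.substring(ASSUMED_PREFIX.length()), entry.getValue());
               }
           }
           return camelHeaders;
       }

       // Lists the :?Name parameters in the SQL that have no matching header
       public static List<String> missingParameters(String sql, Map<String, String> camelHeaders) {
           List<String> missing = new ArrayList<>();
           Matcher matcher = NAMED_PARAM.matcher(sql);
           while (matcher.find()) {
               String name = matcher.group(1);
               if (!camelHeaders.containsKey(name)) {
                   missing.add(name);
               }
           }
           return missing;
       }

       public static void main(String[] args) {
           String body = "insert into test(test_name, test_data) values(:?TestName, :?TestData)";

           Map<String, String> kafkaHeaders = new HashMap<>();
           kafkaHeaders.put(ASSUMED_PREFIX + "TestName", "SomeName1");
           kafkaHeaders.put(ASSUMED_PREFIX + "TestData", "test data ");

           Map<String, String> camelHeaders = stripPrefix(kafkaHeaders);
           System.out.println("Headers after stripping the prefix: " + camelHeaders);
           System.out.println("Parameters without a header: " + missingParameters(body, camelHeaders));
       }
   }
   ```

   A check like `missingParameters` could be run on the producer side before sending, to catch a misspelled header name early rather than at insert time.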
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org