Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2020/05/22 10:04:24 UTC

[GitHub] [bahir-flink] zhoulii opened a new pull request #84: [bahir-232] support Flink Table API & SQL for flink-connector-activemq

zhoulii opened a new pull request #84:
URL: https://github.com/apache/bahir-flink/pull/84


   flink-connector-activemq does not currently support the Flink Table API & SQL. Building on the existing code, adding this support is straightforward; we only need to implement four classes: AMQTableSource, AMQTableSink, AMQTableSourceFactory and AMQTableSinkFactory. We can then connect to ActiveMQ as follows:
   
   ```java
   String TABLE_CREATE_SQL = "CREATE TABLE books (" +
           " id int, " +
           " title varchar, " +
           " author varchar, " +
           " price double, " +
           " qty int " +
           ") with (" +
           " 'connector.type' = 'activemq', " +
           " 'connector.broker-url' = 'vm://localhost?broker.persistent=false', " +
           " 'connector.destination-type' = 'QUEUE', " +
           " 'connector.destination-name' = 'source_queue' " +
           ")";
   
   String INITIALIZE_TABLE_SQL = "INSERT INTO books VALUES\n" +
           "(1001, 'Java public for dummies', 'Tan Ah Teck', 11.11, 11),\n" +
           "(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),\n" +
           "(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),\n" +
           "(1004, 'A Cup of Java', 'Kumar', 44.44, 44),\n" +
           "(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55),\n" +
           "(1006, 'A Teaspoon of Java 1.4', 'Kevin Jones', 66.66, 66),\n" +
           "(1007, 'A Teaspoon of Java 1.5', 'Kevin Jones', 77.77, 77),\n" +
           "(1008, 'A Teaspoon of Java 1.6', 'Kevin Jones', 88.88, 88),\n" +
           "(1009, 'A Teaspoon of Java 1.7', 'Kevin Jones', 99.99, 99),\n" +
           "(1010, 'A Teaspoon of Java 1.8', 'Kevin Jones', 33.33, 100)";
   
   String QUERY_TABLE_SQL = "SELECT * FROM books";
   
   // create activemq source table
   tEnv.sqlUpdate(TABLE_CREATE_SQL);
   
   // produce event to activemq
   tEnv.sqlUpdate(INITIALIZE_TABLE_SQL);
   
   // consume from activemq
   Table table = tEnv.sqlQuery(QUERY_TABLE_SQL);
   ```
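
To illustrate how the `WITH (...)` properties above might be matched to the connector, here is a minimal, dependency-free sketch. It loosely mirrors the required-context / supported-properties idea used by Flink's table factories, but it is not the code in this pull request: the class name `AmqPropertySketch` and its methods are hypothetical, only the four `connector.*` keys shown above are considered, and schema-related keys are left out for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, dependency-free sketch; not the classes added by this PR. */
public class AmqPropertySketch {

    // Key/value pairs that must match exactly for this connector to be chosen.
    static Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "activemq");
        return context;
    }

    // Keys the connector accepts in addition to the required context.
    // Assumption: only the keys that appear in the example DDL are listed.
    static List<String> supportedProperties() {
        return Arrays.asList(
                "connector.broker-url",
                "connector.destination-type",
                "connector.destination-name");
    }

    // True when every required pair matches and no unsupported key is present.
    static boolean matches(Map<String, String> properties) {
        Map<String, String> required = requiredContext();
        for (Map.Entry<String, String> entry : required.entrySet()) {
            if (!entry.getValue().equals(properties.get(entry.getKey()))) {
                return false;
            }
        }
        for (String key : properties.keySet()) {
            if (!required.containsKey(key) && !supportedProperties().contains(key)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Same properties as the WITH clause of the CREATE TABLE statement.
        Map<String, String> properties = new HashMap<>();
        properties.put("connector.type", "activemq");
        properties.put("connector.broker-url", "vm://localhost?broker.persistent=false");
        properties.put("connector.destination-type", "QUEUE");
        properties.put("connector.destination-name", "source_queue");
        System.out.println(matches(properties));
    }
}
```

With the properties from the DDL above, `matches` accepts the table definition; changing `connector.type` to another value, or adding a key outside the supported list, makes it reject the definition.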


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org