You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/15 02:10:34 UTC

[GitHub] [hudi] XuQianJin-Stars commented on a change in pull request #4987: [HUDI-3547] Introduce MaxwellSourcePostProcessor to extract data from Maxwell json string

XuQianJin-Stars commented on a change in pull request #4987:
URL: https://github.com/apache/hudi/pull/4987#discussion_r826520719



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
##########
@@ -120,6 +132,124 @@ public void testChainedJsonKafkaSourcePostProcessor() {
     assertEquals(0, fetch1.getBatch().get().count());
   }
 
+  @Test
+  public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
+    // ------------------------------------------------------------------------
+    //  Maxwell data
+    // ------------------------------------------------------------------------
+
+    // database hudi, table hudi_maxwell_01 (insert, update and delete)
+    String hudiMaxwell01Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\","
+        + "\"ts\":1647074402,\"xid\":6233,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\","
+        + "\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Update = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"update\","
+        + "\"ts\":1647074482,\"xid\":6440,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"},"
+        + "\"old\":{\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\",\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Delete = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"delete\","
+        + "\"ts\":1647074555,\"xid\":6631,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}}";
+
+    String hudiMaxwell01Ddl = "{\"type\":\"table-alter\",\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\","
+        + "\"old\":{\"database\":\"hudi\",\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\","
+        + "\"primary-key\":[\"id\"],\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},"
+        + "{\"type\":\"varchar\",\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\","
+        + "\"signed\":true},{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},"
+        + "{\"type\":\"timestamp\",\"name\":\"update_time\",\"column-length\":0}]},\"def\":{\"database\":\"hudi\","
+        + "\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\",\"primary-key\":[\"id\"],"
+        + "\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},{\"type\":\"varchar\","
+        + "\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\",\"signed\":true},"
+        + "{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},{\"type\":\"timestamp\","
+        + "\"name\":\"update_time\",\"column-length\":0}]},\"ts\":1647072305000,\"sql\":\"/* ApplicationName=DBeaver "
+        + "21.0.4 - Main */ ALTER TABLE hudi.hudi_maxwell_01 MODIFY COLUMN age int(3) NULL\"}";
+
+    // database hudi, table hudi_maxwell_010, insert
+    String hudiMaxwell010Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_010\",\"type\":\"insert\","
+        + "\"ts\":1647073982,\"xid\":5164,\"commit\":true,\"data\":{\"id\":\"f3eaf4cdf7534e47a88cdf93d19b2ee6\","
+        + "\"name\":\"wangxianghu\",\"age\":18,\"insert_time\":\"2022-03-12 08:33:02\","
+        + "\"update_time\":\"2022-03-12 08:33:02\"}}";
+
+    // database hudi_02, table hudi_maxwell_02, insert
+    String hudi02Maxwell02Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_02\",\"type\":\"insert\","
+        + "\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\","
+        + "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\","
+        + "\"update_time\":\"2022-03-12 08:31:56\"}}";
+
+    // ------------------------------------------------------------------------
+    //  Tests
+    // ------------------------------------------------------------------------
+
+    ObjectMapper mapper = new ObjectMapper();
+    TypedProperties props = new TypedProperties();
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(), "hudi(_)?[0-9]{0,2}");
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}");
+
+    // test insert and update
+    JavaRDD<String> inputInsertAndUpdate = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell01Update));
+    MaxwellJsonKafkaSourcePostProcessor processor = new MaxwellJsonKafkaSourcePostProcessor(props);
+    processor.process(inputInsertAndUpdate).map(mapper::readTree).foreach(record -> {
+      // database name should be null
+      JsonNode database = record.get("database");
+      // insert and update records should be tagged as no delete
+      boolean isDelete = record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue();
+
+      assertFalse(isDelete);
+      assertNull(database);

Review comment:
       Why is the database null?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org