Posted to issues@paimon.apache.org by "MonsterChenzhuo (via GitHub)" <gi...@apache.org> on 2023/11/03 03:26:17 UTC

[PR] [flink]support debezium json format [incubator-paimon]

MonsterChenzhuo opened a new pull request, #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   
   Support the Debezium JSON format.
   
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   KafkaDebeziumSyncTableActionITCase
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo closed pull request #2251: [flink]support table debezium json format
URL: https://github.com/apache/incubator-paimon/pull/2251




Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251#discussion_r1384459632


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * This class is used to deserialize byte[] messages into String format, and then add primary key
+ * fields to the JSON string.
+ */
+public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {
+
+    public static final String PRIMARY_KEY_NAMES = "pkNames";
+    private final List<String> primaryKeyNames;
+
+    public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
+        checkNotNull(primaryKeyNames);

Review Comment:
   done.





Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo closed pull request #2251: [flink]support table debezium json format
URL: https://github.com/apache/incubator-paimon/pull/2251




Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin merged PR #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251




Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251#discussion_r1384386613


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * This class is used to deserialize byte[] messages into String format, and then add primary key
+ * fields to the JSON string.
+ */
+public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {
+
+    public static final String PRIMARY_KEY_NAMES = "pkNames";
+    private final List<String> primaryKeyNames;
+
+    public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    @Override
+    public String deserialize(byte[] message) {
+        try {
+            String value = new String(message, StandardCharsets.UTF_8);
+            return JsonSerdeUtil.addPrimaryKeysToJson(value, primaryKeyNames, PRIMARY_KEY_NAMES);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize message", e);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(new TypeHint<String>() {});

Review Comment:
   done.





Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251#discussion_r1384388962


##########
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link KafkaDebeziumSyncTableActionITCase}. */
+public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase {

Review Comment:
   done.





Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251#discussion_r1384331436


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * This class is used to deserialize byte[] messages into String format, and then add primary key
+ * fields to the JSON string.
+ */
+public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {
+
+    public static final String PRIMARY_KEY_NAMES = "pkNames";
+    private final List<String> primaryKeyNames;
+
+    public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
+        this.primaryKeyNames = primaryKeyNames;

Review Comment:
   Check that `primaryKeyNames` is not empty.



##########
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link KafkaDebeziumSyncTableActionITCase}. */
+public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase {

Review Comment:
   We only need to test Debezium (dbz) format parsing here, so keep just `testSchemaEvolution` and `testComputedColumn` and delete the other tests.



##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * This class is used to deserialize byte[] messages into String format, and then add primary key
+ * fields to the JSON string.
+ */
+public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {
+
+    public static final String PRIMARY_KEY_NAMES = "pkNames";
+    private final List<String> primaryKeyNames;
+
+    public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    @Override
+    public String deserialize(byte[] message) {
+        try {
+            String value = new String(message, StandardCharsets.UTF_8);
+            return JsonSerdeUtil.addPrimaryKeysToJson(value, primaryKeyNames, PRIMARY_KEY_NAMES);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize message", e);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(new TypeHint<String>() {});

Review Comment:
   `return BasicTypeInfo.STRING_TYPE_INFO;`



##########
paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java:
##########
@@ -219,6 +221,43 @@ public static <T> JsonNode toTree(T value) {
         return OBJECT_MAPPER_INSTANCE.valueToTree(value);
     }
 
+    /**
+     * Adds primary key fields to a JSON string.
+     *
+     * @param value The original JSON string.
+     * @param pkNames A list of primary key names to be added to the JSON string.
+     * @param pkNamesKey The key under which the primary key names will be added.
+     * @return The JSON string with the added primary key names. If the JSON string is not a valid
+     *     JSON object, or if the list of primary key names is empty or null, the original JSON
+     *     string will be returned.
+     * @throws RuntimeException If an error occurs while parsing the JSON string or adding the
+     *     primary key names.
+     */
+    public static String addPrimaryKeysToJson(
+            String value, List<String> pkNames, String pkNamesKey) {

Review Comment:
   We can make this method more general. How about `String putArrayToJsonString(String origin, String key, List<String> values)`?
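
A dependency-free sketch of what a method with that signature might do, for illustration only. The class name `PutArraySketch` and the helper `quote` are hypothetical. The actual change would more likely go through the Jackson `ObjectMapper` that `JsonSerdeUtil` already holds (`OBJECT_MAPPER_INSTANCE` in the hunk above). Returning the input unchanged for a null or empty list, or for a non-object string, follows the Javadoc quoted above; the sketch assumes the input is otherwise valid JSON and that `key` is not already present.

```java
import java.util.Arrays;
import java.util.List;

public class PutArraySketch {

    /** Hypothetical helper: wraps a value in quotes, escaping backslashes and double quotes only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * Appends {@code "key": [values...]} to a JSON object string.
     * Assumption: control characters and duplicate keys are not handled in this sketch.
     */
    public static String putArrayToJsonString(String origin, String key, List<String> values) {
        if (origin == null || values == null || values.isEmpty()) {
            return origin; // nothing to add
        }
        String trimmed = origin.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            return origin; // not a JSON object, leave untouched
        }
        StringBuilder array = new StringBuilder("[");
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                array.append(',');
            }
            array.append(quote(values.get(i)));
        }
        array.append(']');
        // Strip the outer braces, then re-wrap with the new member appended.
        String body = trimmed.substring(1, trimmed.length() - 1).trim();
        String separator = body.isEmpty() ? "" : ",";
        return "{" + body + separator + quote(key) + ":" + array + "}";
    }

    public static void main(String[] args) {
        System.out.println(putArrayToJsonString("{\"id\":1}", "pkNames", Arrays.asList("id")));
    }
}
```

With the input `{"id":1}`, the key `pkNames`, and the single value `id`, this returns `{"id":1,"pkNames":["id"]}`. Putting `key` before `values` keeps the method independent of primary keys, which is the point of the suggested rename.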





Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251#discussion_r1384438080


##########
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/JsonPrimaryKeyDeserializationSchema.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * This class is used to deserialize byte[] messages into String format, and then add primary key
+ * fields to the JSON string.
+ */
+public class JsonPrimaryKeyDeserializationSchema implements DeserializationSchema<String> {
+
+    public static final String PRIMARY_KEY_NAMES = "pkNames";
+    private final List<String> primaryKeyNames;
+
+    public JsonPrimaryKeyDeserializationSchema(List<String> primaryKeyNames) {
+        checkNotNull(primaryKeyNames);

Review Comment:
   Shouldn't it be both non-null and non-empty?
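
One way the combined check could look, as a minimal sketch in plain Java. The class `PrimaryKeyCheckSketch` and the method `requireNonEmpty` are hypothetical names for illustration; the PR itself uses Paimon's `Preconditions.checkNotNull`, and whichever precondition helper the project prefers for the emptiness check would take the place of the explicit `if` statements here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PrimaryKeyCheckSketch {

    /** Hypothetical helper: rejects a null or empty primary-key list, otherwise returns it. */
    public static List<String> requireNonEmpty(List<String> primaryKeyNames) {
        if (primaryKeyNames == null) {
            throw new NullPointerException("primaryKeyNames must not be null");
        }
        if (primaryKeyNames.isEmpty()) {
            throw new IllegalArgumentException("primaryKeyNames must not be empty");
        }
        return primaryKeyNames;
    }

    public static void main(String[] args) {
        // A populated list passes through unchanged.
        System.out.println(requireNonEmpty(Arrays.asList("id", "name")));
        try {
            // An empty list is rejected before it can reach the deserializer.
            requireNonEmpty(Collections.<String>emptyList());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing in the constructor surfaces a misconfigured job at build time rather than on the first Kafka record.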





Re: [PR] [flink]support table debezium json format [incubator-paimon]

Posted by "MonsterChenzhuo (via GitHub)" <gi...@apache.org>.
MonsterChenzhuo commented on code in PR #2251:
URL: https://github.com/apache/incubator-paimon/pull/2251#discussion_r1384387079


##########
paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java:
##########
@@ -219,6 +221,43 @@ public static <T> JsonNode toTree(T value) {
         return OBJECT_MAPPER_INSTANCE.valueToTree(value);
     }
 
+    /**
+     * Adds primary key fields to a JSON string.
+     *
+     * @param value The original JSON string.
+     * @param pkNames A list of primary key names to be added to the JSON string.
+     * @param pkNamesKey The key under which the primary key names will be added.
+     * @return The JSON string with the added primary key names. If the JSON string is not a valid
+     *     JSON object, or if the list of primary key names is empty or null, the original JSON
+     *     string will be returned.
+     * @throws RuntimeException If an error occurs while parsing the JSON string or adding the
+     *     primary key names.
+     */
+    public static String addPrimaryKeysToJson(
+            String value, List<String> pkNames, String pkNamesKey) {

Review Comment:
   done.


