You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/09 14:47:38 UTC

[shardingsphere] branch master updated: Fix unicode char and special char decoding in TestDecodingPlugin for PostgreSQL (#24049)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c54a27bb9f Fix unicode char and special char decoding in TestDecodingPlugin for PostgreSQL (#24049)
6c54a27bb9f is described below

commit 6c54a27bb9f038b52e22ed6a763005b06bdabb4a
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Feb 9 22:47:29 2023 +0800

    Fix unicode char and special char decoding in TestDecodingPlugin for PostgreSQL (#24049)
    
    * Fix Unicode characters not being decoded correctly in TestDecodingPlugin
    
    * Remove the single-quote case for now.
    
    * Improve
    
    * Improve JSON parsing
    
    * Fix string parsing
---
 .../ingest/wal/decode/TestDecodingPlugin.java      | 58 +++++++++++++++++++---
 .../ingest/wal/decode/TestDecodingPluginTest.java  | 23 +++++++--
 .../cases/task/PostgreSQLIncrementTask.java        |  8 +--
 .../framework/helper/PipelineCaseHelper.java       |  7 +--
 4 files changed, 76 insertions(+), 20 deletions(-)

diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 99e632831fe..bd7ef30e542 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -191,6 +191,9 @@ public final class TestDecodingPlugin implements DecodingPlugin {
                 }
             case "bytea":
                 return decodeHex(readNextString(data).substring(2));
+            case "json":
+            case "jsonb":
+                return readNextJson(data);
             default:
                 return readNextString(data);
         }
@@ -208,26 +211,65 @@ public final class TestDecodingPlugin implements DecodingPlugin {
         return eventType.toString();
     }
     
+    private String readNextJson(final ByteBuffer data) {
+        data.get();
+        int offset = 0;
+        int startPosition = data.position();
+        int level = 0;
+        while (data.hasRemaining()) {
+            offset++;
+            char c = (char) data.get();
+            if ('{' == c) {
+                level++;
+            } else if ('}' == c) {
+                level--;
+                if (0 != level) {
+                    continue;
+                }
+                if ('\'' != data.get()) {
+                    throw new IngestException("Read json data unexpected exception");
+                }
+                if (data.hasRemaining()) {
+                    data.get();
+                }
+                return readStringSegment(data, startPosition, offset).replace("''", "'");
+            }
+        }
+        return null;
+    }
+    
+    private String readStringSegment(final ByteBuffer data, final int startPosition, final int offset) {
+        byte[] result = new byte[offset];
+        for (int i = 0; i < offset; i++) {
+            result[i] = data.get(startPosition + i);
+        }
+        return new String(result);
+    }
+    
     private String readNextString(final ByteBuffer data) {
-        StringBuilder result = new StringBuilder();
+        int offset = 0;
         data.get();
+        int startPosition = data.position();
         while (data.hasRemaining()) {
             char c = (char) data.get();
+            offset++;
             if ('\'' == c) {
                 if (!data.hasRemaining()) {
-                    return result.toString();
+                    offset--;
+                    return readStringSegment(data, startPosition, offset).replace("''", "'");
                 }
                 char c2 = (char) data.get();
-                if (' ' == c2) {
-                    return result.toString();
+                if ('\'' == c2) {
+                    offset++;
+                    continue;
                 }
-                if ('\'' != c2) {
-                    throw new IngestException("Read character varying data unexpected exception");
+                if (' ' == c2) {
+                    offset--;
+                    return readStringSegment(data, startPosition, offset).replace("''", "'");
                 }
             }
-            result.append(c);
         }
-        return result.toString();
+        return readStringSegment(data, startPosition, offset);
     }
     
     private byte[] decodeHex(final String hexString) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
index 5d07bdcd4c8..1c6a88ffe7a 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
@@ -33,8 +33,8 @@ import java.sql.SQLException;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,20 +46,26 @@ public final class TestDecodingPluginTest {
     
     @Test
     public void assertDecodeWriteRowEvent() {
-        ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[character varying]:'1 2 3'''".getBytes());
+        ByteBuffer data = ByteBuffer.wrap(("table public.test: INSERT: data[character varying]:' 1 2 3'' 😊中' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'"
+                + " t_jsonb[jsonb]:'{\"test\":\"😊Emoji中\"}'").getBytes());
         WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
-        assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
+        assertThat(actual.getAfterRow().get(0), is(" 1 2 3' 😊中"));
+        assertThat(actual.getAfterRow().get(1), is("{}"));
+        assertThat(actual.getAfterRow().get(2), is("{\"test\":\"中中{中中}' 中\"}"));
+        assertThat(actual.getAfterRow().get(3), is("{\"test\":\"😊Emoji中\"}"));
     }
     
     @Test
     public void assertDecodeUpdateRowEvent() {
-        ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: data[character varying]:'1 2 3'''".getBytes());
+        ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: unicode[character varying]:' 1 2 3'' 😊中 ' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"中中{中中}' 中\"}'".getBytes());
         UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
-        assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
+        assertThat(actual.getAfterRow().get(0), is(" 1 2 3' 😊中 "));
+        assertThat(actual.getAfterRow().get(1), is("{}"));
+        assertThat(actual.getAfterRow().get(2), is("{\"test\":\"中中{中中}' 中\"}"));
     }
     
     @Test
@@ -112,4 +118,11 @@ public final class TestDecodingPluginTest {
         assertNull(actualWriteRowEvent.getAfterRow().get(2));
         assertThat(actualWriteRowEvent.getAfterRow().get(3), is("nonnull"));
     }
+    
+    @Test
+    public void assertDecodeJsonValue() {
+        ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 ".getBytes());
+        AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+        assertThat(actual, instanceOf(WriteRowEvent.class));
+    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index beffecb7f28..e4ee829895e 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -72,9 +72,8 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     
     private Object insertOrder() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        String status = 0 == random.nextInt() % 2 ? null : "中文测试";
-        Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), status, PipelineCaseHelper.generateJsonString(4, true),
-                PipelineCaseHelper.generateJsonString(4, false)};
+        Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), "'中文'" + System.currentTimeMillis(), PipelineCaseHelper.generateJsonString(5, true),
+                PipelineCaseHelper.generateJsonString(10, false)};
         String insertSQL = String.format("INSERT INTO %s (order_id,user_id,status,t_json,t_jsonb) VALUES (?, ?, ?, ?, ?)", getTableNameWithSchema(orderTableName));
         log.info("insert order sql:{}", insertSQL);
         DataSourceExecuteUtil.execute(dataSource, insertSQL, orderInsertDate);
@@ -91,7 +90,8 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
     }
     
     private void updateOrderByPrimaryKey(final Object primaryKey) {
-        Object[] updateData = {"updated" + Instant.now().getEpochSecond(), PipelineCaseHelper.generateJsonString(4, true), PipelineCaseHelper.generateJsonString(4, false), primaryKey};
+        // TODO openGauss incremental task parse single quote not correctly now
+        Object[] updateData = {"中文UPDATE" + Instant.now().getEpochSecond(), PipelineCaseHelper.generateJsonString(5, true), PipelineCaseHelper.generateJsonString(5, false), primaryKey};
         String updateSql = String.format("UPDATE %s SET status = ?, t_json = ?, t_jsonb = ? WHERE order_id = ?", getTableNameWithSchema(orderTableName));
         DataSourceExecuteUtil.execute(dataSource, updateSql, updateData);
     }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
index 66e2f1dbbb6..ac5b31171d7 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper;
 
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -110,9 +111,9 @@ public final class PipelineCaseHelper {
      */
     public static String generateJsonString(final int length, final boolean useUnicodeCharacter) {
         String value;
-        if (useUnicodeCharacter && length > 1) {
-            // TODO need support unicode
-            value = generateString(length);
+        if (useUnicodeCharacter) {
+            // TODO openGauss incremental task parse single quote not correctly now
+            value = Strings.repeat("{中 } ABC", length);
         } else {
             value = generateString(length);
         }