You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/09 14:47:38 UTC
[shardingsphere] branch master updated: Fix unicode char and special char decoding in TestDecodingPlugin for PostgreSQL (#24049)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6c54a27bb9f Fix unicode char and special char decoding in TestDecodingPlugin for PostgreSQL (#24049)
6c54a27bb9f is described below
commit 6c54a27bb9f038b52e22ed6a763005b06bdabb4a
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Feb 9 22:47:29 2023 +0800
Fix unicode char and special char decoding in TestDecodingPlugin for PostgreSQL (#24049)
* Fix unicode cannot be decoded correctly at TestDecodingPlugin
* Remove single quote case now.
* Improve
* Improve json parse
* Fix string parse
---
.../ingest/wal/decode/TestDecodingPlugin.java | 58 +++++++++++++++++++---
.../ingest/wal/decode/TestDecodingPluginTest.java | 23 +++++++--
.../cases/task/PostgreSQLIncrementTask.java | 8 +--
.../framework/helper/PipelineCaseHelper.java | 7 +--
4 files changed, 76 insertions(+), 20 deletions(-)
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 99e632831fe..bd7ef30e542 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -191,6 +191,9 @@ public final class TestDecodingPlugin implements DecodingPlugin {
}
case "bytea":
return decodeHex(readNextString(data).substring(2));
+ case "json":
+ case "jsonb":
+ return readNextJson(data);
default:
return readNextString(data);
}
@@ -208,26 +211,65 @@ public final class TestDecodingPlugin implements DecodingPlugin {
return eventType.toString();
}
+ private String readNextJson(final ByteBuffer data) {
+ data.get();
+ int offset = 0;
+ int startPosition = data.position();
+ int level = 0;
+ while (data.hasRemaining()) {
+ offset++;
+ char c = (char) data.get();
+ if ('{' == c) {
+ level++;
+ } else if ('}' == c) {
+ level--;
+ if (0 != level) {
+ continue;
+ }
+ if ('\'' != data.get()) {
+ throw new IngestException("Read json data unexpected exception");
+ }
+ if (data.hasRemaining()) {
+ data.get();
+ }
+ return readStringSegment(data, startPosition, offset).replace("''", "'");
+ }
+ }
+ return null;
+ }
+
+ private String readStringSegment(final ByteBuffer data, final int startPosition, final int offset) {
+ byte[] result = new byte[offset];
+ for (int i = 0; i < offset; i++) {
+ result[i] = data.get(startPosition + i);
+ }
+ return new String(result);
+ }
+
private String readNextString(final ByteBuffer data) {
- StringBuilder result = new StringBuilder();
+ int offset = 0;
data.get();
+ int startPosition = data.position();
while (data.hasRemaining()) {
char c = (char) data.get();
+ offset++;
if ('\'' == c) {
if (!data.hasRemaining()) {
- return result.toString();
+ offset--;
+ return readStringSegment(data, startPosition, offset).replace("''", "'");
}
char c2 = (char) data.get();
- if (' ' == c2) {
- return result.toString();
+ if ('\'' == c2) {
+ offset++;
+ continue;
}
- if ('\'' != c2) {
- throw new IngestException("Read character varying data unexpected exception");
+ if (' ' == c2) {
+ offset--;
+ return readStringSegment(data, startPosition, offset).replace("''", "'");
}
}
- result.append(c);
}
- return result.toString();
+ return readStringSegment(data, startPosition, offset);
}
private byte[] decodeHex(final String hexString) {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
index 5d07bdcd4c8..1c6a88ffe7a 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPluginTest.java
@@ -33,8 +33,8 @@ import java.sql.SQLException;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,20 +46,26 @@ public final class TestDecodingPluginTest {
@Test
public void assertDecodeWriteRowEvent() {
- ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: data[character varying]:'1 2 3'''".getBytes());
+ ByteBuffer data = ByteBuffer.wrap(("table public.test: INSERT: data[character varying]:' 1 2 3'' 😊ä¸' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"ä¸ä¸{ä¸ä¸}' ä¸\"}'"
+ + " t_jsonb[jsonb]:'{\"test\":\"😊Emojiä¸\"}'").getBytes());
WriteRowEvent actual = (WriteRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
- assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
+ assertThat(actual.getAfterRow().get(0), is(" 1 2 3' 😊ä¸"));
+ assertThat(actual.getAfterRow().get(1), is("{}"));
+ assertThat(actual.getAfterRow().get(2), is("{\"test\":\"ä¸ä¸{ä¸ä¸}' ä¸\"}"));
+ assertThat(actual.getAfterRow().get(3), is("{\"test\":\"😊Emojiä¸\"}"));
}
@Test
public void assertDecodeUpdateRowEvent() {
- ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: data[character varying]:'1 2 3'''".getBytes());
+ ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE: unicode[character varying]:' 1 2 3'' ðŸ˜Šä¸ ' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"ä¸ä¸{ä¸ä¸}' ä¸\"}'".getBytes());
UpdateRowEvent actual = (UpdateRowEvent) new TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
- assertThat(actual.getAfterRow().get(0), is("1 2 3'"));
+ assertThat(actual.getAfterRow().get(0), is(" 1 2 3' ðŸ˜Šä¸ "));
+ assertThat(actual.getAfterRow().get(1), is("{}"));
+ assertThat(actual.getAfterRow().get(2), is("{\"test\":\"ä¸ä¸{ä¸ä¸}' ä¸\"}"));
}
@Test
@@ -112,4 +118,11 @@ public final class TestDecodingPluginTest {
assertNull(actualWriteRowEvent.getAfterRow().get(2));
assertThat(actualWriteRowEvent.getAfterRow().get(3), is("nonnull"));
}
+
+ @Test
+ public void assertDecodeJsonValue() {
+ ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT: id[integer]:123 ".getBytes());
+ AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual, instanceOf(WriteRowEvent.class));
+ }
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index beffecb7f28..e4ee829895e 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -72,9 +72,8 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
private Object insertOrder() {
ThreadLocalRandom random = ThreadLocalRandom.current();
- String status = 0 == random.nextInt() % 2 ? null : "ä¸æ–‡æµ‹è¯•";
- Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), status, PipelineCaseHelper.generateJsonString(4, true),
- PipelineCaseHelper.generateJsonString(4, false)};
+ Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), random.nextInt(0, 6), "'ä¸æ–‡'" + System.currentTimeMillis(), PipelineCaseHelper.generateJsonString(5, true),
+ PipelineCaseHelper.generateJsonString(10, false)};
String insertSQL = String.format("INSERT INTO %s (order_id,user_id,status,t_json,t_jsonb) VALUES (?, ?, ?, ?, ?)", getTableNameWithSchema(orderTableName));
log.info("insert order sql:{}", insertSQL);
DataSourceExecuteUtil.execute(dataSource, insertSQL, orderInsertDate);
@@ -91,7 +90,8 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
}
private void updateOrderByPrimaryKey(final Object primaryKey) {
- Object[] updateData = {"updated" + Instant.now().getEpochSecond(), PipelineCaseHelper.generateJsonString(4, true), PipelineCaseHelper.generateJsonString(4, false), primaryKey};
+ // TODO openGauss incremental task parse single quote not correctly now
+ Object[] updateData = {"ä¸æ–‡UPDATE" + Instant.now().getEpochSecond(), PipelineCaseHelper.generateJsonString(5, true), PipelineCaseHelper.generateJsonString(5, false), primaryKey};
String updateSql = String.format("UPDATE %s SET status = ?, t_json = ?, t_jsonb = ? WHERE order_id = ?", getTableNameWithSchema(orderTableName));
DataSourceExecuteUtil.execute(dataSource, updateSql, updateData);
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
index 66e2f1dbbb6..ac5b31171d7 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -110,9 +111,9 @@ public final class PipelineCaseHelper {
*/
public static String generateJsonString(final int length, final boolean useUnicodeCharacter) {
String value;
- if (useUnicodeCharacter && length > 1) {
- // TODO need support unicode
- value = generateString(length);
+ if (useUnicodeCharacter) {
+ // TODO openGauss incremental task parse single quote not correctly now
+ value = Strings.repeat("{ä¸ } ABC", length);
} else {
value = generateString(length);
}