You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/01 08:53:38 UTC
[pulsar] branch master updated: [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 172a84624f4 [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)
172a84624f4 is described below
commit 172a84624f47bb722e6b419de4e239fbb8ecc154
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Jul 1 16:53:31 2022 +0800
[fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)
---
.../functions/worker/rest/api/ComponentImpl.java | 11 ++--
.../tests/integration/io/TestByteStateSource.java | 55 +++++++++++++++++
.../integration/functions/PulsarStateTest.java | 69 ++++++++++++++++++++--
3 files changed, 124 insertions(+), 11 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 999cae8ea09..f628ebbb7f9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -30,6 +30,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageN
import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.downloadPackageFile;
+import com.google.common.base.Utf8;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -1270,13 +1271,13 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
if (kv.isNumber()) {
value = new FunctionState(key, null, null, kv.numberValue(), kv.version());
} else {
- try {
- value = new FunctionState(key, new String(ByteBufUtil
- .getBytes(kv.value(), kv.value().readerIndex(), kv.value().readableBytes()), UTF_8),
+ byte[] bytes = ByteBufUtil.getBytes(kv.value());
+ if (Utf8.isWellFormed(bytes)) {
+ value = new FunctionState(key, new String(bytes, UTF_8),
null, null, kv.version());
- } catch (Exception e) {
+ } else {
value = new FunctionState(
- key, null, ByteBufUtil.getBytes(kv.value()), null, kv.version());
+ key, null, bytes, null, kv.version());
}
}
}
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java
new file mode 100644
index 00000000000..4fe382f5ce7
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+public class TestByteStateSource implements Source<byte[]> {
+ // Test source: open() stores a fixed binary value in state; each read() re-stores it and emits it as a record.
+ private SourceContext sourceContext;
+ // NOTE(review): the literal holds only hexadecimal digits but open() decodes it as Base64 - confirm the name is intended.
+ public static final String VALUE_BASE64 = "0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243"
+ + "2336366666263652d623038342d346631352d616565342d326330643135356131666"
+ + "36312026e311a3700000000000000000000000000000000000000000000000000000"
+ + "000000000000000000000000000000000000000000000000000000000";
+ // Stores the decoded VALUE_BASE64 bytes under the state key "initial" and keeps the context for read().
+ @Override
+ public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+ sourceContext.putState("initial", ByteBuffer.wrap(Base64.getDecoder().decode(VALUE_BASE64)));
+ this.sourceContext = sourceContext;
+ }
+ // Sleeps 50 ms, reads the "initial" state, writes that same buffer under the key "now", then returns a record.
+ @Override
+ public Record<byte[]> read() throws Exception {
+ Thread.sleep(50);
+ ByteBuffer initial = sourceContext.getState("initial");
+ sourceContext.putState("now", initial);
+ return initial::array; // method reference acts as the Record; presumably array() backs Record#getValue - TODO confirm
+ }
+ // close() performs no work.
+ @Override
+ public void close() throws Exception {
+ // No-op.
+ }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 0582a095c85..1dead726f85 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -18,6 +18,16 @@
*/
package org.apache.pulsar.tests.integration.functions;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.base.Utf8;
+import java.util.Base64;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -40,12 +50,6 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
/**
* State related test cases.
*/
@@ -57,6 +61,11 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py";
+ public static final String VALUE_BASE64 = "0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243"
+ + "2336366666263652d623038342d346631352d616565342d326330643135356131666"
+ + "36312026e311a3700000000000000000000000000000000000000000000000000000"
+ + "000000000000000000000000000000000000000000000000000000000"; // fixture text; tests in this class decode it with Base64.getDecoder()
+
@Test(groups = {"python_state", "state", "function", "python_function"})
public void testPythonWordCountFunction() throws Exception {
String inputTopicName = "test-wordcount-py-input-" + randomName(8);
@@ -183,6 +192,54 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
getSinkInfoNotFound(sinkName);
}
+ @Test(groups = {"java_state", "state", "function", "java_function"})
+ public void testBytes2StringNotUTF8() { // checks properties of the VALUE_BASE64 fixture itself
+ byte[] valueBytes = Base64.getDecoder().decode(VALUE_BASE64);
+ assertFalse(Utf8.isWellFormed(valueBytes)); // the decoded fixture must not be well-formed UTF-8
+ assertNotEquals(valueBytes, new String(valueBytes, UTF_8).getBytes(UTF_8)); // String round trip should not reproduce the bytes; NOTE(review): confirm TestNG compares arrays by content here
+ }
+
+ @Test(groups = {"java_state", "state", "function", "java_function"})
+ public void testSourceByteState() throws Exception {
+ String outputTopicName = "test-state-source-output-" + randomName(8);
+ String sourceName = "test-state-source-" + randomName(8);
+ // Checks that the binary state written by TestByteStateSource is returned as a byte value, with a null string value.
+ submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestByteStateSource", JAVAJAR);
+
+ // get source info
+ getSourceInfoSuccess(sourceName);
+
+ // get source status
+ getSourceStatus(sourceName);
+
+ try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+ // Wait until exactly one source instance is reported and it has written at least one record.
+ Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+ SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+ assertEquals(status.getInstances().size(), 1);
+ assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
+ });
+ // "initial" is the key the source writes in open(); it must come back as bytes equal to the decoded fixture.
+ {
+ FunctionState functionState =
+ admin.functions().getFunctionState("public", "default", sourceName, "initial");
+ assertNull(functionState.getStringValue());
+ assertEquals(functionState.getByteValue(), Base64.getDecoder().decode(VALUE_BASE64));
+ }
+ // "now" is the key the source writes in read(); retry until the same assertions pass for it.
+ Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+ FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
+ assertNull(functionState.getStringValue());
+ assertEquals(functionState.getByteValue(), Base64.getDecoder().decode(VALUE_BASE64));
+ });
+ }
+
+ // delete source
+ deleteSource(sourceName);
+
+ getSourceInfoNotFound(sourceName);
+ }
+
private void submitSourceConnector(String sourceName,
String outputTopicName,
String className,