You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/02 03:22:20 UTC

[pulsar] branch branch-2.9 updated: [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 282a0f2f02f [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)
282a0f2f02f is described below

commit 282a0f2f02f07cc6ff21983e8ffa539bfcee9342
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Jul 1 16:53:31 2022 +0800

    [fix][function] Ensure bytes is a well-formed UTF-8 byte sequence when decode the `FunctionState` bytes to string (#16199)
    
    (cherry picked from commit 172a84624f47bb722e6b419de4e239fbb8ecc154)
---
 .../functions/worker/rest/api/ComponentImpl.java   | 15 +++--
 .../tests/integration/io/TestByteStateSource.java  | 55 +++++++++++++++++
 .../integration/functions/PulsarStateTest.java     | 70 +++++++++++++++++++---
 3 files changed, 127 insertions(+), 13 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 98994c97451..bad35e5443e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -28,7 +28,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace
 import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
 import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
 import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
+import com.google.common.base.Utf8;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -1098,10 +1098,13 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                     if (kv.isNumber()) {
                         value = new FunctionState(key, null, null, kv.numberValue(), kv.version());
                     } else {
-                        try {
-                            value = new FunctionState(key, new String(ByteBufUtil.getBytes(kv.value(), kv.value().readerIndex(), kv.value().readableBytes()), UTF_8), null, null, kv.version());
-                        } catch (Exception e) {
-                            value = new FunctionState(key, null, ByteBufUtil.getBytes(kv.value()), null, kv.version());
+                        byte[] bytes = ByteBufUtil.getBytes(kv.value());
+                        if (Utf8.isWellFormed(bytes)) {
+                            value = new FunctionState(key, new String(bytes, UTF_8),
+                                    null, null, kv.version());
+                        } else {
+                            value = new FunctionState(
+                                    key, null, bytes, null, kv.version());
                         }
                     }
                 }
@@ -1147,7 +1150,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
             log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
-        
+
         if (!key.equals(state.getKey())) {
             log.error("{}/{}/{} Bad putFunction Request, path key doesn't match key in json", tenant, namespace, functionName);
             throw new RestException(Status.BAD_REQUEST, "Path key doesn't match key in json");
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java
new file mode 100644
index 00000000000..4fe382f5ce7
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestByteStateSource.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
+
+public class TestByteStateSource implements Source<byte[]> {
+    // open() stores the decoded VALUE_BASE64 bytes as state "initial"; every read() copies them to "now".
+    private SourceContext sourceContext; // assigned in open(), used by read()
+    // NOTE(review): hex-digit text decoded as Base64; PulsarStateTest keeps a copy that must stay identical.
+    public static final String VALUE_BASE64 = "0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243"
+                                              + "2336366666263652d623038342d346631352d616565342d326330643135356131666"
+                                              + "36312026e311a3700000000000000000000000000000000000000000000000000000"
+                                              + "000000000000000000000000000000000000000000000000000000000";
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        sourceContext.putState("initial", ByteBuffer.wrap(Base64.getDecoder().decode(VALUE_BASE64)));
+        this.sourceContext = sourceContext;
+    }
+
+    @Override
+    public Record<byte[]> read() throws Exception {
+        Thread.sleep(50); // wait 50 ms before producing each record
+        ByteBuffer initial = sourceContext.getState("initial");
+        sourceContext.putState("now", initial);
+        return initial::array; // NOTE(review): array() needs an accessible backing array and ignores position/limit
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Nothing to release.
+    }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index c5fb6ecffeb..5b9041b0916 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -18,6 +18,16 @@
  */
 package org.apache.pulsar.tests.integration.functions;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.base.Utf8;
+import java.util.Base64;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -40,13 +50,6 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
-import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
 /**
  * State related test cases.
  */
@@ -58,6 +61,11 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
 
     public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py";
 
+    public static final String VALUE_BASE64 = "0a8001127e0a172e6576656e74732e437573746f6d65724372656174656412630a243"
+                                              + "2336366666263652d623038342d346631352d616565342d326330643135356131666"
+                                              + "36312026e311a3700000000000000000000000000000000000000000000000000000"
+                                              + "000000000000000000000000000000000000000000000000000000000"; // NOTE(review): hex-digit text decoded as Base64; must equal TestByteStateSource.VALUE_BASE64
+
     @Test(groups = {"python_state", "state", "function", "python_function"})
     public void testPythonWordCountFunction() throws Exception {
         String inputTopicName = "test-wordcount-py-input-" + randomName(8);
@@ -184,6 +192,54 @@ public class PulsarStateTest extends PulsarStandaloneTestSuite {
         getSinkInfoNotFound(sinkName);
     }
 
+    @Test(groups = {"java_state", "state", "function", "java_function"})
+    public void testBytes2StringNotUTF8() {
+        byte[] valueBytes = Base64.getDecoder().decode(VALUE_BASE64); // same decoding as TestByteStateSource.open()
+        assertFalse(Utf8.isWellFormed(valueBytes)); // the fixture must not be well-formed UTF-8
+        assertNotEquals(valueBytes, new String(valueBytes, UTF_8).getBytes(UTF_8)); // String(byte[], UTF_8) replaces malformed input, so this round trip is lossy
+    }
+
+    @Test(groups = {"java_state", "state", "function", "java_function"})
+    public void testSourceByteState() throws Exception {
+        String outputTopicName = "test-state-source-output-" + randomName(8);
+        String sourceName = "test-state-source-" + randomName(8);
+        // Deploy TestByteStateSource; its state values are expected not to be well-formed UTF-8.
+        submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestByteStateSource",  JAVAJAR);
+
+        // get source info
+        getSourceInfoSuccess(sourceName);
+
+        // get source status
+        getSourceStatus(sourceName);
+
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+            // Wait until the source has written records, i.e. read() (which fills state "now") has run.
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                assertEquals(status.getInstances().size(), 1);
+                assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
+            });
+            // State "initial" (stored in open()) must come back only as bytes, with no lossy string form.
+            {
+                FunctionState functionState =
+                        admin.functions().getFunctionState("public", "default", sourceName, "initial");
+                assertNull(functionState.getStringValue());
+                assertEquals(functionState.getByteValue(), Base64.getDecoder().decode(VALUE_BASE64));
+            }
+            // State "now" (copied in read()) must return the same bytes.
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+                FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
+                assertNull(functionState.getStringValue());
+                assertEquals(functionState.getByteValue(), Base64.getDecoder().decode(VALUE_BASE64));
+            });
+        }
+
+        // delete source
+        deleteSource(sourceName);
+
+        getSourceInfoNotFound(sourceName);
+    }
+
     private void submitSourceConnector(String sourceName,
                                          String outputTopicName,
                                          String className,