You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:54 UTC

[pulsar] 08/38: [issue 6694][AVRO ENCODE] Reset cursor if message encode fails. (#6695)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 14a1a2b9e8d81294625810413ff36191391103b7
Author: Shivji Kumar Jha <sh...@nutanix.com>
AuthorDate: Tue Apr 14 14:13:46 2020 +0530

    [issue 6694][AVRO ENCODE] Reset cursor if message encode fails. (#6695)
    
    Fixes #6694
    
    ### Motivation
    
    If Avro encoding of a message fails after a few bytes have already been written, the encoder's buffer position is not reset. The subsequent **flush()** call, which would normally reset it, is skipped when an exception is thrown, so the leftover bytes remain buffered in the encoder.
    
    ### Modifications
    
    Move the **flush()** call into the finally block so that the encoder's buffer is drained even when encoding fails.
    
    ### Test
    Added a test that verifies the fix.
    (cherry picked from commit 7cffe2ab7420b0001f1337ef6007448124fecf18)
---
 .../client/impl/schema/writer/AvroWriter.java      | 10 ++++++++--
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 23 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
index 41ea50f..a260ebf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
@@ -48,14 +48,20 @@ public class AvroWriter<T> implements SchemaWriter<T> {
 
     @Override
     public synchronized byte[] write(T message) {
+        byte[] outputBytes = null;
         try {
             writer.write(message, this.encoder);
-            this.encoder.flush();
-            return this.byteArrayOutputStream.toByteArray();
         } catch (Exception e) {
             throw new SchemaSerializationException(e);
         } finally {
+            try {
+                this.encoder.flush();
+                outputBytes = this.byteArrayOutputStream.toByteArray();
+            } catch (Exception ex) {
+                throw new SchemaSerializationException(ex);
+            }
             this.byteArrayOutputStream.reset();
         }
+        return outputBytes;
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 49d5da9..8fd8154 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -38,6 +38,8 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BufferedBinaryEncoder;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.avro.SchemaValidationException;
@@ -51,12 +53,14 @@ import org.apache.pulsar.client.api.schema.SchemaBuilder;
 import org.apache.pulsar.client.avro.generated.NasaMission;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.joda.time.DateTime;
 import org.joda.time.chrono.ISOChronology;
 import org.json.JSONException;
 import org.skyscreamer.jsonassert.JSONAssert;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -366,7 +370,26 @@ public class AvroSchemaTest {
         Foo object1 = avroSchema.decode(byteBuf);
         Assert.assertTrue(bytes1.length > 0);
         assertEquals(object1, foo1);
+    }
+
+    @Test
+    public void discardBufferIfBadAvroData() {
+        AvroWriter<NasaMission> avroWriter = new AvroWriter<>(
+                ReflectData.AllowNull.get().getSchema(NasaMission.class));
+
+        NasaMission badNasaMissionData = new NasaMission();
+        badNasaMissionData.setId(1);
+        // set null in the non-null field. The java set will accept it but going ahead, the avro encode will crash.
+        badNasaMissionData.setName(null);
+
+        // Because data does not conform to schema expect a crash
+        Assert.assertThrows( SchemaSerializationException.class, () -> avroWriter.write(badNasaMissionData));
+
+        // Get the buffered data using powermock
+        BinaryEncoder encoder = Whitebox.getInternalState(avroWriter, "encoder");
 
+        // Assert that the buffer position is reset to zero
+        Assert.assertEquals(((BufferedBinaryEncoder)encoder).bytesBuffered(), 0);
     }
 
 }