You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:54 UTC
[pulsar] 08/38: [issue 6694][AVRO ENCODE] Reset cursor if message
encode fails. (#6695)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 14a1a2b9e8d81294625810413ff36191391103b7
Author: Shivji Kumar Jha <sh...@nutanix.com>
AuthorDate: Tue Apr 14 14:13:46 2020 +0530
[issue 6694][AVRO ENCODE] Reset cursor if message encode fails. (#6695)
Fixes #6694
### Motivation
If Avro encoding of a message fails after a few bytes have been written, the cursor in the stream is not reset: the subsequent **flush()** call, which would normally reset the cursor, is skipped when an exception is thrown.
### Modifications
Add **flush()** in the finally block.
### Test
Added a test to verify the fix.
(cherry picked from commit 7cffe2ab7420b0001f1337ef6007448124fecf18)
---
.../client/impl/schema/writer/AvroWriter.java | 10 ++++++++--
.../pulsar/client/impl/schema/AvroSchemaTest.java | 23 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
index 41ea50f..a260ebf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
@@ -48,14 +48,20 @@ public class AvroWriter<T> implements SchemaWriter<T> {
@Override
public synchronized byte[] write(T message) {
+ byte[] outputBytes = null;
try {
writer.write(message, this.encoder);
- this.encoder.flush();
- return this.byteArrayOutputStream.toByteArray();
} catch (Exception e) {
throw new SchemaSerializationException(e);
} finally {
+ try {
+ this.encoder.flush();
+ outputBytes = this.byteArrayOutputStream.toByteArray();
+ } catch (Exception ex) {
+ throw new SchemaSerializationException(ex);
+ }
this.byteArrayOutputStream.reset();
}
+ return outputBytes;
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 49d5da9..8fd8154 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -38,6 +38,8 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.BufferedBinaryEncoder;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.avro.SchemaValidationException;
@@ -51,12 +53,14 @@ import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.avro.generated.NasaMission;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.json.JSONException;
import org.skyscreamer.jsonassert.JSONAssert;
+import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -366,7 +370,26 @@ public class AvroSchemaTest {
Foo object1 = avroSchema.decode(byteBuf);
Assert.assertTrue(bytes1.length > 0);
assertEquals(object1, foo1);
+ }
+
+ @Test
+ public void discardBufferIfBadAvroData() {
+ AvroWriter<NasaMission> avroWriter = new AvroWriter<>(
+ ReflectData.AllowNull.get().getSchema(NasaMission.class));
+
+ NasaMission badNasaMissionData = new NasaMission();
+ badNasaMissionData.setId(1);
+ // set null in the non-null field. The java set will accept it but going ahead, the avro encode will crash.
+ badNasaMissionData.setName(null);
+
+ // Because data does not conform to schema expect a crash
+ Assert.assertThrows( SchemaSerializationException.class, () -> avroWriter.write(badNasaMissionData));
+
+ // Get the buffered data using powermock
+ BinaryEncoder encoder = Whitebox.getInternalState(avroWriter, "encoder");
+ // Assert that the buffer position is reset to zero
+ Assert.assertEquals(((BufferedBinaryEncoder)encoder).bytesBuffered(), 0);
}
}