You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/10/01 22:26:54 UTC

[samza] branch master updated: SAMZA-2587: IntermediateMessageSerde exception handling (#1426)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d12af34  SAMZA-2587: IntermediateMessageSerde exception handling (#1426)
d12af34 is described below

commit d12af34de53338048e9eb81f8e46019cd9f5ba06
Author: Yixing Zhang <to...@gmail.com>
AuthorDate: Thu Oct 1 15:26:44 2020 -0700

    SAMZA-2587: IntermediateMessageSerde exception handling (#1426)
    
    Upgrade Instructions: For users upgrading directly from Samza 0.13.0 or older: a message-type byte was added to the front of every intermediate message in Samza 0.13.1. Intermediate messages written by Samza 0.13.0 or older do not start with this MessageType byte, so they cannot be deserialized after the upgrade and the job will fail. There are three ways to fix this issue:
    a) Reset checkpoint to consume from newest message in the intermediate stream
    b) Delete all existing messages in the intermediate stream
    c) Run the application on any version between 0.13.1 and 1.5 until all old messages in the intermediate stream have reached their retention time.
    
    Co-authored-by: Yixing Zhang <yi...@linkedin.com>
---
 .../serializers/IntermediateMessageSerde.java      | 29 ++++++++++++----------
 .../serializers/TestIntermediateMessageSerde.java  | 27 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 13 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index a8f9852..83a0a35 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -67,7 +67,21 @@ public class IntermediateMessageSerde implements Serde<Object> {
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final MessageType type = MessageType.values()[bytes[0]];
+      final MessageType type;
+      try {
+        type = MessageType.values()[bytes[0]];
+      } catch (ArrayIndexOutOfBoundsException e) {
+        // The message type was introduced in samza 0.13.1. For samza 0.13.0 or older versions, the first byte of
+        // MessageType doesn't exist in the bytes. Thus, upgrading from those versions will get this exception.
+        // There are three ways to solve this issue:
+        // a) Reset checkpoint to consume from newest message in the intermediate stream
+        // b) clean all existing messages in the intermediate stream
+        // c) Run the application in any version between 0.13.1 and 1.5 until all old messages in intermediate stream
+        // has reached retention time.
+        throw new SamzaException("Error reading the message type from intermediate message. This may happen if you "
+            + "have recently upgraded from samza version older than 0.13.1 or there are still old messages in the "
+            + "intermediate stream.", e);
+      }
       final byte[] data = Arrays.copyOfRange(bytes, 1, bytes.length);
       switch (type) {
         case USER_MESSAGE:
@@ -83,21 +97,10 @@ public class IntermediateMessageSerde implements Serde<Object> {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
-      // This will catch the following exceptions:
-      // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
-      // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      throw e;
     }
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
index 7a3faca..22250fc 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
@@ -24,15 +24,20 @@ import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Arrays;
 import org.apache.samza.serializers.IntermediateMessageSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.MessageType;
 import org.apache.samza.system.WatermarkMessage;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
 
 
 public class TestIntermediateMessageSerde {
@@ -126,4 +131,26 @@ public class TestIntermediateMessageSerde {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testUserMessageSerdeException() {
+    Serde<?> mockUserMessageSerde = mock(Serde.class);
+    when(mockUserMessageSerde.fromBytes(anyObject())).then(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        byte[] bytes = invocation.getArgumentAt(0, byte[].class);
+        if (Arrays.equals(bytes, new byte[]{1, 2})) {
+          throw new IllegalArgumentException("User message serde failed to deserialize this message.");
+        } else {
+          // Intermediate message serde shouldn't try to deserialize user message with wrong bytes
+          Assert.fail();
+          return null;
+        }
+      }
+    });
+
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(mockUserMessageSerde);
+    byte[] bytes = new byte[]{0, 1, 2};
+    imserde.fromBytes(bytes);
+  }
 }