Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/09/02 00:01:48 UTC

[GitHub] [samza] Zhangyx39 opened a new pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Zhangyx39 opened a new pull request #1426:
URL: https://github.com/apache/samza/pull/1426


   Symptom: The user-provided serde failed to deserialize a message. IntermediateMessageSerde then tried to deserialize the message a second time, which caused an OOM, and the container died.
    
   Direct cause: The user-provided serde constructs an array based on the array size encoded in the message. Given a wrong size, the serde constructed a huge array and caused the OOM.
   
   Root cause: In Samza 0.13.1, we added a byte to the head of the payload. The byte represents the message type (event|watermark|EOS). During deserialization, IntermediateMessageSerde reads the first byte, then deserializes the message according to that type. For compatibility, if it fails to read the message type, it tries to deserialize again with all bytes (including the first byte). More details in this PR: https://github.com/apache/samza/pull/207
    
   Changes: Remove the second try. This will make direct upgrades from 0.13 to master fail. The workaround is to upgrade to 1.4/1.5 first, or to reset the checkpoint of the intermediate topic to the newest message.
    
   Tests: Added unit test: TestIntermediateMessageSerde.testUserMessageSerdeException()
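
To illustrate the direct cause, here is a minimal sketch of how reading a length header one byte too early can inflate it. Everything in it is an assumption for illustration: the class `ShiftedLengthSketch`, the 4-byte little-endian length header, and the 5,000,000-byte payload are hypothetical and are not taken from the serde involved in the incident.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration only; not the user serde from the incident.
final class ShiftedLengthSketch {
  /** Reads the 4-byte little-endian length header an assumed user serde writes first. */
  static int readLength(byte[] bytes, int offset) {
    return ByteBuffer.wrap(bytes, offset, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

  /** Builds the start of an intermediate message: one type byte, then the length header. */
  static byte[] frame(byte typeOrdinal, int payloadLength) {
    return ByteBuffer.allocate(5).order(ByteOrder.LITTLE_ENDIAN)
        .put(typeOrdinal).putInt(payloadLength).array();
  }

  public static void main(String[] args) {
    byte[] framed = frame((byte) 0, 5_000_000);
    // First attempt: the type byte is skipped, so the header is read correctly.
    System.out.println("length after type byte: " + readLength(framed, 1));
    // Second attempt: all bytes are passed, so the header is read one byte early.
    System.out.println("length from byte 0:     " + readLength(framed, 0));
  }
}
```

With these assumed values, the second read yields 1,280,000,000 instead of 5,000,000, which a serde that allocates its array eagerly would try to honor.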


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] cameronlee314 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r482327503



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.
+      LOGGER.error("Error deserializing with intermediate message serde. If you are upgrading from samza version older"
+          + " than 0.13.1, please upgrade to samza 1.5 first.");

Review comment:
       Just to clarify: Is the idea that this PR will go into Samza 1.6? So upgrading to samza 1.5 first will convert all intermediate messages into the new format, which means going to 1.6 after that will then be compatible?
   Maybe consider adding a comment to the code about why upgrading to Samza 1.5 will help.







[GitHub] [samza] cameronlee314 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r482317863



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.

Review comment:
       Consider also adding a little clarification about what versions older than 0.13.1 did to make those versions incompatible with this new code.

##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    IntermediateMessageSerde imserde2 = new IntermediateMessageSerde(new JsonSerdeV2<>());

Review comment:
       Instead of depending on `JsonSerdeV2` for this test, consider creating a mock `Serde` to do what you need.
   In general, using mocks is helpful so that you don't rely on a specific implementation of something which is subject to change in the future. For example, what if `JsonSerdeV2` was changed to stop throwing an exception for certain kinds of data, or if it was changed to throw a different kind of exception than `SamzaException`?

##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {

Review comment:
       It seems like this test would have succeeded with the previous version of `IntermediateMessageSerde` as well. Could you please try to come up with a test that would only succeed with your new version of `IntermediateMessageSerde`?
   For example, you could mock the `imserde2` and then verify that it only gets called with a certain byte array.
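
One way to make the difference observable, sketched without a mocking library: a hand-written recording stub stands in for the mocked user serde, and two simplified decode paths stand in for the old and new `fromBytes`. All names here (`FallbackCallCountSketch`, `RecordingSerde`, `decodeWithFallback`, `decodeNoFallback`) are hypothetical, `RuntimeException` is a stand-in for `SamzaException`, and the real class does more than strip one byte.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the two behaviors; not the Samza classes.
final class FallbackCallCountSketch {
  /** Stand-in for a mocked user serde: records each input, then always fails. */
  static final class RecordingSerde implements Function<byte[], Object> {
    final List<byte[]> seen = new ArrayList<>();

    @Override
    public Object apply(byte[] bytes) {
      seen.add(bytes);
      throw new IllegalArgumentException("user serde failed");
    }
  }

  /** Old behavior (simplified): strip the type byte, and on failure retry with all bytes. */
  static Object decodeWithFallback(Function<byte[], Object> user, byte[] bytes) {
    try {
      return user.apply(Arrays.copyOfRange(bytes, 1, bytes.length));
    } catch (Exception e) {
      return user.apply(bytes);
    }
  }

  /** New behavior (simplified): strip the type byte and surface the failure once. */
  static Object decodeNoFallback(Function<byte[], Object> user, byte[] bytes) {
    try {
      return user.apply(Arrays.copyOfRange(bytes, 1, bytes.length));
    } catch (Exception e) {
      throw new RuntimeException(e); // stand-in for SamzaException
    }
  }

  /** Returns how many times the user serde was invoked for one failing message. */
  static int callsMade(boolean fallback) {
    RecordingSerde serde = new RecordingSerde();
    byte[] message = {0, 1, 2};
    try {
      if (fallback) {
        decodeWithFallback(serde, message);
      } else {
        decodeNoFallback(serde, message);
      }
    } catch (RuntimeException expected) {
      // Both paths end in an exception; only the call count differs.
    }
    return serde.seen.size();
  }
}
```

In this model the stub is invoked twice on the fallback path and once without it, so asserting on the call count (or on the exact byte arrays received) distinguishes the two versions, whereas asserting only on the thrown exception type may not.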








[GitHub] [samza] Zhangyx39 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r485995739



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.
+      LOGGER.error("Error deserializing with intermediate message serde. If you are upgrading from samza version older"
+          + " than 0.13.1, please upgrade to samza 1.5 first.");

Review comment:
       Good suggestion. I have removed this error log and added a new exception that is thrown when reading the message type fails. I also added comments explaining what the issue is and how to solve it.







[GitHub] [samza] cameronlee314 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r482358915



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.
+      LOGGER.error("Error deserializing with intermediate message serde. If you are upgrading from samza version older"
+          + " than 0.13.1, please upgrade to samza 1.5 first.");

Review comment:
       If a user upgraded to 0.13.1+ some time ago, they could theoretically still run into an error if their topic happens to contain intermediate messages from before 0.13.1 (e.g. they have really long retention) and those messages get processed for some reason. So maybe you can generalize the log message a little more to also cover the case where messages from before 0.13.1 are still in their stream.







[GitHub] [samza] Zhangyx39 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r498420319



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -67,7 +67,20 @@ public IntermediateMessageSerde(Serde userMessageSerde) {
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final MessageType type = MessageType.values()[bytes[0]];
+      final MessageType type;
+      try {
+        type = MessageType.values()[bytes[0]];
+      } catch (ArrayIndexOutOfBoundsException e) {

Review comment:
       Sorry for the late reply. I moved the exception from the outer catch to here because:
   1. If this exception is triggered, the failure must be caused by the old format, so the exception tells the user exactly what the issue is.
   2. The chance of an old message having a first byte of 0 to 2 is relatively small. In that case, users will still see the correct exception if they have multiple containers (with multiple exceptions) or if they look at the source code.
   3. A new-format message that fails in the user serde won't trigger this message-type exception, so users won't be misled by it. I think this kind of failure is the majority.
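
As a rough check of point 2, the sketch below counts how many of the 256 possible first-byte values make the `MessageType.values()[bytes[0]]` lookup throw. The enum is an assumed three-constant stand-in for Samza's `MessageType` (the constant names are hypothetical), and `TypeByteSketch` is a hypothetical helper.

```java
// Hypothetical helper; the enum below only mirrors the three message types named in the PR.
final class TypeByteSketch {
  enum MessageType { USER_MESSAGE, WATERMARK, END_OF_STREAM }

  /** Returns true if indexing MessageType.values() with this byte throws. */
  static boolean lookupThrows(byte first) {
    try {
      MessageType type = MessageType.values()[first];
      return false;
    } catch (ArrayIndexOutOfBoundsException e) {
      return true;
    }
  }

  /** Counts the byte values, out of all 256, for which the lookup throws. */
  static int countThrowing() {
    int count = 0;
    for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; b++) {
      if (lookupThrows((byte) b)) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    System.out.println("throwing first bytes: " + countThrowing() + " of 256");
    System.out.println("'{' throws: " + lookupThrows((byte) '{'));
    System.out.println("0xAC throws: " + lookupThrows((byte) 0xAC));
  }
}
```

Because Java bytes are signed, negative values throw as well, so only 0, 1, and 2 get past the lookup (253 of 256 values throw). Two common leading bytes, `{` (0x7B) for a JSON object and 0xAC for Java serialization, both fall in the throwing range. How often real old-format payloads start with 0 to 2 still depends on the user serde in use.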
   







[GitHub] [samza] Zhangyx39 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r498409592



##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +132,25 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test(expected = SamzaException.class)
+  public void testUserMessageSerdeException() {
+    Serde<?> mockUserMessageSerde = mock(Serde.class);
+    when(mockUserMessageSerde.fromBytes(anyObject())).then(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        byte[] bytes = invocation.getArgumentAt(0, byte[].class);
+        if (Arrays.equals(bytes, new byte[]{1, 2})) {
+          throw new SamzaException("User message serde failed to deserialize this message.");

Review comment:
       Good suggestion. Updated to use IllegalArgumentException.







[GitHub] [samza] cameronlee314 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r486568101



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -67,7 +67,20 @@ public IntermediateMessageSerde(Serde userMessageSerde) {
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final MessageType type = MessageType.values()[bytes[0]];
+      final MessageType type;
+      try {
+        type = MessageType.values()[bytes[0]];
+      } catch (ArrayIndexOutOfBoundsException e) {
+        // The message type was introduced in samza 0.13.1. For samza 0.13.0 or older versions, the first byte of
+        // MessageType doesn't exist in the bytes. Thus, upgrading from those versions will get this exception.
+        // There are two ways to solve this issue:
+        // a) Reset checkpoint or clean all messages for the intermediate topic

Review comment:
       1) Could you please add this point to the PR description as well?
   2) "Reset checkpoint" means "reset to newest message in the stream", right? Could you please make sure that is clear in the doc/description?

##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +132,25 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test(expected = SamzaException.class)
+  public void testUserMessageSerdeException() {
+    Serde<?> mockUserMessageSerde = mock(Serde.class);
+    when(mockUserMessageSerde.fromBytes(anyObject())).then(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        byte[] bytes = invocation.getArgumentAt(0, byte[].class);
+        if (Arrays.equals(bytes, new byte[]{1, 2})) {
+          throw new SamzaException("User message serde failed to deserialize this message.");

Review comment:
       Since you have control over it, consider using something other than `SamzaException` for this mock and validation, since `SamzaException` is something that gets thrown a lot by core Samza classes.
   It's just a nice small thing you can do to help make tests a little more robust, in case someone introduces a bug in the future.

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -67,7 +67,20 @@ public IntermediateMessageSerde(Serde userMessageSerde) {
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final MessageType type = MessageType.values()[bytes[0]];
+      final MessageType type;
+      try {
+        type = MessageType.values()[bytes[0]];
+      } catch (ArrayIndexOutOfBoundsException e) {

Review comment:
       Is it possible that this serde gets the old message format but `bytes[0]` is still small enough that there is no `ArrayIndexOutOfBoundsException`? In that case, you wouldn't see your message about the upgrade from `0.13.0`.
   I think it was good to have this exception message where you had it before in the outer `catch`, since then you could at least warn users in all cases. You might also want to mention in the exception message that the error could just be due to a user serde error, so that users aren't misled into thinking that their issue is definitely the `0.13.0` upgrade issue.







[GitHub] [samza] Zhangyx39 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r485995915



##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    IntermediateMessageSerde imserde2 = new IntermediateMessageSerde(new JsonSerdeV2<>());

Review comment:
       Good suggestion. I have updated to use a mocked serde.







[GitHub] [samza] Zhangyx39 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r485994885



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.

Review comment:
       Good point. Clarification added.







[GitHub] [samza] Zhangyx39 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r498413299



##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -67,7 +67,20 @@ public IntermediateMessageSerde(Serde userMessageSerde) {
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final MessageType type = MessageType.values()[bytes[0]];
+      final MessageType type;
+      try {
+        type = MessageType.values()[bytes[0]];
+      } catch (ArrayIndexOutOfBoundsException e) {
+        // The message type was introduced in samza 0.13.1. For samza 0.13.0 or older versions, the first byte of
+        // MessageType doesn't exist in the bytes. Thus, upgrading from those versions will get this exception.
+        // There are two ways to solve this issue:
+        // a) Reset checkpoint or clean all messages for the intermediate topic

Review comment:
       Makes sense. Updated.







[GitHub] [samza] Zhangyx39 commented on a change in pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
Zhangyx39 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r485996461



##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {

Review comment:
       Updated the unit test to fail if the user serde is given unexpected bytes.







[GitHub] [samza] cameronlee314 merged pull request #1426: SAMZA-2587: IntermediateMessageSerde exception handling

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1426:
URL: https://github.com/apache/samza/pull/1426


   

