You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/05/23 18:05:15 UTC

git commit: SAMZA-269: SerdeManager should pass through envelope objects if not modified

Repository: incubator-samza
Updated Branches:
  refs/heads/master 873ea6911 -> 4601fdfc2


SAMZA-269: SerdeManager should pass through envelope objects if not modified


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4601fdfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4601fdfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4601fdfc

Branch: refs/heads/master
Commit: 4601fdfc2bdcc14a3263ebe5f26d9f0e02fa5abb
Parents: 873ea69
Author: Martin Kleppmann <mk...@linkedin.com>
Authored: Fri May 23 15:28:29 2014 +0100
Committer: Martin Kleppmann <mk...@linkedin.com>
Committed: Fri May 23 15:30:17 2014 +0100

----------------------------------------------------------------------
 .../apache/samza/serializers/SerdeManager.scala | 32 ++++++++------
 .../samza/serializers/TestSerdeManager.scala    | 45 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4601fdfc/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 4f3ff6e..066d894 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -72,13 +72,17 @@ class SerdeManager(
       envelope.getMessage
     }
 
-    new OutgoingMessageEnvelope(
-      envelope.getSystemStream,
-      null,
-      null,
-      envelope.getPartitionKey,
-      key,
-      message)
+    if ((key eq envelope.getKey) && (message eq envelope.getMessage)) {
+      envelope
+    } else {
+      new OutgoingMessageEnvelope(
+        envelope.getSystemStream,
+        null,
+        null,
+        envelope.getPartitionKey,
+        key,
+        message)
+    }
   }
 
   def fromBytes(bytes: Array[Byte], deserializerName: String) = serdes
@@ -114,10 +118,14 @@ class SerdeManager(
       envelope.getMessage
     }
 
-    new IncomingMessageEnvelope(
-      envelope.getSystemStreamPartition,
-      envelope.getOffset,
-      key,
-      message)
+    if ((key eq envelope.getKey) && (message eq envelope.getMessage)) {
+      envelope
+    } else {
+      new IncomingMessageEnvelope(
+        envelope.getSystemStreamPartition,
+        envelope.getOffset,
+        key,
+        message)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4601fdfc/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
new file mode 100644
index 0000000..646d0c8
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.samza.system.OutgoingMessageEnvelope
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+
+class TestSerdeManager {
+  @Test
+  def testNullSerializationReturnsIdenticalObject {
+    val original = new OutgoingMessageEnvelope(new SystemStream("my-system", "my-stream"), "message")
+    val serialized = new SerdeManager().toBytes(original)
+    assertSame(original, serialized)
+  }
+
+  @Test
+  def testNullDeserializationReturnsIdenticalObject {
+    val ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0))
+    val original = new IncomingMessageEnvelope(ssp, "123", null, "message")
+    val deserialized = new SerdeManager().fromBytes(original)
+    assertSame(original, deserialized)
+  }
+}
\ No newline at end of file