You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/02/05 01:35:37 UTC

[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1580: SAMZA-2689 [Elasticity] generate JobModel and update RunLoop with (elastic) tasks processing keyBuckets of SystemStreamPartition

lakshmi-manasa-g commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r799921230



##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
##########
@@ -187,6 +187,77 @@ public void testDeserializeContainerModelIdFieldOnly() throws IOException {
     deserializeFromObjectNode(jobModelJson);
   }
 
+  @Test
+  public void testSerializeSystemStreamPartition() throws IOException {
+    // case 1: keyBucket not explicitly mentioned
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    ObjectNode serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    ObjectNode expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+
+    //Case 2: with non-null keyBucket
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    serializedString = this.samzaObjectMapper.writeValueAsString(ssp);
+
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    // use a plain ObjectMapper to read JSON to make comparison easier
+    serializedAsJson = (ObjectNode) new ObjectMapper().readTree(serializedString);
+    expectedJson = sspJson;
+
+    assertEquals(expectedJson.get("system"), serializedAsJson.get("system"));
+    assertEquals(expectedJson.get("stream"), serializedAsJson.get("stream"));
+    assertEquals(expectedJson.get("partition"), serializedAsJson.get("partition"));
+    assertEquals(expectedJson.get("keyBucket"), serializedAsJson.get("keyBucket"));
+  }
+
+  @Test
+  public void testDeserializeSystemStreamPartition() throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    // case 1: keyBucket not explicitly mentioned
+    ObjectNode sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+
+    SystemStreamPartition ssp = new SystemStreamPartition("foo", "bar", new Partition(1));
+    String jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    SystemStreamPartition deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+
+    // case 2: explicitly set key bucket
+    sspJson = objectMapper.createObjectNode();
+    sspJson.put("system", "foo");
+    sspJson.put("stream", "bar");
+    sspJson.put("partition", 1);
+    sspJson.put("keyBucket", 1);
+
+    ssp = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
+    jsonString = new ObjectMapper().writeValueAsString(sspJson);
+    deserSSP = this.samzaObjectMapper.readValue(jsonString, SystemStreamPartition.class);
+
+    assertEquals(ssp, deserSSP);
+  }
+

Review comment:
       Added two scenarios: one where the pre-elasticity (i.e., old) SamzaObjectMapper serializes and the new one deserializes, and another where the new one serializes and the old one deserializes. Please let me know if this makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org