You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/19 22:07:13 UTC

[samza] branch master updated: SAMZA-2132: Flatten startpoint key when serialized. (#956)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 98355db  SAMZA-2132: Flatten startpoint key when serialized. (#956)
98355db is described below

commit 98355db1c61e92ceeb64b839f554733ad993f7d0
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Tue Mar 19 15:07:09 2019 -0700

    SAMZA-2132: Flatten startpoint key when serialized. (#956)
    
    * Flatten startpoint key when serialized.
    
    * Provide custom JsonSerializer for StartpointKey.
---
 .../org/apache/samza/startpoint/StartpointKey.java |  4 +-
 .../samza/startpoint/StartpointKeySerializer.java  | 80 ++++++++++++++++++++++
 .../apache/samza/startpoint/TestStartpointKey.java | 19 +++++
 3 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java
index 1573d41..7d4753d 100644
--- a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java
@@ -22,10 +22,10 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.SystemStreamPartition;
-import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
 
 
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
+@JsonSerialize(using = StartpointKeySerializer.class)
 class StartpointKey {
   private final SystemStreamPartition systemStreamPartition;
   private final TaskName taskName;
diff --git a/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKeySerializer.java b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKeySerializer.java
new file mode 100644
index 0000000..d347fe7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/startpoint/StartpointKeySerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *//*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *//*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.startpoint;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.SerializerProvider;
+
+
+final class StartpointKeySerializer extends JsonSerializer<StartpointKey> {
+  @Override
+  public void serialize(StartpointKey startpointKey, JsonGenerator jsonGenerator, SerializerProvider provider)
+      throws IOException {
+    final Map<String, Object> flatKey = new HashMap<>(); // every field is a top-level entry: no nested objects
+    final SystemStreamPartition ssp = startpointKey.getSystemStreamPartition();
+    final TaskName task = startpointKey.getTaskName();
+    flatKey.put("system", ssp.getSystem());
+    flatKey.put("stream", ssp.getStream());
+    flatKey.put("partition", ssp.getPartition().getPartitionId());
+    if (task != null) { // the taskName entry is left out entirely when the key carries no task
+      flatKey.put("taskName", task.getTaskName());
+    }
+    jsonGenerator.writeObject(flatKey);
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java
index 1bd7d8c..72f922f 100644
--- a/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java
+++ b/samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java
@@ -18,15 +18,19 @@
  */
 package org.apache.samza.startpoint;
 
+import java.io.IOException;
+import java.util.LinkedHashMap;
 import org.apache.samza.Partition;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.SystemStreamPartition;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 
 
 public class TestStartpointKey {
+
   @Test
   public void testStartpointKey() {
     SystemStreamPartition ssp1 = new SystemStreamPartition("system", "stream", new Partition(2));
@@ -61,4 +65,19 @@ public class TestStartpointKey {
     Assert.assertNotEquals(new String(new JsonSerdeV2<>().toBytes(startpointKeyWithTask1)),
         new String(new JsonSerdeV2<>().toBytes(startpointKeyWithDifferentTask)));
   }
+
+  @Test
+  public void testStartpointKeyFormat() throws IOException {
+    SystemStreamPartition ssp = new SystemStreamPartition("system1", "stream1", new Partition(2));
+    StartpointKey startpointKeyWithTask = new StartpointKey(ssp, new TaskName("t1"));
+    byte[] jsonBytes = new JsonSerdeV2<>().toBytes(startpointKeyWithTask);
+    // Values are mixed (three strings plus an integer partition id), so the value type must be Object.
+    LinkedHashMap<String, Object> deserialized = new ObjectMapper().readValue(jsonBytes, LinkedHashMap.class);
+
+    Assert.assertEquals(4, deserialized.size()); // flat object: exactly these four top-level fields
+    Assert.assertEquals("system1", deserialized.get("system"));
+    Assert.assertEquals("stream1", deserialized.get("stream"));
+    Assert.assertEquals(2, deserialized.get("partition"));
+    Assert.assertEquals("t1", deserialized.get("taskName"));
+  }
 }