You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/08 08:30:33 UTC

[incubator-servicecomb-saga] 01/04: SCB-174 add kryo serializer

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 828b2a769dd8aa16562fa11fbf9c568f3f4ba00d
Author: Eric Lee <da...@huawei.com>
AuthorDate: Sat Jan 6 19:28:52 2018 +0800

    SCB-174 add kryo serializer
    
    Signed-off-by: Eric Lee <da...@huawei.com>
---
 omega/omega-format/pom.xml                         |  4 ++
 .../saga/omega/format/KryoMessageFormat.java       | 52 ++++++++++++++++
 .../saga/omega/format/MessageFormat.java           | 34 +++++++++++
 .../saga/omega/format/NativeMessageFormat.java     | 14 +----
 .../saga/omega/format/KryoMessageFormatTest.java   | 59 ++++++++++++++++++
 ...eFormatTest.java => MessageFormatTestBase.java} | 27 ++++-----
 .../saga/omega/format/NativeMessageFormatTest.java | 69 ++++++++++------------
 .../saga/omega/spring/OmegaSpringConfig.java       |  5 +-
 pom.xml                                            |  5 ++
 9 files changed, 201 insertions(+), 68 deletions(-)

diff --git a/omega/omega-format/pom.xml b/omega/omega-format/pom.xml
index a842cbb..f34f323 100644
--- a/omega/omega-format/pom.xml
+++ b/omega/omega-format/pom.xml
@@ -33,6 +33,10 @@
       <groupId>org.apache.servicecomb.saga</groupId>
       <artifactId>omega-transaction</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>junit</groupId>
diff --git a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormat.java b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormat.java
new file mode 100644
index 0000000..2a0dd8d
--- /dev/null
+++ b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormat.java
@@ -0,0 +1,52 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.format;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class KryoMessageFormat implements MessageFormat {
+
+  private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+  private static final Kryo kryo = new Kryo();
+
+  @Override
+  public byte[] serialize(Object[] objects) {
+    Output output = new Output(DEFAULT_BUFFER_SIZE, -1);
+    kryo.writeObjectOrNull(output, objects, Object[].class);
+
+    return output.toBytes();
+  }
+
+  @Override
+  public Object[] deserialize(byte[] message) {
+    try {
+      Input input = new Input(new ByteArrayInputStream(message));
+      return kryo.readObjectOrNull(input, Object[].class);
+    } catch (KryoException e) {
+      throw new OmegaException("Unable to deserialize message", e);
+    }
+  }
+}
diff --git a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/MessageFormat.java b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/MessageFormat.java
new file mode 100644
index 0000000..8adde72
--- /dev/null
+++ b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/MessageFormat.java
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.format;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
+import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+
+interface MessageFormat extends MessageSerializer, MessageDeserializer {
+  @Override
+  default byte[] serialize(TxEvent event) {
+    try {
+      return serialize(event.payloads());
+    } catch (OmegaException e) {
+      throw new OmegaException("Unable to serialize event with global tx id " + event.globalTxId(), e);
+    }
+  }
+}
diff --git a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java
index a486e1d..383afeb 100644
--- a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java
+++ b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java
@@ -23,21 +23,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
-import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 
-public class NativeMessageFormat implements MessageSerializer, MessageDeserializer {
-  @Override
-  public byte[] serialize(TxEvent event) {
-    try {
-      return serialize(event.payloads());
-    } catch (OmegaException e) {
-      throw new OmegaException("Unable to serialize event with global tx id " + event.globalTxId(), e);
-    }
-  }
-
+public class NativeMessageFormat implements MessageFormat {
   @Override
   public byte[] serialize(Object[] objects) {
     try {
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormatTest.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormatTest.java
new file mode 100644
index 0000000..434067e
--- /dev/null
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormatTest.java
@@ -0,0 +1,59 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.format;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class KryoMessageFormatTest extends MessageFormatTestBase {
+
+  @BeforeClass
+  public static void setUp() {
+    format = new KryoMessageFormat();
+  }
+
+  @Test
+  @Override
+  public void serializeObjectIntoBytes() throws Exception {
+    super.serializeObjectIntoBytes();
+  }
+
+  @Test
+  @Override
+  public void serializeNullIntoBytes() throws Exception {
+    super.serializeNullIntoBytes();
+  }
+
+  @Test
+  @Override
+  public void blowsUpWhenObjectIsNotDeserializable() throws Exception {
+    super.blowsUpWhenObjectIsNotDeserializable();
+  }
+
+  @Test
+  public void serializeEmptyClassIntoBytes() {
+    byte[] bytes = format.serialize(eventOf(new EmptyClass()));
+
+    Object[] message = format.deserialize(bytes);
+
+    assertThat(message[0] instanceof EmptyClass, is(true));
+  }
+}
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
similarity index 76%
copy from omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
copy to omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
index 1460fd2..2188447 100644
--- a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
@@ -20,18 +20,18 @@ package org.apache.servicecomb.saga.omega.format;
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
 import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertThat;
 
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
 import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-import org.junit.Test;
 
-public class NativeMessageFormatTest {
+public class MessageFormatTestBase {
 
-  private final NativeMessageFormat format = new NativeMessageFormat();
+  static MessageFormat format;
 
-  @Test
   public void serializeObjectIntoBytes() throws Exception {
     byte[] bytes = format.serialize(eventOf("hello", "world"));
 
@@ -40,17 +40,14 @@ public class NativeMessageFormatTest {
     assertThat(asList(message), contains("hello", "world"));
   }
 
-  @Test
-  public void blowsUpWhenObjectIsNotSerializable() throws Exception {
-    try {
-      format.serialize(eventOf(new NotSerializable()));
-      expectFailing(OmegaException.class);
-    } catch (OmegaException e) {
-      assertThat(e.getMessage(), startsWith("Unable to serialize event with global tx id"));
-    }
+  public void serializeNullIntoBytes() throws Exception {
+    byte[] bytes = format.serialize(eventOf((Object[]) null));
+
+    Object[] message = format.deserialize(bytes);
+
+    assertThat(message, is(nullValue()));
   }
 
-  @Test
   public void blowsUpWhenObjectIsNotDeserializable() throws Exception {
     try {
       format.deserialize(new byte[0]);
@@ -60,10 +57,10 @@ public class NativeMessageFormatTest {
     }
   }
 
-  private TxEvent eventOf(Object... payloads) {
+  TxEvent eventOf(Object... payloads) {
     return new TxEvent(null, null, null, null, payloads);
   }
 
-  private static class NotSerializable {
+  static class EmptyClass {
   }
 }
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
index 1460fd2..e07bb61 100644
--- a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
@@ -1,69 +1,62 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 
 package org.apache.servicecomb.saga.omega.format;
 
 import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.Arrays.asList;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertThat;
 
 import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class NativeMessageFormatTest {
+public class NativeMessageFormatTest extends MessageFormatTestBase {
 
-  private final NativeMessageFormat format = new NativeMessageFormat();
+  @BeforeClass
+  public static void setUp() {
+    format = new NativeMessageFormat();
+  }
 
   @Test
+  @Override
   public void serializeObjectIntoBytes() throws Exception {
-    byte[] bytes = format.serialize(eventOf("hello", "world"));
-
-    Object[] message = format.deserialize(bytes);
-
-    assertThat(asList(message), contains("hello", "world"));
+    super.serializeObjectIntoBytes();
   }
 
   @Test
-  public void blowsUpWhenObjectIsNotSerializable() throws Exception {
-    try {
-      format.serialize(eventOf(new NotSerializable()));
-      expectFailing(OmegaException.class);
-    } catch (OmegaException e) {
-      assertThat(e.getMessage(), startsWith("Unable to serialize event with global tx id"));
-    }
+  @Override
+  public void serializeNullIntoBytes() throws Exception {
+    super.serializeNullIntoBytes();
   }
 
   @Test
+  @Override
   public void blowsUpWhenObjectIsNotDeserializable() throws Exception {
+    super.blowsUpWhenObjectIsNotDeserializable();
+  }
+
+  @Test
+  public void blowsUpWhenSerializeEmptyClass() {
     try {
-      format.deserialize(new byte[0]);
+      format.serialize(eventOf(new EmptyClass()));
       expectFailing(OmegaException.class);
     } catch (OmegaException e) {
-      assertThat(e.getMessage(), startsWith("Unable to deserialize message"));
+      assertThat(e.getMessage(), startsWith("Unable to serialize event with global tx id"));
     }
   }
-
-  private TxEvent eventOf(Object... payloads) {
-    return new TxEvent(null, null, null, null, payloads);
-  }
-
-  private static class NotSerializable {
-  }
 }
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 9e0ebb6..7ed1f84 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -29,7 +29,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator;
 import org.apache.servicecomb.saga.omega.context.OmegaContext;
 import org.apache.servicecomb.saga.omega.context.ServiceConfig;
 import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
-import org.apache.servicecomb.saga.omega.format.NativeMessageFormat;
+import org.apache.servicecomb.saga.omega.format.KryoMessageFormat;
 import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
 import org.apache.servicecomb.saga.omega.transaction.MessageSender;
 import org.slf4j.Logger;
@@ -76,7 +76,8 @@ class OmegaSpringConfig {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), new NativeMessageFormat(), serviceConfig, handler);
+        MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new KryoMessageFormat(),
+            new KryoMessageFormat(), serviceConfig, handler);
         sender.onConnected();
         senders.add(sender);
         return sender;
diff --git a/pom.xml b/pom.xml
index 145b658..2458bc8 100755
--- a/pom.xml
+++ b/pom.xml
@@ -329,6 +329,11 @@
         <artifactId>grpc-stub</artifactId>
         <version>${grpc.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.esotericsoftware</groupId>
+        <artifactId>kryo</artifactId>
+        <version>4.0.1</version>
+      </dependency>
 
       <!-- test dependencies -->
       <dependency>

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.