You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2018/01/08 08:30:33 UTC
[incubator-servicecomb-saga] 01/04: SCB-174 add kryo serializer
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 828b2a769dd8aa16562fa11fbf9c568f3f4ba00d
Author: Eric Lee <da...@huawei.com>
AuthorDate: Sat Jan 6 19:28:52 2018 +0800
SCB-174 add kryo serializer
Signed-off-by: Eric Lee <da...@huawei.com>
---
omega/omega-format/pom.xml | 4 ++
.../saga/omega/format/KryoMessageFormat.java | 52 ++++++++++++++++
.../saga/omega/format/MessageFormat.java | 34 +++++++++++
.../saga/omega/format/NativeMessageFormat.java | 14 +----
.../saga/omega/format/KryoMessageFormatTest.java | 59 ++++++++++++++++++
...eFormatTest.java => MessageFormatTestBase.java} | 27 ++++-----
.../saga/omega/format/NativeMessageFormatTest.java | 69 ++++++++++------------
.../saga/omega/spring/OmegaSpringConfig.java | 5 +-
pom.xml | 5 ++
9 files changed, 201 insertions(+), 68 deletions(-)
diff --git a/omega/omega-format/pom.xml b/omega/omega-format/pom.xml
index a842cbb..f34f323 100644
--- a/omega/omega-format/pom.xml
+++ b/omega/omega-format/pom.xml
@@ -33,6 +33,10 @@
<groupId>org.apache.servicecomb.saga</groupId>
<artifactId>omega-transaction</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormat.java b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormat.java
new file mode 100644
index 0000000..2a0dd8d
--- /dev/null
+++ b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.format;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * {@link MessageFormat} that encodes payload arrays with Kryo.
+ *
+ * <p>Kryo instances are documented as not thread-safe, so each thread works with its own
+ * instance instead of one shared static instance.
+ */
+public class KryoMessageFormat implements MessageFormat {
+
+  private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+  // One Kryo per thread: a single instance must not be used by several threads at once.
+  private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(Kryo::new);
+
+  /**
+   * Encodes the payload array into bytes.
+   *
+   * @param objects payloads to encode; {@code null} is written as a null marker
+   * @return the Kryo-encoded bytes
+   * @throws OmegaException if Kryo fails to encode the payloads
+   */
+  @Override
+  public byte[] serialize(Object[] objects) {
+    // Start with DEFAULT_BUFFER_SIZE bytes; a max size of -1 lets the buffer grow without limit.
+    try (Output output = new Output(DEFAULT_BUFFER_SIZE, -1)) {
+      KRYO.get().writeObjectOrNull(output, objects, Object[].class);
+      return output.toBytes();
+    } catch (KryoException e) {
+      // Wrap so the TxEvent overload, which only catches OmegaException, can add the tx id.
+      throw new OmegaException("Unable to serialize object into bytes", e);
+    }
+  }
+
+  /**
+   * Decodes bytes produced by {@link #serialize(Object[])} back into the payload array.
+   *
+   * @param message the Kryo-encoded bytes
+   * @return the decoded payloads, or {@code null} if a null array was encoded
+   * @throws OmegaException if Kryo cannot decode the bytes
+   */
+  @Override
+  public Object[] deserialize(byte[] message) {
+    try (Input input = new Input(new ByteArrayInputStream(message))) {
+      return KRYO.get().readObjectOrNull(input, Object[].class);
+    } catch (KryoException e) {
+      throw new OmegaException("Unable to deserialize message", e);
+    }
+  }
+}
diff --git a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/MessageFormat.java b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/MessageFormat.java
new file mode 100644
index 0000000..8adde72
--- /dev/null
+++ b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/MessageFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.format;
+
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
+import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+
+/**
+ * Contract shared by the omega message formats: one type that both serializes and deserializes.
+ */
+interface MessageFormat extends MessageSerializer, MessageDeserializer {
+  /**
+   * Serializes the payloads of {@code event} by delegating to {@code serialize(Object[])}.
+   *
+   * @param event the event whose payloads are encoded
+   * @return the encoded payload bytes
+   * @throws OmegaException if the delegate fails; the message names the event's global tx id
+   */
+  @Override
+  default byte[] serialize(TxEvent event) {
+    try {
+      final Object[] payloads = event.payloads();
+      return serialize(payloads);
+    } catch (OmegaException cause) {
+      final String description = "Unable to serialize event with global tx id " + event.globalTxId();
+      throw new OmegaException(description, cause);
+    }
+  }
+}
diff --git a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java
index a486e1d..383afeb 100644
--- a/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java
+++ b/omega/omega-format/src/main/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormat.java
@@ -23,21 +23,9 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
-import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-public class NativeMessageFormat implements MessageSerializer, MessageDeserializer {
- @Override
- public byte[] serialize(TxEvent event) {
- try {
- return serialize(event.payloads());
- } catch (OmegaException e) {
- throw new OmegaException("Unable to serialize event with global tx id " + event.globalTxId(), e);
- }
- }
-
+public class NativeMessageFormat implements MessageFormat {
@Override
public byte[] serialize(Object[] objects) {
try {
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormatTest.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormatTest.java
new file mode 100644
index 0000000..434067e
--- /dev/null
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/KryoMessageFormatTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.format;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Runs the shared {@link MessageFormatTestBase} scenarios against {@link KryoMessageFormat}.
+ */
+public class KryoMessageFormatTest extends MessageFormatTestBase {
+
+  @BeforeClass
+  public static void setUp() {
+    format = new KryoMessageFormat();
+  }
+
+  // The base class declares these scenarios without @Test, so each one is re-declared here.
+  @Override
+  @Test
+  public void serializeObjectIntoBytes() throws Exception {
+    super.serializeObjectIntoBytes();
+  }
+
+  @Override
+  @Test
+  public void serializeNullIntoBytes() throws Exception {
+    super.serializeNullIntoBytes();
+  }
+
+  @Override
+  @Test
+  public void blowsUpWhenObjectIsNotDeserializable() throws Exception {
+    super.blowsUpWhenObjectIsNotDeserializable();
+  }
+
+  /** Kryo-specific case: a payload of the field-less {@code EmptyClass} survives a round trip. */
+  @Test
+  public void serializeEmptyClassIntoBytes() {
+    final EmptyClass payload = new EmptyClass();
+    final Object[] restored = format.deserialize(format.serialize(eventOf(payload)));
+
+    final boolean restoredAsEmptyClass = restored[0] instanceof EmptyClass;
+    assertThat(restoredAsEmptyClass, is(true));
+  }
+}
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
similarity index 76%
copy from omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
copy to omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
index 1460fd2..2188447 100644
--- a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/MessageFormatTestBase.java
@@ -20,18 +20,18 @@ package org.apache.servicecomb.saga.omega.format;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
-import org.junit.Test;
-public class NativeMessageFormatTest {
+public class MessageFormatTestBase {
- private final NativeMessageFormat format = new NativeMessageFormat();
+ static MessageFormat format;
- @Test
public void serializeObjectIntoBytes() throws Exception {
byte[] bytes = format.serialize(eventOf("hello", "world"));
@@ -40,17 +40,14 @@ public class NativeMessageFormatTest {
assertThat(asList(message), contains("hello", "world"));
}
- @Test
- public void blowsUpWhenObjectIsNotSerializable() throws Exception {
- try {
- format.serialize(eventOf(new NotSerializable()));
- expectFailing(OmegaException.class);
- } catch (OmegaException e) {
- assertThat(e.getMessage(), startsWith("Unable to serialize event with global tx id"));
- }
+ public void serializeNullIntoBytes() throws Exception {
+ byte[] bytes = format.serialize(eventOf((Object[]) null));
+
+ Object[] message = format.deserialize(bytes);
+
+ assertThat(message, is(nullValue()));
}
- @Test
public void blowsUpWhenObjectIsNotDeserializable() throws Exception {
try {
format.deserialize(new byte[0]);
@@ -60,10 +57,10 @@ public class NativeMessageFormatTest {
}
}
- private TxEvent eventOf(Object... payloads) {
+ TxEvent eventOf(Object... payloads) {
return new TxEvent(null, null, null, null, payloads);
}
- private static class NotSerializable {
+ static class EmptyClass {
}
}
diff --git a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
index 1460fd2..e07bb61 100644
--- a/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
+++ b/omega/omega-format/src/test/java/org/apache/servicecomb/saga/omega/format/NativeMessageFormatTest.java
@@ -1,69 +1,62 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.servicecomb.saga.omega.format;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static java.util.Arrays.asList;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
-import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import org.junit.BeforeClass;
import org.junit.Test;
-public class NativeMessageFormatTest {
+public class NativeMessageFormatTest extends MessageFormatTestBase {
- private final NativeMessageFormat format = new NativeMessageFormat();
+ @BeforeClass
+ public static void setUp() {
+ format = new NativeMessageFormat();
+ }
@Test
+ @Override
public void serializeObjectIntoBytes() throws Exception {
- byte[] bytes = format.serialize(eventOf("hello", "world"));
-
- Object[] message = format.deserialize(bytes);
-
- assertThat(asList(message), contains("hello", "world"));
+ super.serializeObjectIntoBytes();
}
@Test
- public void blowsUpWhenObjectIsNotSerializable() throws Exception {
- try {
- format.serialize(eventOf(new NotSerializable()));
- expectFailing(OmegaException.class);
- } catch (OmegaException e) {
- assertThat(e.getMessage(), startsWith("Unable to serialize event with global tx id"));
- }
+ @Override
+ public void serializeNullIntoBytes() throws Exception {
+ super.serializeNullIntoBytes();
}
@Test
+ @Override
public void blowsUpWhenObjectIsNotDeserializable() throws Exception {
+ super.blowsUpWhenObjectIsNotDeserializable();
+ }
+
+ @Test
+ public void blowsUpWhenSerializeEmptyClass() {
try {
- format.deserialize(new byte[0]);
+ format.serialize(eventOf(new EmptyClass()));
expectFailing(OmegaException.class);
} catch (OmegaException e) {
- assertThat(e.getMessage(), startsWith("Unable to deserialize message"));
+ assertThat(e.getMessage(), startsWith("Unable to serialize event with global tx id"));
}
}
-
- private TxEvent eventOf(Object... payloads) {
- return new TxEvent(null, null, null, null, payloads);
- }
-
- private static class NotSerializable {
- }
}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 9e0ebb6..7ed1f84 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -29,7 +29,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator;
import org.apache.servicecomb.saga.omega.context.OmegaContext;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.context.UniqueIdGenerator;
-import org.apache.servicecomb.saga.omega.format.NativeMessageFormat;
+import org.apache.servicecomb.saga.omega.format.KryoMessageFormat;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.slf4j.Logger;
@@ -76,7 +76,8 @@ class OmegaSpringConfig {
// TODO: 2017/12/26 connect to the one with lowest latency
for (String address : addresses) {
try {
- MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), new NativeMessageFormat(), serviceConfig, handler);
+ MessageSender sender = new GrpcClientMessageSender(grpcChannel(address), new KryoMessageFormat(),
+ new KryoMessageFormat(), serviceConfig, handler);
sender.onConnected();
senders.add(sender);
return sender;
diff --git a/pom.xml b/pom.xml
index 145b658..2458bc8 100755
--- a/pom.xml
+++ b/pom.xml
@@ -329,6 +329,11 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <version>4.0.1</version>
+ </dependency>
<!-- test dependencies -->
<dependency>
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.