You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2018/08/21 14:19:38 UTC
[incubator-servicecomb-saga] branch master updated: [SCB-745]
Applying Builder Pattern on AlphaClusterConfig. - Extending
AlphaClusterConfig by adding MessageHandler,
MessageSerializer and MessageDeserializer properties. - Refactoring the
constructor of LoadBalancedClusterMessageSender. - Modifying respective
test cases and references. - Fixing some issues detected by SonarLint.
This is an automated email from the ASF dual-hosted git repository.
ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/master by this push:
new 0e3593e [SCB-745] Applying Builder Pattern on AlphaClusterConfig. - Extending AlphaClusterConfig by adding MessageHandler, MessageSerializer and MessageDeserializer property. - Refactoring the constructor of LoadBalancedClusterMessageSender. - Modifying respective test cases and references. - Killing some issue detected by SonarLint.
0e3593e is described below
commit 0e3593eecbea677bd0a732ef57c64b4df754f7c5
Author: imlijinting <41...@users.noreply.github.com>
AuthorDate: Tue Aug 21 21:35:28 2018 +0800
[SCB-745] Applying Builder Pattern on AlphaClusterConfig.
- Extending AlphaClusterConfig by adding MessageHandler, MessageSerializer and MessageDeserializer properties.
- Refactoring the constructor of LoadBalancedClusterMessageSender.
- Modifying respective test cases and references.
- Fixing some issues detected by SonarLint.
---
.../omega/connector/grpc/AlphaClusterConfig.java | 122 ++++++++++++++++++++-
.../grpc/LoadBalancedClusterMessageSender.java | 22 ++--
...LoadBalanceClusterMessageSenderWithTLSTest.java | 38 ++++---
.../grpc/LoadBalancedClusterMessageSenderTest.java | 40 ++-----
.../saga/omega/spring/OmegaSpringConfig.java | 22 ++--
5 files changed, 171 insertions(+), 73 deletions(-)
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
index c7aa664..a8641b4 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/AlphaClusterConfig.java
@@ -17,9 +17,14 @@
package org.apache.servicecomb.saga.omega.connector.grpc;
+import java.util.Collections;
import java.util.List;
+import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
+import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
+import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
public class AlphaClusterConfig {
+
private List<String> addresses;
private boolean enableSSL;
@@ -32,13 +37,23 @@ public class AlphaClusterConfig {
private String certChain;
+ private MessageSerializer messageSerializer;
+
+ private MessageDeserializer messageDeserializer;
+
+ private MessageHandler messageHandler;
+
+ /**
+ * @deprecated Use {@link Builder} instead.
+ */
+ @Deprecated
public AlphaClusterConfig(List<String> addresses,
boolean enableSSL,
boolean enableMutualAuth,
String cert,
String key,
String certChain) {
- this.addresses = addresses;
+ this.addresses = addresses == null ? Collections.<String>emptyList() : addresses;
this.enableMutualAuth = enableMutualAuth;
this.enableSSL = enableSSL;
this.cert = cert;
@@ -46,6 +61,97 @@ public class AlphaClusterConfig {
this.certChain = certChain;
}
+  /**
+   * Creates a configuration from the values collected by {@link Builder}; this constructor is
+   * private and is invoked from {@link Builder#build()}.
+   *
+   * @param addresses cluster addresses; {@code null} is stored as an empty list, matching the
+   *     deprecated public constructor, so {@link #getAddresses()} never returns {@code null}
+   * @param enableSSL value returned by the SSL flag getter
+   * @param enableMutualAuth value returned by the mutual-auth flag getter
+   * @param cert certificate setting, stored as given (may be {@code null})
+   * @param key key setting, stored as given (may be {@code null})
+   * @param certChain certificate-chain setting, stored as given (may be {@code null})
+   * @param messageSerializer serializer returned by {@link #getMessageSerializer()}
+   * @param messageDeserializer deserializer returned by {@link #getMessageDeserializer()}
+   * @param messageHandler handler returned by {@link #getMessageHandler()}
+   */
+  private AlphaClusterConfig(List<String> addresses, boolean enableSSL, boolean enableMutualAuth,
+      String cert, String key, String certChain,
+      MessageSerializer messageSerializer,
+      MessageDeserializer messageDeserializer,
+      MessageHandler messageHandler) {
+    // A builder that never had addresses(...) called hands over null. Normalise it here so
+    // callers that do getAddresses().isEmpty() get a clean "no address" answer, not an NPE.
+    this.addresses = addresses == null ? Collections.<String>emptyList() : addresses;
+    this.enableSSL = enableSSL;
+    this.enableMutualAuth = enableMutualAuth;
+    this.cert = cert;
+    this.key = key;
+    this.certChain = certChain;
+    this.messageSerializer = messageSerializer;
+    this.messageDeserializer = messageDeserializer;
+    this.messageHandler = messageHandler;
+  }
+
+  /**
+   * Entry point for the fluent API.
+   *
+   * @return a new, empty {@link Builder}; every property starts at its Java default
+   *     ({@code null} or {@code false}) until the matching setter is called
+   */
+  public static Builder builder() {
+    final Builder freshBuilder = new Builder();
+    return freshBuilder;
+  }
+
+  /**
+   * Fluent builder for {@link AlphaClusterConfig}.
+   *
+   * <p>Each setter stores its argument and returns this builder so calls can be chained.
+   * {@link #build()} hands the stored values to the private constructor. Properties that are
+   * never set keep their Java field defaults ({@code null} for references, {@code false} for
+   * flags).
+   */
+  public static final class Builder {
+
+    private List<String> addresses;
+    private boolean enableSSL;
+    private boolean enableMutualAuth;
+    private String cert;
+    private String key;
+    private String certChain;
+    private MessageSerializer messageSerializer;
+    private MessageDeserializer messageDeserializer;
+    private MessageHandler messageHandler;
+
+    /** Stores the cluster address list; the list is kept by reference, not copied. */
+    public Builder addresses(List<String> addresses) {
+      this.addresses = addresses;
+      return this;
+    }
+
+    /** Stores the SSL flag. */
+    public Builder enableSSL(boolean enableSSL) {
+      this.enableSSL = enableSSL;
+      return this;
+    }
+
+    /** Stores the mutual-authentication flag. */
+    public Builder enableMutualAuth(boolean enableMutualAuth) {
+      this.enableMutualAuth = enableMutualAuth;
+      return this;
+    }
+
+    /** Stores the certificate setting. */
+    public Builder cert(String cert) {
+      this.cert = cert;
+      return this;
+    }
+
+    /** Stores the key setting. */
+    public Builder key(String key) {
+      this.key = key;
+      return this;
+    }
+
+    /** Stores the certificate-chain setting. */
+    public Builder certChain(String certChain) {
+      this.certChain = certChain;
+      return this;
+    }
+
+    /** Stores the serializer exposed later through the built configuration. */
+    public Builder messageSerializer(MessageSerializer messageSerializer) {
+      this.messageSerializer = messageSerializer;
+      return this;
+    }
+
+    /** Stores the deserializer exposed later through the built configuration. */
+    public Builder messageDeserializer(MessageDeserializer messageDeserializer) {
+      this.messageDeserializer = messageDeserializer;
+      return this;
+    }
+
+    /** Stores the message handler exposed later through the built configuration. */
+    public Builder messageHandler(MessageHandler messageHandler) {
+      this.messageHandler = messageHandler;
+      return this;
+    }
+
+    /**
+     * Creates the configuration from whatever has been set so far.
+     *
+     * @return a new {@link AlphaClusterConfig} holding the current builder values
+     */
+    public AlphaClusterConfig build() {
+      return new AlphaClusterConfig(
+          this.addresses,
+          this.enableSSL,
+          this.enableMutualAuth,
+          this.cert,
+          this.key,
+          this.certChain,
+          this.messageSerializer,
+          this.messageDeserializer,
+          this.messageHandler);
+    }
+  }
+
/** Returns the address list stored in this configuration at construction time. */
public List<String> getAddresses() {
return addresses;
}
@@ -69,4 +175,16 @@ public class AlphaClusterConfig {
/** Returns the certificate-chain setting stored in this configuration (may be null). */
public String getCertChain() {
return certChain;
}
-}
+
+  /**
+   * @return the serializer supplied through {@link Builder#messageSerializer}, or {@code null}
+   *     when none was supplied
+   */
+  public MessageSerializer getMessageSerializer() {
+    return this.messageSerializer;
+  }
+
+  /**
+   * @return the deserializer supplied through {@link Builder#messageDeserializer}, or
+   *     {@code null} when none was supplied
+   */
+  public MessageDeserializer getMessageDeserializer() {
+    return this.messageDeserializer;
+  }
+
+  /**
+   * @return the handler supplied through {@link Builder#messageHandler}, or {@code null} when
+   *     none was supplied
+   */
+  public MessageHandler getMessageHandler() {
+    return this.messageHandler;
+  }
+}
\ No newline at end of file
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
index 42c0d52..a293344 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSender.java
@@ -45,10 +45,7 @@ import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLException;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
-import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.slf4j.Logger;
@@ -75,13 +72,10 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public LoadBalancedClusterMessageSender(AlphaClusterConfig clusterConfig,
- MessageSerializer serializer,
- MessageDeserializer deserializer,
ServiceConfig serviceConfig,
- MessageHandler handler,
int reconnectDelay) {
- if (clusterConfig.getAddresses().size() == 0) {
+ if (clusterConfig.getAddresses().isEmpty()) {
throw new IllegalArgumentException("No reachable cluster address provided");
}
@@ -112,11 +106,11 @@ public class LoadBalancedClusterMessageSender implements MessageSender {
new GrpcClientMessageSender(
address,
channel,
- serializer,
- deserializer,
+ clusterConfig.getMessageSerializer(),
+ clusterConfig.getMessageDeserializer(),
serviceConfig,
new ErrorHandlerFactory(),
- handler),
+ clusterConfig.getMessageHandler()),
0L);
}
@@ -257,11 +251,9 @@ class FastestSender implements MessageSenderPicker {
Long min = Long.MAX_VALUE;
MessageSender sender = null;
for (Map.Entry<MessageSender, Long> entry : messageSenders.entrySet()) {
- if (entry.getValue() != Long.MAX_VALUE) {
- if (min > entry.getValue()) {
- min = entry.getValue();
- sender = entry.getKey();
- }
+ if (entry.getValue() != Long.MAX_VALUE && min > entry.getValue()) {
+ min = entry.getValue();
+ sender = entry.getKey();
}
}
if (sender == null) {
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
index 822c7ec..6de7c01 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalanceClusterMessageSenderWithTLSTest.java
@@ -24,43 +24,45 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import com.google.common.collect.ImmutableList;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.Callable;
-
import javax.net.ssl.SSLException;
-
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
import org.junit.BeforeClass;
import org.junit.Test;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.netty.GrpcSslContexts;
-import io.grpc.netty.NettyServerBuilder;
-import io.netty.handler.ssl.ClientAuth;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-
public class LoadBalanceClusterMessageSenderWithTLSTest extends LoadBalancedClusterMessageSenderTestBase {
@Override
protected MessageSender newMessageSender(String[] addresses) {
ClassLoader classLoader = getClass().getClassLoader();
- AlphaClusterConfig clusterConfig = new AlphaClusterConfig(Arrays.asList(addresses),
- true, true,
- classLoader.getResource("client.crt").getFile(),
- classLoader.getResource("client.pem").getFile(),
- classLoader.getResource("ca.crt").getFile());
+ AlphaClusterConfig clusterConfig = AlphaClusterConfig.builder()
+ .addresses(ImmutableList.copyOf(addresses))
+ .enableMutualAuth(true)
+ .enableSSL(true)
+ .cert(classLoader.getResource("client.crt").getFile())
+ .messageHandler(handler)
+ .key(classLoader.getResource("client.pem").getFile())
+ .certChain(classLoader.getResource("ca.crt").getFile())
+ .messageSerializer(serializer)
+ .messageDeserializer(deserializer)
+ .build();
+
return new LoadBalancedClusterMessageSender(
clusterConfig,
- serializer,
- deserializer,
new ServiceConfig(serviceName),
- handler,
100);
}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
index e353cdf..491e15f 100644
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java
@@ -18,7 +18,6 @@
package org.apache.servicecomb.saga.omega.connector.grpc;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
import static java.lang.Thread.State.WAITING;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@@ -30,52 +29,35 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import com.google.common.collect.ImmutableList;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.servicecomb.saga.common.EventType;
import org.apache.servicecomb.saga.omega.context.ServiceConfig;
-import org.apache.servicecomb.saga.omega.transaction.MessageDeserializer;
-import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.apache.servicecomb.saga.omega.transaction.MessageSerializer;
import org.apache.servicecomb.saga.omega.transaction.OmegaException;
import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
import org.apache.servicecomb.saga.omega.transaction.TxEvent;
import org.apache.servicecomb.saga.omega.transaction.TxStartedEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
-import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import org.apache.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase;
-import org.junit.After;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import io.grpc.stub.StreamObserver;
-
public class LoadBalancedClusterMessageSenderTest extends LoadBalancedClusterMessageSenderTestBase {
@Override
protected MessageSender newMessageSender(String[] addresses) {
- AlphaClusterConfig clusterConfig = new AlphaClusterConfig(Arrays.asList(addresses),
- false, false, null,null,null);
+ AlphaClusterConfig clusterConfig = AlphaClusterConfig.builder()
+ .addresses(ImmutableList.copyOf(addresses))
+ .enableSSL(false)
+ .enableMutualAuth(false)
+ .messageSerializer(serializer)
+ .messageDeserializer(deserializer)
+ .messageHandler(handler)
+ .build();
return new LoadBalancedClusterMessageSender(
clusterConfig,
- serializer,
- deserializer,
new ServiceConfig(serviceName),
- handler,
100);
}
diff --git a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 51324e7..217e102 100644
--- a/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/org/apache/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,9 +17,7 @@
package org.apache.servicecomb.saga.omega.spring;
-import java.util.ArrayList;
-import java.util.Arrays;
-
+import com.google.common.collect.ImmutableList;
import org.apache.servicecomb.saga.omega.connector.grpc.AlphaClusterConfig;
import org.apache.servicecomb.saga.omega.connector.grpc.LoadBalancedClusterMessageSender;
import org.apache.servicecomb.saga.omega.context.CompensationContext;
@@ -31,7 +29,6 @@ import org.apache.servicecomb.saga.omega.format.KryoMessageFormat;
import org.apache.servicecomb.saga.omega.format.MessageFormat;
import org.apache.servicecomb.saga.omega.transaction.MessageHandler;
import org.apache.servicecomb.saga.omega.transaction.MessageSender;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -74,14 +71,21 @@ class OmegaSpringConfig {
@Lazy MessageHandler handler) {
MessageFormat messageFormat = new KryoMessageFormat();
- AlphaClusterConfig clusterConfig = new AlphaClusterConfig(Arrays.asList(addresses),
- enableSSL, mutualAuth, cert, key, certChain);
+ AlphaClusterConfig clusterConfig = AlphaClusterConfig.builder()
+ .addresses(ImmutableList.copyOf(addresses))
+ .enableSSL(enableSSL)
+ .enableMutualAuth(mutualAuth)
+ .cert(cert)
+ .key(key)
+ .certChain(certChain)
+ .messageDeserializer(messageFormat)
+ .messageSerializer(messageFormat)
+ .messageHandler(handler)
+ .build();
+
final MessageSender sender = new LoadBalancedClusterMessageSender(
clusterConfig,
- messageFormat,
- messageFormat,
serviceConfig,
- handler,
reconnectDelay);
sender.onConnected();