You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2017/12/26 12:04:18 UTC
[incubator-servicecomb-saga] branch SCB-97_alpha_omega_bonding
updated: SCB-97 closed thrift connections on shutdown
This is an automated email from the ASF dual-hosted git repository.
seanyinx pushed a commit to branch SCB-97_alpha_omega_bonding
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
The following commit(s) were added to refs/heads/SCB-97_alpha_omega_bonding by this push:
new 12c437d SCB-97 closed thrift connections on shutdown
12c437d is described below
commit 12c437d844a29b78cc1dc1b1657350bfa56147e9
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Dec 26 20:04:09 2017 +0800
SCB-97 closed thrift connections on shutdown
Signed-off-by: seanyinx <se...@huawei.com>
---
.../alpha/server/SwiftTxEventEndpointImpl.java | 5 +++
.../connector/thrift/ThriftMessageSender.java | 27 ++++---------
.../connector/thrift/ThriftMessageSenderTest.java | 12 +++++-
.../saga/omega/spring/OmegaSpringConfig.java | 46 +++++++++++++++++++++-
.../contracts/thrift/SwiftTxEventEndpoint.java | 2 +-
5 files changed, 69 insertions(+), 23 deletions(-)
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 3ae39f6..78b93b4 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -43,4 +43,9 @@ class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
message.payloads()
));
}
+
+ @Override
+ public void close() throws Exception {
+
+ }
}
diff --git a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
index bbf9bfa..abdbec5 100644
--- a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
+++ b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
@@ -17,35 +17,17 @@
package io.servicecomb.saga.omega.connector.thrift;
-import static com.google.common.net.HostAndPort.fromParts;
-
-import java.util.concurrent.ExecutionException;
-
-import com.facebook.nifty.client.FramedClientConnector;
-import com.facebook.swift.service.ThriftClientManager;
-
import io.servicecomb.saga.omega.transaction.MessageSender;
import io.servicecomb.saga.omega.transaction.MessageSerializer;
import io.servicecomb.saga.omega.transaction.TxEvent;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
-public class ThriftMessageSender implements MessageSender {
- private static final ThriftClientManager clientManager = new ThriftClientManager();
+public class ThriftMessageSender implements MessageSender, AutoCloseable {
private final SwiftTxEventEndpoint eventService;
private final MessageSerializer serializer;
- public static ThriftMessageSender create(String host, int port, MessageSerializer serializer) {
- FramedClientConnector connector = new FramedClientConnector(fromParts(host, port));
- try {
- SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
- return new ThriftMessageSender(endpoint, serializer);
- } catch (InterruptedException | ExecutionException e) {
- throw new IllegalStateException("Failed to create transaction event endpoint client to " + host + ":" + port, e);
- }
- }
-
- ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer serializer) {
+ public ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer serializer) {
this.eventService = eventService;
this.serializer = serializer;
}
@@ -61,4 +43,9 @@ public class ThriftMessageSender implements MessageSender {
serializer.serialize(event)
));
}
+
+ @Override
+ public void close() throws Exception {
+ eventService.close();
+ }
}
diff --git a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
index 3e1f833..9d0ea97 100644
--- a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
@@ -52,7 +52,17 @@ public class ThriftMessageSenderTest {
}
};
- private final SwiftTxEventEndpoint eventService = (event) -> swiftTxEvent = event;
+ private final SwiftTxEventEndpoint eventService = new SwiftTxEventEndpoint() {
+ @Override
+ public void handle(SwiftTxEvent message) {
+ swiftTxEvent = message;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+ };
+
private final ThriftMessageSender messageSender = new ThriftMessageSender(eventService, serializer);
@Test
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 33a0cbc..5f62884 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,8 +17,15 @@
package io.servicecomb.saga.omega.spring;
+import static com.google.common.net.HostAndPort.fromParts;
+
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,16 +33,23 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.swift.service.ThriftClientManager;
+
import io.servicecomb.saga.omega.connector.thrift.ThriftMessageSender;
import io.servicecomb.saga.omega.context.IdGenerator;
import io.servicecomb.saga.omega.context.OmegaContext;
import io.servicecomb.saga.omega.context.UniqueIdGenerator;
import io.servicecomb.saga.omega.format.NativeMessageFormat;
import io.servicecomb.saga.omega.transaction.MessageSender;
+import io.servicecomb.saga.omega.transaction.MessageSerializer;
+import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
@Configuration
class OmegaSpringConfig {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ThriftClientManager clientManager = new ThriftClientManager();
+ private final List<AutoCloseable> closeables = new ArrayList<>();
@Bean
IdGenerator<String> idGenerator() {
@@ -53,7 +67,9 @@ class OmegaSpringConfig {
for (String address : addresses) {
try {
String[] pair = address.split(":");
- return ThriftMessageSender.create(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+ ThriftMessageSender sender = createMessageSender(clientManager, pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+ closeables.add(sender);
+ return sender;
} catch (Exception e) {
log.error("Unable to connect to alpha at {}", address, e);
}
@@ -62,4 +78,32 @@ class OmegaSpringConfig {
throw new IllegalArgumentException(
"None of the alpha cluster is reachable: " + Arrays.toString(addresses));
}
+
+ private ThriftMessageSender createMessageSender(ThriftClientManager clientManager,
+ String host,
+ int port,
+ MessageSerializer serializer) {
+
+ FramedClientConnector connector = new FramedClientConnector(fromParts(host, port));
+
+ try {
+ SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
+ return new ThriftMessageSender(endpoint, serializer);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IllegalStateException("Failed to create transaction event endpoint client to " + host + ":" + port, e);
+ }
+ }
+
+ @PreDestroy
+ void close() {
+ for (AutoCloseable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ log.warn("Failed to close message sender", e);
+ }
+ }
+
+ clientManager.close();
+ }
}
diff --git a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
index beff7bc..ae1fde9 100644
--- a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
+++ b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
@@ -21,7 +21,7 @@ import com.facebook.swift.service.ThriftMethod;
import com.facebook.swift.service.ThriftService;
@ThriftService("TxEventEndpoint")
-public interface SwiftTxEventEndpoint {
+public interface SwiftTxEventEndpoint extends AutoCloseable {
@ThriftMethod
void handle(SwiftTxEvent message);
--
To stop receiving notification emails like this one, please contact
['"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>'].