You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by se...@apache.org on 2017/12/26 12:04:18 UTC

[incubator-servicecomb-saga] branch SCB-97_alpha_omega_bonding updated: SCB-97 closed thrift connections on shutdown

This is an automated email from the ASF dual-hosted git repository.

seanyinx pushed a commit to branch SCB-97_alpha_omega_bonding
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git


The following commit(s) were added to refs/heads/SCB-97_alpha_omega_bonding by this push:
     new 12c437d  SCB-97 closed thrift connections on shutdown
12c437d is described below

commit 12c437d844a29b78cc1dc1b1657350bfa56147e9
Author: seanyinx <se...@huawei.com>
AuthorDate: Tue Dec 26 20:04:09 2017 +0800

    SCB-97 closed thrift connections on shutdown
    
    Signed-off-by: seanyinx <se...@huawei.com>
---
 .../alpha/server/SwiftTxEventEndpointImpl.java     |  5 +++
 .../connector/thrift/ThriftMessageSender.java      | 27 ++++---------
 .../connector/thrift/ThriftMessageSenderTest.java  | 12 +++++-
 .../saga/omega/spring/OmegaSpringConfig.java       | 46 +++++++++++++++++++++-
 .../contracts/thrift/SwiftTxEventEndpoint.java     |  2 +-
 5 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
index 3ae39f6..78b93b4 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java
@@ -43,4 +43,9 @@ class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint {
         message.payloads()
     ));
   }
+
+  @Override
+  public void close() throws Exception {
+    
+  }
 }
diff --git a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
index bbf9bfa..abdbec5 100644
--- a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
+++ b/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java
@@ -17,35 +17,17 @@
 
 package io.servicecomb.saga.omega.connector.thrift;
 
-import static com.google.common.net.HostAndPort.fromParts;
-
-import java.util.concurrent.ExecutionException;
-
-import com.facebook.nifty.client.FramedClientConnector;
-import com.facebook.swift.service.ThriftClientManager;
-
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.TxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent;
 import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
-public class ThriftMessageSender implements MessageSender {
-  private static final ThriftClientManager clientManager = new ThriftClientManager();
+public class ThriftMessageSender implements MessageSender, AutoCloseable {
   private final SwiftTxEventEndpoint eventService;
   private final MessageSerializer serializer;
 
-  public static ThriftMessageSender create(String host, int port, MessageSerializer serializer) {
-    FramedClientConnector connector = new FramedClientConnector(fromParts(host, port));
-    try {
-      SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
-      return new ThriftMessageSender(endpoint, serializer);
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IllegalStateException("Failed to create transaction event endpoint client to " + host + ":" + port, e);
-    }
-  }
-
-  ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer serializer) {
+  public ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer serializer) {
     this.eventService = eventService;
     this.serializer = serializer;
   }
@@ -61,4 +43,9 @@ public class ThriftMessageSender implements MessageSender {
         serializer.serialize(event)
     ));
   }
+
+  @Override
+  public void close() throws Exception {
+    eventService.close();
+  }
 }
diff --git a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
index 3e1f833..9d0ea97 100644
--- a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
+++ b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java
@@ -52,7 +52,17 @@ public class ThriftMessageSenderTest {
     }
   };
 
-  private final SwiftTxEventEndpoint eventService = (event) -> swiftTxEvent = event;
+  private final SwiftTxEventEndpoint eventService = new SwiftTxEventEndpoint() {
+    @Override
+    public void handle(SwiftTxEvent message) {
+      swiftTxEvent = message;
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
+  };
+
   private final ThriftMessageSender messageSender = new ThriftMessageSender(eventService, serializer);
 
   @Test
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 33a0cbc..5f62884 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -17,8 +17,15 @@
 
 package io.servicecomb.saga.omega.spring;
 
+import static com.google.common.net.HostAndPort.fromParts;
+
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import javax.annotation.PreDestroy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,16 +33,23 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import com.facebook.nifty.client.FramedClientConnector;
+import com.facebook.swift.service.ThriftClientManager;
+
 import io.servicecomb.saga.omega.connector.thrift.ThriftMessageSender;
 import io.servicecomb.saga.omega.context.IdGenerator;
 import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.context.UniqueIdGenerator;
 import io.servicecomb.saga.omega.format.NativeMessageFormat;
 import io.servicecomb.saga.omega.transaction.MessageSender;
+import io.servicecomb.saga.omega.transaction.MessageSerializer;
+import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint;
 
 @Configuration
 class OmegaSpringConfig {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ThriftClientManager clientManager = new ThriftClientManager();
+  private final List<AutoCloseable> closeables = new ArrayList<>();
 
   @Bean
   IdGenerator<String> idGenerator() {
@@ -53,7 +67,9 @@ class OmegaSpringConfig {
     for (String address : addresses) {
       try {
         String[] pair = address.split(":");
-        return ThriftMessageSender.create(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+        ThriftMessageSender sender = createMessageSender(clientManager, pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+        closeables.add(sender);
+        return sender;
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
@@ -62,4 +78,32 @@ class OmegaSpringConfig {
     throw new IllegalArgumentException(
         "None of the alpha cluster is reachable: " + Arrays.toString(addresses));
   }
+
+  private ThriftMessageSender createMessageSender(ThriftClientManager clientManager,
+      String host,
+      int port,
+      MessageSerializer serializer) {
+
+    FramedClientConnector connector = new FramedClientConnector(fromParts(host, port));
+
+    try {
+      SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get();
+      return new ThriftMessageSender(endpoint, serializer);
+    } catch (InterruptedException | ExecutionException e) {
+      throw new IllegalStateException("Failed to create transaction event endpoint client to " + host + ":" + port, e);
+    }
+  }
+
+  @PreDestroy
+  void close() {
+    for (AutoCloseable closeable : closeables) {
+      try {
+        closeable.close();
+      } catch (Exception e) {
+        log.warn("Failed to close message sender", e);
+      }
+    }
+
+    clientManager.close();
+  }
 }
diff --git a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
index beff7bc..ae1fde9 100644
--- a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
+++ b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java
@@ -21,7 +21,7 @@ import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
 
 @ThriftService("TxEventEndpoint")
-public interface SwiftTxEventEndpoint {
+public interface SwiftTxEventEndpoint extends AutoCloseable {
 
   @ThriftMethod
   void handle(SwiftTxEvent message);

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.