You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/08/01 07:39:05 UTC

[incubator-servicecomb-java-chassis] 01/03: SCB-687 add highway server connection protection

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit fb839b4a415005669ab5ae689a262fad73fe9d25
Author: zhengyangyong <ya...@huawei.com>
AuthorDate: Thu Jul 19 16:42:07 2018 +0800

    SCB-687 add highway server connection protection
    
    Signed-off-by: zhengyangyong <ya...@huawei.com>
---
 .../foundation/vertx/ClientClosedEvent.java        | 28 +++++----
 .../foundation/vertx/ClientConnectedEvent.java}    | 31 +++++-----
 .../foundation/vertx/server/TcpServer.java         | 26 ++++++++-
 .../vertx/server/TcpServerConnection.java          |  9 ++-
 .../foundation/vertx/server/TestTcpServer.java     |  5 +-
 .../vertx/server/TestTcpServerConnection.java      |  4 +-
 integration-tests/pom.xml                          |  1 +
 .../spring-pojo-connection-limit-test/pom.xml      | 68 ++++++++++++++++++++++
 .../PojoSpringConnectionLimitIntegrationTest.java} | 22 +++++--
 .../demo/pojo/test/PojoSpringMain.java}            | 20 +++----
 .../src/test/resources/log4j.properties            | 20 +++++++
 .../src/test/resources/microservice.yaml           | 29 +++++++++
 ...rationTest.java => ConnectionEventWatcher.java} | 35 +++++++----
 .../demo/pojo/test/PojoSpringIntegrationTest.java  | 13 +++++
 .../transport/highway/HighwayServerConnection.java |  6 +-
 .../highway/TestHighwayServerConnection.java       |  3 +-
 16 files changed, 258 insertions(+), 62 deletions(-)

diff --git a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientClosedEvent.java
similarity index 63%
copy from integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
copy to foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientClosedEvent.java
index 29f1c97..9a2ba4d 100644
--- a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientClosedEvent.java
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.demo.pojo.test;
+package org.apache.servicecomb.foundation.vertx;
 
-import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
+public class ClientClosedEvent {
+  private final String address;
 
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes = PojoSpringMain.class)
-public class PojoSpringIntegrationTest extends PojoIntegrationTestBase {
+  private final int totalConnectedCount;
 
-  @BeforeClass
-  public static void setUpClass() {
-    setUpLocalRegistry();
+  public String getAddress() {
+    return address;
   }
-}
+
+  public int getTotalConnectedCount() {
+    return totalConnectedCount;
+  }
+
+  public ClientClosedEvent(String address, int totalConnectedCount) {
+    this.address = address;
+    this.totalConnectedCount = totalConnectedCount;
+  }
+}
\ No newline at end of file
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientConnectedEvent.java
similarity index 61%
copy from foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
copy to foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientConnectedEvent.java
index 3047042..9c2d3a3 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientConnectedEvent.java
@@ -14,23 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.servicecomb.foundation.vertx.server;
 
-import org.junit.Assert;
-import org.junit.Test;
+package org.apache.servicecomb.foundation.vertx;
 
-import io.vertx.core.net.impl.NetSocketImpl;
-import mockit.Mocked;
+import io.vertx.core.net.NetSocket;
 
-public class TestTcpServerConnection {
-  @Test
-  public void test(@Mocked NetSocketImpl netSocket) {
-    TcpServerConnection connection = new TcpServerConnection();
-    connection.setProtocol("p");
-    connection.setZipName("z");
+public class ClientConnectedEvent {
+  private final NetSocket netSocket;
 
-    connection.init(netSocket);
+  private final int totalConnectedCount;
 
-    Assert.assertEquals(netSocket, connection.getNetSocket());
+  public NetSocket getNetSocket() {
+    return netSocket;
   }
-}
+
+  public int getTotalConnectedCount() {
+    return totalConnectedCount;
+  }
+
+  public ClientConnectedEvent(NetSocket netSocket, int totalConnectedCount) {
+    this.netSocket = netSocket;
+    this.totalConnectedCount = totalConnectedCount;
+  }
+}
\ No newline at end of file
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java
index a4fd690..936f171 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java
@@ -18,14 +18,19 @@
 package org.apache.servicecomb.foundation.vertx.server;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
 import org.apache.servicecomb.foundation.ssl.SSLCustom;
 import org.apache.servicecomb.foundation.ssl.SSLOption;
 import org.apache.servicecomb.foundation.ssl.SSLOptionFactory;
 import org.apache.servicecomb.foundation.vertx.AsyncResultCallback;
+import org.apache.servicecomb.foundation.vertx.ClientConnectedEvent;
 import org.apache.servicecomb.foundation.vertx.VertxTLSBuilder;
 
+import com.netflix.config.DynamicPropertyFactory;
+
 import io.vertx.core.Vertx;
 import io.vertx.core.net.NetServer;
 import io.vertx.core.net.NetServerOptions;
@@ -33,8 +38,15 @@ import io.vertx.core.net.NetServerOptions;
 public class TcpServer {
   private URIEndpointObject endpointObject;
 
+  private final AtomicInteger connectedCounter;
+
+  private final int connectionLimit;
+
   public TcpServer(URIEndpointObject endpointObject) {
     this.endpointObject = endpointObject;
+    this.connectedCounter = new AtomicInteger(0);
+    this.connectionLimit = DynamicPropertyFactory.getInstance()
+        .getIntProperty("servicecomb.highway.server.connection-limit", Integer.MAX_VALUE).get();
   }
 
   public void init(Vertx vertx, String sslKey, AsyncResultCallback<InetSocketAddress> callback) {
@@ -57,8 +69,18 @@ public class TcpServer {
     }
 
     netServer.connectHandler(netSocket -> {
-      TcpServerConnection connection = createTcpServerConnection();
-      connection.init(netSocket);
+      if (connectedCounter.get() < connectionLimit) {
+        int connectedCount = connectedCounter.incrementAndGet();
+        if (connectedCount <= connectionLimit) {
+          TcpServerConnection connection = createTcpServerConnection();
+          connection.init(netSocket, connectedCounter);
+          EventManager.post(new ClientConnectedEvent(netSocket, connectedCount));
+          return;
+        } else {
+          connectedCounter.decrementAndGet();
+        }
+      }
+      netSocket.close();
     });
 
     InetSocketAddress socketAddress = endpointObject.getSocketAddress();
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java
index 348a3f8..5c47100 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java
@@ -16,6 +16,10 @@
  */
 package org.apache.servicecomb.foundation.vertx.server;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.foundation.vertx.ClientClosedEvent;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,7 +32,7 @@ public class TcpServerConnection extends TcpConnection {
 
   protected TcpParser splitter;
 
-  public void init(NetSocket netSocket) {
+  public void init(NetSocket netSocket, AtomicInteger connectedCounter) {
     // currently, socket always be NetSocketImpl
     this.initNetSocket((NetSocketImpl) netSocket);
 
@@ -46,6 +50,9 @@ public class TcpServerConnection extends TcpConnection {
       LOGGER.error("disconected from {}, in thread {}",
           remoteAddress,
           Thread.currentThread().getName());
+
+      int connectedCount = connectedCounter.decrementAndGet();
+      EventManager.post(new ClientClosedEvent(remoteAddress, connectedCount));
     });
 
     netSocket.handler(splitter);
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java
index 403b731..4702905 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.foundation.vertx.server;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
 import org.apache.servicecomb.foundation.vertx.AsyncResultCallback;
@@ -41,8 +42,8 @@ public class TestTcpServer {
     protected TcpServerConnection createTcpServerConnection() {
       return new TcpServerConnection() {
         @Override
-        public void init(NetSocket netSocket) {
-          super.init(netSocket);
+        public void init(NetSocket netSocket, AtomicInteger connectedCounter) {
+          super.init(netSocket, connectedCounter);
         }
       };
     }
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
index 3047042..08b3978 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java
@@ -16,6 +16,8 @@
  */
 package org.apache.servicecomb.foundation.vertx.server;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -29,7 +31,7 @@ public class TestTcpServerConnection {
     connection.setProtocol("p");
     connection.setZipName("z");
 
-    connection.init(netSocket);
+    connection.init(netSocket, new AtomicInteger());
 
     Assert.assertEquals(netSocket, connection.getNetSocket());
   }
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 7a244ed..d7ceba4 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -41,6 +41,7 @@
     <module>spring-zuul-tracing-tests</module>
     <module>spring-pojo-tests</module>
     <module>dynamic-config-tests</module>
+    <module>spring-pojo-connection-limit-test</module>
   </modules>
 
   <dependencyManagement>
diff --git a/integration-tests/spring-pojo-connection-limit-test/pom.xml b/integration-tests/spring-pojo-connection-limit-test/pom.xml
new file mode 100644
index 0000000..c0da88e
--- /dev/null
+++ b/integration-tests/spring-pojo-connection-limit-test/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>integration-tests</artifactId>
+    <groupId>org.apache.servicecomb.tests</groupId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>spring-pojo-connection-limit-test</artifactId>
+  <name>Java Chassis::Integration Tests::Spring POJO Connection Limit</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.servicecomb.tests</groupId>
+      <artifactId>pojo-test</artifactId>
+      <version>1.0.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.servicecomb.demo</groupId>
+          <artifactId>demo-signature</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-autoconfigure</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.servicecomb</groupId>
+      <artifactId>spring-boot-starter-provider</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-test</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-test</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hibernate</groupId>
+      <artifactId>hibernate-validator</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java b/integration-tests/spring-pojo-connection-limit-test/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringConnectionLimitIntegrationTest.java
similarity index 65%
copy from integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
copy to integration-tests/spring-pojo-connection-limit-test/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringConnectionLimitIntegrationTest.java
index 29f1c97..69015c8 100644
--- a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
+++ b/integration-tests/spring-pojo-connection-limit-test/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringConnectionLimitIntegrationTest.java
@@ -17,17 +17,31 @@
 
 package org.apache.servicecomb.demo.pojo.test;
 
+import static org.apache.servicecomb.serviceregistry.client.LocalServiceRegistryClientImpl.LOCAL_REGISTRY_FILE_KEY;
+import static org.junit.Assert.fail;
+
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = PojoSpringMain.class)
-public class PojoSpringIntegrationTest extends PojoIntegrationTestBase {
-
+public class PojoSpringConnectionLimitIntegrationTest {
   @BeforeClass
   public static void setUpClass() {
-    setUpLocalRegistry();
+    System.setProperty(LOCAL_REGISTRY_FILE_KEY, "notExistJustForceLocal");
+  }
+
+  @Test
+  public void remoteHelloPojo_sayHello() {
+    try {
+      PojoService.hello.SayHello("whatever");
+      fail("connection limit failed");
+    } catch (Exception e) {
+      Assert.assertEquals("java.io.IOException: socket closed", e.getCause().toString());
+    }
   }
-}
+}
\ No newline at end of file
diff --git a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java b/integration-tests/spring-pojo-connection-limit-test/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringMain.java
similarity index 67%
copy from integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
copy to integration-tests/spring-pojo-connection-limit-test/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringMain.java
index 29f1c97..1375571 100644
--- a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
+++ b/integration-tests/spring-pojo-connection-limit-test/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringMain.java
@@ -17,17 +17,15 @@
 
 package org.apache.servicecomb.demo.pojo.test;
 
-import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
+import org.apache.servicecomb.springboot.starter.provider.EnableServiceComb;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
 
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes = PojoSpringMain.class)
-public class PojoSpringIntegrationTest extends PojoIntegrationTestBase {
+@SpringBootApplication
+@EnableServiceComb
+public class PojoSpringMain {
 
-  @BeforeClass
-  public static void setUpClass() {
-    setUpLocalRegistry();
+  public static void main(final String[] args) {
+    SpringApplication.run(PojoSpringMain.class, args);
   }
-}
+}
\ No newline at end of file
diff --git a/integration-tests/spring-pojo-connection-limit-test/src/test/resources/log4j.properties b/integration-tests/spring-pojo-connection-limit-test/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e18648a
--- /dev/null
+++ b/integration-tests/spring-pojo-connection-limit-test/src/test/resources/log4j.properties
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
\ No newline at end of file
diff --git a/integration-tests/spring-pojo-connection-limit-test/src/test/resources/microservice.yaml b/integration-tests/spring-pojo-connection-limit-test/src/test/resources/microservice.yaml
new file mode 100644
index 0000000..3935df6
--- /dev/null
+++ b/integration-tests/spring-pojo-connection-limit-test/src/test/resources/microservice.yaml
@@ -0,0 +1,29 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+APPLICATION_ID: pojotest-it
+service_description:
+  name: pojo-connection-limit
+  version: 0.0.4
+servicecomb:
+  service:
+    registry:
+      address: http://127.0.0.1:30100
+  highway:
+    address: 0.0.0.0:7070
+    server:
+      connection-limit: 0
\ No newline at end of file
diff --git a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java b/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/ConnectionEventWatcher.java
similarity index 57%
copy from integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
copy to integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/ConnectionEventWatcher.java
index 29f1c97..68e58cc 100644
--- a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
+++ b/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/ConnectionEventWatcher.java
@@ -17,17 +17,28 @@
 
 package org.apache.servicecomb.demo.pojo.test;
 
-import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(classes = PojoSpringMain.class)
-public class PojoSpringIntegrationTest extends PojoIntegrationTestBase {
-
-  @BeforeClass
-  public static void setUpClass() {
-    setUpLocalRegistry();
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.servicecomb.foundation.vertx.ClientClosedEvent;
+import org.apache.servicecomb.foundation.vertx.ClientConnectedEvent;
+
+import com.google.common.eventbus.Subscribe;
+
+public class ConnectionEventWatcher {
+  private final List<Integer> counters = new ArrayList<>();
+
+  public List<Integer> getCounters() {
+    return counters;
+  }
+
+  @Subscribe
+  public void onConnected(ClientConnectedEvent event) {
+    counters.add(event.getTotalConnectedCount());
+  }
+
+  @Subscribe
+  public void onClosed(ClientClosedEvent event) {
+    counters.add(event.getTotalConnectedCount());
   }
 }
diff --git a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java b/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
index 29f1c97..749a4e5 100644
--- a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
+++ b/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/PojoSpringIntegrationTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.servicecomb.demo.pojo.test;
 
+import org.apache.servicecomb.core.SCBEngine;
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -26,8 +30,17 @@ import org.springframework.test.context.junit4.SpringRunner;
 @SpringBootTest(classes = PojoSpringMain.class)
 public class PojoSpringIntegrationTest extends PojoIntegrationTestBase {
 
+  private static final ConnectionEventWatcher watcher = new ConnectionEventWatcher();
+
   @BeforeClass
   public static void setUpClass() {
     setUpLocalRegistry();
+    EventManager.register(watcher);
+  }
+
+  @AfterClass
+  public static void teardownClass() {
+    SCBEngine.getInstance().destroy();
+    Assert.assertArrayEquals("check connection count change", new Integer[] {1, 0}, watcher.getCounters().toArray());
   }
 }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index 79b5e02..1f5a759 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -16,6 +16,8 @@
  */
 package org.apache.servicecomb.transport.highway;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.servicecomb.core.Endpoint;
@@ -45,9 +47,9 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
   }
 
   @Override
-  public void init(NetSocket netSocket) {
+  public void init(NetSocket netSocket, AtomicInteger connectedCounter) {
     splitter = new TcpParser(this);
-    super.init(netSocket);
+    super.init(netSocket, connectedCounter);
   }
 
   @Override
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
index e951b97..4f49fbc 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
@@ -17,6 +17,7 @@
 package org.apache.servicecomb.transport.highway;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.xml.ws.Holder;
 
@@ -75,7 +76,7 @@ public class TestHighwayServerConnection {
       }
     };
     connection = new HighwayServerConnection(endpoint);
-    connection.init(netSocket);
+    connection.init(netSocket, new AtomicInteger());
 
     header = new RequestHeader();
   }