You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/12/03 06:18:36 UTC
[servicecomb-java-chassis] 07/14: [SCB-1592]upgrade vert.x and netty.(1)update pom and fix compile errors
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit 06d7f23c1848751597f5913fb2f4a7b938276ea7
Author: liubao <bi...@qq.com>
AuthorDate: Fri Nov 15 09:24:30 2019 +0800
[SCB-1592]upgrade vert.x and netty.(1)update pom and fix compile errors
---
.../servicecomb/edge/core/TestEdgeInvocation.java | 7 -
.../main/java/io/vertx/core/impl/SyncContext.java | 28 +-
.../main/java/io/vertx/core/impl/SyncVertx.java | 53 -
.../main/java/io/vertx/core/impl/VertxImpl.java | 1137 --------------------
.../main/java/io/vertx/core/impl/VertxImpl.java | 1118 -------------------
.../vertx/ext/web/impl/HttpServerRequestUtils.java | 26 -
.../VertxServerRequestToHttpServletRequest.java | 4 +-
.../vertx/stream/InputStreamToReadStream.java | 3 +-
.../vertx/stream/OutputStreamToWriteStream.java | 25 +-
.../ext/web/impl/TestHttpServerRequestUtils.java | 8 -
.../foundation/vertx/TestVertxUtils.java | 6 +-
.../vertx/client/TestClientPoolManager.java | 20 +-
.../vertx/client/tcp/TestTcpClientConnection.java | 1 +
.../foundation/vertx/http/TestReadStreamPart.java | 10 +-
...TestVertxServerRequestToHttpServletRequest.java | 11 +-
...stVertxServerResponseToHttpServletResponse.java | 17 +-
.../foundation/vertx/stream/TestPumpFromPart.java | 4 +-
.../rest/vertx/TestVertxRestDispatcher.java | 53 +-
18 files changed, 122 insertions(+), 2409 deletions(-)
diff --git a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
index a566085..8995cbd 100644
--- a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
+++ b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
@@ -87,13 +87,6 @@ public class TestEdgeInvocation {
@Before
public void setup() {
- new Expectations(VertxImpl.class) {
- {
- VertxImpl.context();
- result = context;
- }
- };
-
referenceConfig.setMicroserviceVersionRule(microserviceVersionRule);
referenceConfig.setTransport("rest");
diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java
index 4fd3369..0640b48 100644
--- a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java
+++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java
@@ -19,20 +19,28 @@ package io.vertx.core.impl;
import java.util.concurrent.Executor;
import io.vertx.core.AsyncResult;
-import io.vertx.core.Future;
import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
import io.vertx.core.spi.metrics.PoolMetrics;
public class SyncContext extends EventLoopContext {
+ protected VertxInternal owner;
+
public SyncContext() {
this(null);
}
public SyncContext(VertxInternal vertx) {
super(vertx, null, null, null, null, null, null);
- if (SyncVertx.class.isInstance(vertx)) {
- ((SyncVertx) vertx).setContext(this);
- }
+ }
+
+ public VertxInternal owner() {
+ return owner;
+ }
+
+ public void setOwner(VertxInternal owner) {
+ this.owner = owner;
}
@Override
@@ -40,9 +48,9 @@ public class SyncContext extends EventLoopContext {
task.handle(null);
}
- public static <T> void syncExecuteBlocking(Handler<Future<T>> blockingCodeHandler,
+ public static <T> void syncExecuteBlocking(Handler<Promise<T>> blockingCodeHandler,
Handler<AsyncResult<T>> asyncResultHandler) {
- Future<T> res = Future.future();
+ Promise<T> res = Promise.promise();
try {
blockingCodeHandler.handle(res);
@@ -51,11 +59,11 @@ public class SyncContext extends EventLoopContext {
return;
}
- res.setHandler(asyncResultHandler);
+ res.future().setHandler(asyncResultHandler);
}
@Override
- public <T> void executeBlockingInternal(Handler<Future<T>> action, Handler<AsyncResult<T>> resultHandler) {
+ public <T> void executeBlockingInternal(Handler<Promise<T>> action, Handler<AsyncResult<T>> resultHandler) {
syncExecuteBlocking((future) -> {
try {
action.handle(future);
@@ -66,13 +74,13 @@ public class SyncContext extends EventLoopContext {
}
@Override
- public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
+ public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered,
Handler<AsyncResult<T>> asyncResultHandler) {
syncExecuteBlocking(blockingCodeHandler, asyncResultHandler);
}
@Override
- <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
+ <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, TaskQueue queue, @SuppressWarnings("rawtypes") PoolMetrics metrics) {
syncExecuteBlocking(blockingCodeHandler, resultHandler);
diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java
deleted file mode 100644
index c200ed7..0000000
--- a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.vertx.core.impl;
-
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxOptions;
-import io.vertx.core.net.impl.transport.Transport;
-
-/**
- * after test finished, need to invoke vertx.close
- */
-public class SyncVertx extends VertxImpl {
- private ContextImpl context = new SyncContext(this);
-
- public SyncVertx() {
- this(new VertxOptions(), null);
- }
-
- protected SyncVertx(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
- super(options, Transport.transport(options.getPreferNativeTransport()));
- init();
- }
-
- @Override
- public ContextImpl getContext() {
- return context;
- }
-
- public void setContext(ContextImpl context) {
- this.context = context;
- }
-
- @Override
- public ContextImpl getOrCreateContext() {
- return context;
- }
-}
diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java
deleted file mode 100644
index 92dd1d0..0000000
--- a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java
+++ /dev/null
@@ -1,1137 +0,0 @@
-/*
- * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
- * which is available at https://www.apache.org/licenses/LICENSE-2.0.
- *
- * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
- */
-
-package io.vertx.core.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-
-import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.resolver.AddressResolverGroup;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Closeable;
-import io.vertx.core.Context;
-import io.vertx.core.DeploymentOptions;
-import io.vertx.core.Future;
-import io.vertx.core.Handler;
-import io.vertx.core.ServiceHelper;
-import io.vertx.core.TimeoutStream;
-import io.vertx.core.Verticle;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxOptions;
-import io.vertx.core.datagram.DatagramSocket;
-import io.vertx.core.datagram.DatagramSocketOptions;
-import io.vertx.core.datagram.impl.DatagramSocketImpl;
-import io.vertx.core.dns.AddressResolverOptions;
-import io.vertx.core.dns.DnsClient;
-import io.vertx.core.dns.DnsClientOptions;
-import io.vertx.core.dns.impl.DnsClientImpl;
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.impl.EventBusImpl;
-import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
-import io.vertx.core.file.FileSystem;
-import io.vertx.core.file.impl.FileResolver;
-import io.vertx.core.file.impl.FileSystemImpl;
-import io.vertx.core.file.impl.WindowsFileSystem;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.http.HttpClientOptions;
-import io.vertx.core.http.HttpServer;
-import io.vertx.core.http.HttpServerOptions;
-import io.vertx.core.http.impl.HttpClientImpl;
-import io.vertx.core.http.impl.HttpServerImpl;
-import io.vertx.core.impl.resolver.DnsResolverProvider;
-import io.vertx.core.json.JsonObject;
-import io.vertx.core.logging.Logger;
-import io.vertx.core.logging.LoggerFactory;
-import io.vertx.core.net.NetClient;
-import io.vertx.core.net.NetClientOptions;
-import io.vertx.core.net.NetServer;
-import io.vertx.core.net.NetServerOptions;
-import io.vertx.core.net.impl.NetClientImpl;
-import io.vertx.core.net.impl.NetServerImpl;
-import io.vertx.core.net.impl.ServerID;
-import io.vertx.core.net.impl.transport.Transport;
-import io.vertx.core.shareddata.SharedData;
-import io.vertx.core.shareddata.impl.SharedDataImpl;
-import io.vertx.core.spi.VerticleFactory;
-import io.vertx.core.spi.VertxMetricsFactory;
-import io.vertx.core.spi.cluster.ClusterManager;
-import io.vertx.core.spi.metrics.Metrics;
-import io.vertx.core.spi.metrics.MetricsProvider;
-import io.vertx.core.spi.metrics.PoolMetrics;
-import io.vertx.core.spi.metrics.VertxMetrics;
-
-/**
- * @author <a href="http://tfox.org">Tim Fox</a>
- */
-public class VertxImpl implements VertxInternal, MetricsProvider {
-
- private static final Logger log = LoggerFactory.getLogger(VertxImpl.class);
-
- private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
- private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
- private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);
-
- static {
- // Netty resource leak detection has a performance overhead and we do not need it in Vert.x
- ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
- // Use the JDK deflater/inflater by default
- System.setProperty("io.netty.noJdkZlibDecoder", "false");
- }
-
- static VertxImpl vertx(VertxOptions options) {
- VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport()));
- vertx.init();
- return vertx;
- }
-
- static VertxImpl vertx(VertxOptions options, Transport transport) {
- VertxImpl vertx = new VertxImpl(options, transport);
- vertx.init();
- return vertx;
- }
-
- static void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
- VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport()));
- vertx.joinCluster(options, resultHandler);
- }
-
- private final FileSystem fileSystem = getFileSystem();
- private final SharedData sharedData;
- private final VertxMetrics metrics;
- private final ConcurrentMap<Long, InternalTimerHandler> timeouts = new ConcurrentHashMap<>();
- private final AtomicLong timeoutCounter = new AtomicLong(0);
- private final ClusterManager clusterManager;
- private final DeploymentManager deploymentManager;
- private final FileResolver fileResolver;
- private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
- private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
- final WorkerPool workerPool;
- final WorkerPool internalBlockingPool;
- private final ThreadFactory eventLoopThreadFactory;
- private final EventLoopGroup eventLoopGroup;
- private final EventLoopGroup acceptorEventLoopGroup;
- private final BlockedThreadChecker checker;
- private final AddressResolver addressResolver;
- private final AddressResolverOptions addressResolverOptions;
- private final EventBus eventBus;
- private volatile HAManager haManager;
- private boolean closed;
- private volatile Handler<Throwable> exceptionHandler;
- private final Map<String, SharedWorkerPool> namedWorkerPools;
- private final int defaultWorkerPoolSize;
- private final long defaultWorkerMaxExecTime;
- private final CloseHooks closeHooks;
- private final Transport transport;
-
- @SuppressWarnings("rawtypes")
- public VertxImpl(VertxOptions options, Transport transport) {
- // Sanity check
- if (Vertx.currentContext() != null) {
- log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");
- }
- closeHooks = new CloseHooks(log);
- checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit());
- eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());
- eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
- ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());
- // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections
- // under a lot of load
- acceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100);
-
- metrics = initialiseMetrics(options);
-
- ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
- new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));
- PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
- ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
- new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));
- PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
- internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
- namedWorkerPools = new HashMap<>();
- workerPool = new WorkerPool(workerExec, workerPoolMetrics);
- defaultWorkerPoolSize = options.getWorkerPoolSize();
- defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime();
-
- this.transport = transport;
- this.fileResolver = new FileResolver(options.getFileSystemOptions());
- this.addressResolverOptions = options.getAddressResolverOptions();
- this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());
- this.deploymentManager = new DeploymentManager(this);
- if (options.isClustered()) {
- this.clusterManager = getClusterManager(options);
- this.eventBus = new ClusteredEventBus(this, options, clusterManager);
- } else {
- this.clusterManager = null;
- this.eventBus = new EventBusImpl(this);
- }
- this.sharedData = new SharedDataImpl(this, clusterManager);
- }
-
- public void init() {
- eventBus.start(ar -> {});
- if (metrics != null) {
- metrics.vertxCreated(this);
- }
- }
-
- private void joinCluster(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
- clusterManager.setVertx(this);
- clusterManager.join(ar -> {
- if (ar.succeeded()) {
- createHaManager(options, resultHandler);
- } else {
- log.error("Failed to join cluster", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- private void createHaManager(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
- this.<Map<String, String>>executeBlocking(fut -> {
- fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME));
- }, false, ar -> {
- if (ar.succeeded()) {
- Map<String, String> clusterMap = ar.result();
- haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled());
- startEventBus(resultHandler);
- } else {
- log.error("Failed to start HAManager", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- private void startEventBus(Handler<AsyncResult<Vertx>> resultHandler) {
- eventBus.start(ar -> {
- if (ar.succeeded()) {
- initializeHaManager(resultHandler);
- } else {
- log.error("Failed to start event bus", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- private void initializeHaManager(Handler<AsyncResult<Vertx>> resultHandler) {
- this.executeBlocking(fut -> {
- // Init the manager (i.e register listener and check the quorum)
- // after the event bus has been fully started and updated its state
- // it will have also set the clustered changed view handler on the ha manager
- haManager.init();
- fut.complete();
- }, false, ar -> {
- if (ar.succeeded()) {
- if (metrics != null) {
- metrics.vertxCreated(this);
- }
- resultHandler.handle(Future.succeededFuture(this));
- } else {
- log.error("Failed to initialize HAManager", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- /**
- * @return The FileSystem implementation for the OS
- */
- protected FileSystem getFileSystem() {
- return Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this);
- }
-
- @Override
- public DatagramSocket createDatagramSocket(DatagramSocketOptions options) {
- return DatagramSocketImpl.create(this, options);
- }
-
- @Override
- public DatagramSocket createDatagramSocket() {
- return createDatagramSocket(new DatagramSocketOptions());
- }
-
- public NetServer createNetServer(NetServerOptions options) {
- return new NetServerImpl(this, options);
- }
-
- @Override
- public NetServer createNetServer() {
- return createNetServer(new NetServerOptions());
- }
-
- public NetClient createNetClient(NetClientOptions options) {
- return new NetClientImpl(this, options);
- }
-
- @Override
- public NetClient createNetClient() {
- return createNetClient(new NetClientOptions());
- }
-
- @Override
- public Transport transport() {
- return transport;
- }
-
- @Override
- public boolean isNativeTransportEnabled() {
- return transport != Transport.JDK;
- }
-
- public FileSystem fileSystem() {
- return fileSystem;
- }
-
- public SharedData sharedData() {
- return sharedData;
- }
-
- public HttpServer createHttpServer(HttpServerOptions serverOptions) {
- return new HttpServerImpl(this, serverOptions);
- }
-
- @Override
- public HttpServer createHttpServer() {
- return createHttpServer(new HttpServerOptions());
- }
-
- public HttpClient createHttpClient(HttpClientOptions options) {
- return new HttpClientImpl(this, options);
- }
-
- @Override
- public HttpClient createHttpClient() {
- return createHttpClient(new HttpClientOptions());
- }
-
- public EventBus eventBus() {
- return eventBus;
- }
-
- public long setPeriodic(long delay, Handler<Long> handler) {
- return scheduleTimeout(getOrCreateContext(), handler, delay, true);
- }
-
- @Override
- public TimeoutStream periodicStream(long delay) {
- return new TimeoutStreamImpl(delay, true);
- }
-
- public long setTimer(long delay, Handler<Long> handler) {
- return scheduleTimeout(getOrCreateContext(), handler, delay, false);
- }
-
- @Override
- public TimeoutStream timerStream(long delay) {
- return new TimeoutStreamImpl(delay, false);
- }
-
- public void runOnContext(Handler<Void> task) {
- ContextImpl context = getOrCreateContext();
- context.runOnContext(task);
- }
-
- // The background pool is used for making blocking calls to legacy synchronous APIs
- public ExecutorService getWorkerPool() {
- return workerPool.executor();
- }
-
- public EventLoopGroup getEventLoopGroup() {
- return eventLoopGroup;
- }
-
- public EventLoopGroup getAcceptorEventLoopGroup() {
- return acceptorEventLoopGroup;
- }
-
- public ContextImpl getOrCreateContext() {
- ContextImpl ctx = getContext();
- if (ctx == null) {
- // We are running embedded - Create a context
- ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());
- }
- return ctx;
- }
-
- public Map<ServerID, HttpServerImpl> sharedHttpServers() {
- return sharedHttpServers;
- }
-
- public Map<ServerID, NetServerImpl> sharedNetServers() {
- return sharedNetServers;
- }
-
- @Override
- public boolean isMetricsEnabled() {
- return metrics != null;
- }
-
- @Override
- public Metrics getMetrics() {
- return metrics;
- }
-
- public boolean cancelTimer(long id) {
- InternalTimerHandler handler = timeouts.remove(id);
- if (handler != null) {
- handler.context.removeCloseHook(handler);
- return handler.cancel();
- } else {
- return false;
- }
- }
-
- @Override
- public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {
- return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl);
- }
-
- @Override
- public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config,
- ClassLoader tccl) {
- if (workerPool == null) {
- workerPool = this.workerPool;
- }
- if (multiThreaded) {
- return new MultiThreadedWorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl);
- } else {
- return new WorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl);
- }
- }
-
- @Override
- public DnsClient createDnsClient(int port, String host) {
- return createDnsClient(new DnsClientOptions().setHost(host).setPort(port));
- }
-
- @Override
- public DnsClient createDnsClient() {
- return createDnsClient(new DnsClientOptions());
- }
-
- @Override
- public DnsClient createDnsClient(DnsClientOptions options) {
- String host = options.getHost();
- int port = options.getPort();
- if (host == null || port < 0) {
- DnsResolverProvider provider = new DnsResolverProvider(this, addressResolverOptions);
- InetSocketAddress address = provider.nameServerAddresses().get(0);
- // provide the host and port
- options = new DnsClientOptions(options)
- .setHost(address.getAddress().getHostAddress())
- .setPort(address.getPort());
- }
- return new DnsClientImpl(this, options);
- }
-
- private VertxMetrics initialiseMetrics(VertxOptions options) {
- if (options.getMetricsOptions() != null && options.getMetricsOptions().isEnabled()) {
- VertxMetricsFactory factory = options.getMetricsOptions().getFactory();
- if (factory == null) {
- factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class);
- if (factory == null) {
- log.warn("Metrics has been set to enabled but no VertxMetricsFactory found on classpath");
- }
- }
- if (factory != null) {
- VertxMetrics metrics = factory.metrics(options);
- Objects.requireNonNull(metrics, "The metric instance created from " + factory + " cannot be null");
- return metrics;
- }
- }
- return null;
- }
-
- private ClusterManager getClusterManager(VertxOptions options) {
- ClusterManager mgr = options.getClusterManager();
- if (mgr == null) {
- String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass");
- if (clusterManagerClassName != null) {
- // We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader
- try {
- Class<?> clazz = Class.forName(clusterManagerClassName);
- mgr = (ClusterManager) clazz.newInstance();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e);
- }
- } else {
- mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class);
- if (mgr == null) {
- throw new IllegalStateException("No ClusterManagerFactory instances found on classpath");
- }
- }
- }
- return mgr;
- }
-
- private long scheduleTimeout(ContextImpl context, Handler<Long> handler, long delay, boolean periodic) {
- if (delay < 1) {
- throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms");
- }
- long timerId = timeoutCounter.getAndIncrement();
- InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context);
- timeouts.put(timerId, task);
- context.addCloseHook(task);
- return timerId;
- }
-
- public static Context context() {
- Thread current = Thread.currentThread();
- if (current instanceof VertxThread) {
- return ((VertxThread) current).getContext();
- }
- return null;
- }
-
- public ContextImpl getContext() {
- ContextImpl context = (ContextImpl) context();
- if (context != null && context.owner == this) {
- return context;
- }
- return null;
- }
-
- public ClusterManager getClusterManager() {
- return clusterManager;
- }
-
- @Override
- public void close() {
- close(null);
- }
-
- private void closeClusterManager(Handler<AsyncResult<Void>> completionHandler) {
- if (clusterManager != null) {
- clusterManager.leave(ar -> {
- if (ar.failed()) {
- log.error("Failed to leave cluster", ar.cause());
- }
- if (completionHandler != null) {
- runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
- }
- });
- } else if (completionHandler != null) {
- runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
- }
- }
-
- @Override
- public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
- if (closed || eventBus == null) {
- // Just call the handler directly since pools shutdown
- if (completionHandler != null) {
- completionHandler.handle(Future.succeededFuture());
- }
- return;
- }
- closed = true;
-
- closeHooks.run(ar -> {
- deploymentManager.undeployAll(ar1 -> {
- HAManager haManager = haManager();
- Future<Void> haFuture = Future.future();
- if (haManager != null) {
- this.executeBlocking(fut -> {
- haManager.stop();
- fut.complete();
- }, false, haFuture);
- } else {
- haFuture.complete();
- }
- haFuture.setHandler(ar2 -> {
- addressResolver.close(ar3 -> {
- eventBus.close(ar4 -> {
- closeClusterManager(ar5 -> {
- // Copy set to prevent ConcurrentModificationException
- Set<HttpServerImpl> httpServers = new HashSet<>(sharedHttpServers.values());
- Set<NetServerImpl> netServers = new HashSet<>(sharedNetServers.values());
- sharedHttpServers.clear();
- sharedNetServers.clear();
-
- int serverCount = httpServers.size() + netServers.size();
-
- AtomicInteger serverCloseCount = new AtomicInteger();
-
- Handler<AsyncResult<Void>> serverCloseHandler = res -> {
- if (res.failed()) {
- log.error("Failure in shutting down server", res.cause());
- }
- if (serverCloseCount.incrementAndGet() == serverCount) {
- deleteCacheDirAndShutdown(completionHandler);
- }
- };
-
- for (HttpServerImpl server : httpServers) {
- server.closeAll(serverCloseHandler);
- }
- for (NetServerImpl server : netServers) {
- server.closeAll(serverCloseHandler);
- }
- if (serverCount == 0) {
- deleteCacheDirAndShutdown(completionHandler);
- }
- });
- });
- });
- });
- });
- });
- }
-
- @Override
- public void deployVerticle(Verticle verticle) {
- deployVerticle(verticle, new DeploymentOptions(), null);
- }
-
- @Override
- public void deployVerticle(Verticle verticle, Handler<AsyncResult<String>> completionHandler) {
- deployVerticle(verticle, new DeploymentOptions(), completionHandler);
- }
-
- @Override
- public void deployVerticle(String name, Handler<AsyncResult<String>> completionHandler) {
- deployVerticle(name, new DeploymentOptions(), completionHandler);
- }
-
- @Override
- public void deployVerticle(Verticle verticle, DeploymentOptions options) {
- deployVerticle(verticle, options, null);
- }
-
- @Override
- public void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options) {
- deployVerticle(() -> {
- try {
- return verticleClass.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, options);
- }
-
- @Override
- public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options) {
- deployVerticle(verticleSupplier, options, null);
- }
-
- @Override
- public void deployVerticle(Verticle verticle, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- if (options.getInstances() != 1) {
- throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle");
- }
- deployVerticle(() -> verticle, options, completionHandler);
- }
-
- @Override
- public void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- deployVerticle(() -> {
- try {
- return verticleClass.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, options, completionHandler);
- }
-
- @Override
- public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- boolean closed;
- synchronized (this) {
- closed = this.closed;
- }
- if (closed) {
- if (completionHandler != null) {
- completionHandler.handle(Future.failedFuture("Vert.x closed"));
- }
- } else {
- deploymentManager.deployVerticle(verticleSupplier, options, completionHandler);
- }
- }
-
- @Override
- public void deployVerticle(String name) {
- deployVerticle(name, new DeploymentOptions(), null);
- }
-
- @Override
- public void deployVerticle(String name, DeploymentOptions options) {
- deployVerticle(name, options, null);
- }
-
- @Override
- public void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- if (options.isHa() && haManager() != null && haManager().isEnabled()) {
- haManager().deployVerticle(name, options, completionHandler);
- } else {
- deploymentManager.deployVerticle(name, options, completionHandler);
- }
- }
-
- @Override
- public String getNodeID() {
- return clusterManager.getNodeID();
- }
-
- @Override
- public void undeploy(String deploymentID) {
- undeploy(deploymentID, res -> {
- });
- }
-
- @Override
- public void undeploy(String deploymentID, Handler<AsyncResult<Void>> completionHandler) {
- HAManager haManager = haManager();
- Future<Void> haFuture = Future.future();
- if (haManager != null && haManager.isEnabled()) {
- this.executeBlocking(fut -> {
- haManager.removeFromHA(deploymentID);
- fut.complete();
- }, false, haFuture);
- } else {
- haFuture.complete();
- }
- haFuture.compose(v -> {
- Future<Void> deploymentFuture = Future.future();
- deploymentManager.undeployVerticle(deploymentID, deploymentFuture);
- return deploymentFuture;
- }).setHandler(completionHandler);
- }
-
- @Override
- public Set<String> deploymentIDs() {
- return deploymentManager.deployments();
- }
-
- @Override
- public void registerVerticleFactory(VerticleFactory factory) {
- deploymentManager.registerVerticleFactory(factory);
- }
-
- @Override
- public void unregisterVerticleFactory(VerticleFactory factory) {
- deploymentManager.unregisterVerticleFactory(factory);
- }
-
- @Override
- public Set<VerticleFactory> verticleFactories() {
- return deploymentManager.verticleFactories();
- }
-
- @Override
- public <T> void executeBlockingInternal(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
- ContextImpl context = getOrCreateContext();
-
- context.executeBlockingInternal(blockingCodeHandler, resultHandler);
- }
-
- @Override
- public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
- Handler<AsyncResult<T>> asyncResultHandler) {
- ContextImpl context = getOrCreateContext();
- context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
- }
-
- @Override
- public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
- Handler<AsyncResult<T>> asyncResultHandler) {
- executeBlocking(blockingCodeHandler, true, asyncResultHandler);
- }
-
- @Override
- public boolean isClustered() {
- return clusterManager != null;
- }
-
- @Override
- public EventLoopGroup nettyEventLoopGroup() {
- return eventLoopGroup;
- }
-
- // For testing
- public void simulateKill() {
- if (haManager() != null) {
- haManager().simulateKill();
- }
- }
-
- @Override
- public Deployment getDeployment(String deploymentID) {
- return deploymentManager.getDeployment(deploymentID);
- }
-
- @Override
- public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) {
- if (haManager() != null) {
- haManager().setFailoverCompleteHandler(failoverCompleteHandler);
- }
- }
-
- @Override
- public boolean isKilled() {
- return haManager().isKilled();
- }
-
- @Override
- public void failDuringFailover(boolean fail) {
- if (haManager() != null) {
- haManager().failDuringFailover(fail);
- }
- }
-
- @Override
- public VertxMetrics metricsSPI() {
- return metrics;
- }
-
- @Override
- public File resolveFile(String fileName) {
- return fileResolver.resolveFile(fileName);
- }
-
- @Override
- public void resolveAddress(String hostname, Handler<AsyncResult<InetAddress>> resultHandler) {
- addressResolver.resolveHostname(hostname, resultHandler);
- }
-
- @Override
- public AddressResolver addressResolver() {
- return addressResolver;
- }
-
- @Override
- public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
- return addressResolver.nettyAddressResolverGroup();
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) {
- executeBlockingInternal(fut -> {
- try {
- fileResolver.close();
- fut.complete();
- } catch (IOException e) {
- fut.tryFail(e);
- }
- }, ar -> {
-
- workerPool.close();
- internalBlockingPool.close();
- new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close);
-
- acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
- @Override
- public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
- if (!future.isSuccess()) {
- log.warn("Failure in shutting down acceptor event loop group", future.cause());
- }
- eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
- @Override
- public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
- if (!future.isSuccess()) {
- log.warn("Failure in shutting down event loop group", future.cause());
- }
- if (metrics != null) {
- metrics.close();
- }
-
- checker.close();
-
- if (completionHandler != null) {
- eventLoopThreadFactory.newThread(() -> {
- completionHandler.handle(Future.succeededFuture());
- }).start();
- }
- }
- });
- }
- });
- });
- }
-
- public HAManager haManager() {
- return haManager;
- }
-
- private class InternalTimerHandler implements Handler<Void>, Closeable {
- final Handler<Long> handler;
- final boolean periodic;
- final long timerID;
- final ContextImpl context;
- final java.util.concurrent.Future<?> future;
- final AtomicBoolean cancelled;
-
- boolean cancel() {
- if (cancelled.compareAndSet(false, true)) {
- if (metrics != null) {
- metrics.timerEnded(timerID, true);
- }
- future.cancel(false);
- return true;
- } else {
- return false;
- }
- }
-
- InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay, ContextImpl context) {
- this.context = context;
- this.timerID = timerID;
- this.handler = runnable;
- this.periodic = periodic;
- this.cancelled = new AtomicBoolean();
- EventLoop el = context.nettyEventLoop();
- Runnable toRun = () -> context.runOnContext(this);
- if (periodic) {
- future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS);
- } else {
- future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS);
- }
- if (metrics != null) {
- metrics.timerCreated(timerID);
- }
- }
-
- public void handle(Void v) {
- if (!cancelled.get()) {
- try {
- handler.handle(timerID);
- } finally {
- if (!periodic) {
- // Clean up after it's fired
- cleanupNonPeriodic();
- }
- }
- }
- }
-
- private void cleanupNonPeriodic() {
- VertxImpl.this.timeouts.remove(timerID);
- if (metrics != null) {
- metrics.timerEnded(timerID, false);
- }
- ContextImpl context = getContext();
- if (context != null) {
- context.removeCloseHook(this);
- }
- }
-
- // Called via Context close hook when Verticle is undeployed
- public void close(Handler<AsyncResult<Void>> completionHandler) {
- VertxImpl.this.timeouts.remove(timerID);
- cancel();
- completionHandler.handle(Future.succeededFuture());
- }
-
- }
-
- /*
- *
- * This class is optimised for performance when used on the same event loop that is was passed to the handler with.
- * However it can be used safely from other threads.
- *
- * The internal state is protected using the synchronized keyword. If always used on the same event loop, then
- * we benefit from biased locking which makes the overhead of synchronized near zero.
- *
- */
- private class TimeoutStreamImpl implements TimeoutStream, Handler<Long> {
-
- private final long delay;
- private final boolean periodic;
-
- private Long id;
- private Handler<Long> handler;
- private Handler<Void> endHandler;
- private long demand;
-
- public TimeoutStreamImpl(long delay, boolean periodic) {
- this.delay = delay;
- this.periodic = periodic;
- this.demand = Long.MAX_VALUE;
- }
-
- @Override
- public synchronized void handle(Long event) {
- try {
- if (demand > 0) {
- demand--;
- handler.handle(event);
- }
- } finally {
- if (!periodic && endHandler != null) {
- endHandler.handle(null);
- }
- }
- }
-
- @Override
- public synchronized TimeoutStream fetch(long amount) {
- demand += amount;
- if (demand < 0) {
- demand = Long.MAX_VALUE;
- }
- return this;
- }
-
- @Override
- public TimeoutStream exceptionHandler(Handler<Throwable> handler) {
- return this;
- }
-
- @Override
- public void cancel() {
- if (id != null) {
- VertxImpl.this.cancelTimer(id);
- }
- }
-
- @Override
- public synchronized TimeoutStream handler(Handler<Long> handler) {
- if (handler != null) {
- if (id != null) {
- throw new IllegalStateException();
- }
- this.handler = handler;
- id = scheduleTimeout(getOrCreateContext(), this, delay, periodic);
- } else {
- cancel();
- }
- return this;
- }
-
- @Override
- public synchronized TimeoutStream pause() {
- demand = 0;
- return this;
- }
-
- @Override
- public synchronized TimeoutStream resume() {
- demand = Long.MAX_VALUE;
- return this;
- }
-
- @Override
- public synchronized TimeoutStream endHandler(Handler<Void> endHandler) {
- this.endHandler = endHandler;
- return this;
- }
- }
-
- class SharedWorkerPool extends WorkerPool {
-
- private final String name;
- private int refCount = 1;
-
- @SuppressWarnings("rawtypes")
- SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) {
- super(workerExec, workerMetrics);
- this.name = name;
- }
-
- @Override
- void close() {
- synchronized (VertxImpl.this) {
- if (refCount > 0) {
- refCount = 0;
- super.close();
- }
- }
- }
-
- void release() {
- synchronized (VertxImpl.this) {
- if (--refCount == 0) {
- namedWorkerPools.remove(name);
- super.close();
- }
- }
- }
- }
-
- @Override
- public WorkerExecutorImpl createSharedWorkerExecutor(String name) {
- return createSharedWorkerExecutor(name, defaultWorkerPoolSize);
- }
-
- @Override
- public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) {
- return createSharedWorkerExecutor(name, poolSize, defaultWorkerMaxExecTime);
- }
-
- @Override
- public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) {
- return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, TimeUnit.NANOSECONDS);
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) {
- if (poolSize < 1) {
- throw new IllegalArgumentException("poolSize must be > 0");
- }
- if (maxExecuteTime < 1) {
- throw new IllegalArgumentException("maxExecuteTime must be > 0");
- }
- SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name);
- if (sharedWorkerPool == null) {
- ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit));
- PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null;
- namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics));
- } else {
- sharedWorkerPool.refCount++;
- }
- ContextImpl context = getOrCreateContext();
- WorkerExecutorImpl namedExec = new WorkerExecutorImpl(context, sharedWorkerPool);
- context.addCloseHook(namedExec);
- return namedExec;
- }
-
- @Override
- public Vertx exceptionHandler(Handler<Throwable> handler) {
- exceptionHandler = handler;
- return this;
- }
-
- @Override
- public Handler<Throwable> exceptionHandler() {
- return exceptionHandler;
- }
-
- @Override
- public void addCloseHook(Closeable hook) {
- closeHooks.add(hook);
- }
-
- @Override
- public void removeCloseHook(Closeable hook) {
- closeHooks.remove(hook);
- }
-}
diff --git a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java
deleted file mode 100644
index ef61ee2..0000000
--- a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java
+++ /dev/null
@@ -1,1118 +0,0 @@
-/*
- * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
- * which is available at https://www.apache.org/licenses/LICENSE-2.0.
- *
- * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
- */
-
-package io.vertx.core.impl;
-
-import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.resolver.AddressResolverGroup;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.vertx.core.*;
-import io.vertx.core.Future;
-import io.vertx.core.datagram.DatagramSocket;
-import io.vertx.core.datagram.DatagramSocketOptions;
-import io.vertx.core.datagram.impl.DatagramSocketImpl;
-import io.vertx.core.dns.AddressResolverOptions;
-import io.vertx.core.dns.DnsClient;
-import io.vertx.core.dns.DnsClientOptions;
-import io.vertx.core.dns.impl.DnsClientImpl;
-import io.vertx.core.eventbus.EventBus;
-import io.vertx.core.eventbus.impl.EventBusImpl;
-import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
-import io.vertx.core.file.FileSystem;
-import io.vertx.core.file.impl.FileResolver;
-import io.vertx.core.file.impl.FileSystemImpl;
-import io.vertx.core.file.impl.WindowsFileSystem;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.http.HttpClientOptions;
-import io.vertx.core.http.HttpServer;
-import io.vertx.core.http.HttpServerOptions;
-import io.vertx.core.http.impl.HttpClientImpl;
-import io.vertx.core.http.impl.HttpServerImpl;
-import io.vertx.core.impl.resolver.DnsResolverProvider;
-import io.vertx.core.json.JsonObject;
-import io.vertx.core.logging.Logger;
-import io.vertx.core.logging.LoggerFactory;
-import io.vertx.core.net.NetClient;
-import io.vertx.core.net.NetClientOptions;
-import io.vertx.core.net.NetServer;
-import io.vertx.core.net.NetServerOptions;
-import io.vertx.core.net.impl.NetClientImpl;
-import io.vertx.core.net.impl.NetServerImpl;
-import io.vertx.core.net.impl.ServerID;
-import io.vertx.core.net.impl.transport.Transport;
-import io.vertx.core.shareddata.SharedData;
-import io.vertx.core.shareddata.impl.SharedDataImpl;
-import io.vertx.core.spi.VerticleFactory;
-import io.vertx.core.spi.VertxMetricsFactory;
-import io.vertx.core.spi.cluster.ClusterManager;
-import io.vertx.core.spi.metrics.Metrics;
-import io.vertx.core.spi.metrics.MetricsProvider;
-import io.vertx.core.spi.metrics.PoolMetrics;
-import io.vertx.core.spi.metrics.VertxMetrics;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-
-/**
- * @author <a href="http://tfox.org">Tim Fox</a>
- */
-public class VertxImpl implements VertxInternal, MetricsProvider {
-
- private static final Logger log = LoggerFactory.getLogger(VertxImpl.class);
-
- private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
- private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
- private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);
-
- static {
- // Netty resource leak detection has a performance overhead and we do not need it in Vert.x
- ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
- // Use the JDK deflater/inflater by default
- System.setProperty("io.netty.noJdkZlibDecoder", "false");
- }
-
- static VertxImpl vertx(VertxOptions options) {
- VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport()));
- vertx.init();
- return vertx;
- }
-
- static VertxImpl vertx(VertxOptions options, Transport transport) {
- VertxImpl vertx = new VertxImpl(options, transport);
- vertx.init();
- return vertx;
- }
-
- static void clusteredVertx(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
- VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport()));
- vertx.joinCluster(options, resultHandler);
- }
-
- private final FileSystem fileSystem = getFileSystem();
- private final SharedData sharedData;
- private final VertxMetrics metrics;
- private final ConcurrentMap<Long, InternalTimerHandler> timeouts = new ConcurrentHashMap<>();
- private final AtomicLong timeoutCounter = new AtomicLong(0);
- private final ClusterManager clusterManager;
- private final DeploymentManager deploymentManager;
- private final FileResolver fileResolver;
- private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
- private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
- final WorkerPool workerPool;
- final WorkerPool internalBlockingPool;
- private final ThreadFactory eventLoopThreadFactory;
- private final EventLoopGroup eventLoopGroup;
- private final EventLoopGroup acceptorEventLoopGroup;
- private final BlockedThreadChecker checker;
- private final AddressResolver addressResolver;
- private final AddressResolverOptions addressResolverOptions;
- private final EventBus eventBus;
- private volatile HAManager haManager;
- private boolean closed;
- private volatile Handler<Throwable> exceptionHandler;
- private final Map<String, SharedWorkerPool> namedWorkerPools;
- private final int defaultWorkerPoolSize;
- private final long defaultWorkerMaxExecTime;
- private final CloseHooks closeHooks;
- private final Transport transport;
-
- @SuppressWarnings("rawtypes")
- public VertxImpl(VertxOptions options, Transport transport) {
- // Sanity check
- if (Vertx.currentContext() != null) {
- log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");
- }
- closeHooks = new CloseHooks(log);
- checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit());
- eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());
- eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
- ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());
- // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections
- // under a lot of load
- acceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100);
-
- metrics = initialiseMetrics(options);
-
- ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
- new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));
- PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
- ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
- new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));
- PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
- internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
- namedWorkerPools = new HashMap<>();
- workerPool = new WorkerPool(workerExec, workerPoolMetrics);
- defaultWorkerPoolSize = options.getWorkerPoolSize();
- defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime();
-
- this.transport = transport;
- this.fileResolver = new FileResolver(options.getFileSystemOptions());
- this.addressResolverOptions = options.getAddressResolverOptions();
- this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());
- this.deploymentManager = new DeploymentManager(this);
- if (options.isClustered()) {
- this.clusterManager = getClusterManager(options);
- this.eventBus = new ClusteredEventBus(this, options, clusterManager);
- } else {
- this.clusterManager = null;
- this.eventBus = new EventBusImpl(this);
- }
- this.sharedData = new SharedDataImpl(this, clusterManager);
- }
-
- public void init() {
- eventBus.start(ar -> {});
- if (metrics != null) {
- metrics.vertxCreated(this);
- }
- }
-
- private void joinCluster(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
- clusterManager.setVertx(this);
- clusterManager.join(ar -> {
- if (ar.succeeded()) {
- createHaManager(options, resultHandler);
- } else {
- log.error("Failed to join cluster", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- private void createHaManager(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
- this.<Map<String, String>>executeBlocking(fut -> {
- fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME));
- }, false, ar -> {
- if (ar.succeeded()) {
- Map<String, String> clusterMap = ar.result();
- haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled());
- startEventBus(resultHandler);
- } else {
- log.error("Failed to start HAManager", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- private void startEventBus(Handler<AsyncResult<Vertx>> resultHandler) {
- eventBus.start(ar -> {
- if (ar.succeeded()) {
- initializeHaManager(resultHandler);
- } else {
- log.error("Failed to start event bus", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- private void initializeHaManager(Handler<AsyncResult<Vertx>> resultHandler) {
- this.executeBlocking(fut -> {
- // Init the manager (i.e register listener and check the quorum)
- // after the event bus has been fully started and updated its state
- // it will have also set the clustered changed view handler on the ha manager
- haManager.init();
- fut.complete();
- }, false, ar -> {
- if (ar.succeeded()) {
- if (metrics != null) {
- metrics.vertxCreated(this);
- }
- resultHandler.handle(Future.succeededFuture(this));
- } else {
- log.error("Failed to initialize HAManager", ar.cause());
- resultHandler.handle(Future.failedFuture(ar.cause()));
- }
- });
- }
-
- /**
- * @return The FileSystem implementation for the OS
- */
- protected FileSystem getFileSystem() {
- return Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this);
- }
-
- @Override
- public DatagramSocket createDatagramSocket(DatagramSocketOptions options) {
- return DatagramSocketImpl.create(this, options);
- }
-
- @Override
- public DatagramSocket createDatagramSocket() {
- return createDatagramSocket(new DatagramSocketOptions());
- }
-
- public NetServer createNetServer(NetServerOptions options) {
- return new NetServerImpl(this, options);
- }
-
- @Override
- public NetServer createNetServer() {
- return createNetServer(new NetServerOptions());
- }
-
- public NetClient createNetClient(NetClientOptions options) {
- return new NetClientImpl(this, options);
- }
-
- @Override
- public NetClient createNetClient() {
- return createNetClient(new NetClientOptions());
- }
-
- @Override
- public Transport transport() {
- return transport;
- }
-
- @Override
- public boolean isNativeTransportEnabled() {
- return transport != Transport.JDK;
- }
-
- public FileSystem fileSystem() {
- return fileSystem;
- }
-
- public SharedData sharedData() {
- return sharedData;
- }
-
- public HttpServer createHttpServer(HttpServerOptions serverOptions) {
- return new HttpServerImpl(this, serverOptions);
- }
-
- @Override
- public HttpServer createHttpServer() {
- return createHttpServer(new HttpServerOptions());
- }
-
- public HttpClient createHttpClient(HttpClientOptions options) {
- return new HttpClientImpl(this, options);
- }
-
- @Override
- public HttpClient createHttpClient() {
- return createHttpClient(new HttpClientOptions());
- }
-
- public EventBus eventBus() {
- return eventBus;
- }
-
- public long setPeriodic(long delay, Handler<Long> handler) {
- return scheduleTimeout(getOrCreateContext(), handler, delay, true);
- }
-
- @Override
- public TimeoutStream periodicStream(long delay) {
- return new TimeoutStreamImpl(delay, true);
- }
-
- public long setTimer(long delay, Handler<Long> handler) {
- return scheduleTimeout(getOrCreateContext(), handler, delay, false);
- }
-
- @Override
- public TimeoutStream timerStream(long delay) {
- return new TimeoutStreamImpl(delay, false);
- }
-
- public void runOnContext(Handler<Void> task) {
- ContextImpl context = getOrCreateContext();
- context.runOnContext(task);
- }
-
- // The background pool is used for making blocking calls to legacy synchronous APIs
- public ExecutorService getWorkerPool() {
- return workerPool.executor();
- }
-
- public EventLoopGroup getEventLoopGroup() {
- return eventLoopGroup;
- }
-
- public EventLoopGroup getAcceptorEventLoopGroup() {
- return acceptorEventLoopGroup;
- }
-
- public ContextImpl getOrCreateContext() {
- ContextImpl ctx = getContext();
- if (ctx == null) {
- // We are running embedded - Create a context
- ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader());
- }
- return ctx;
- }
-
- public Map<ServerID, HttpServerImpl> sharedHttpServers() {
- return sharedHttpServers;
- }
-
- public Map<ServerID, NetServerImpl> sharedNetServers() {
- return sharedNetServers;
- }
-
- @Override
- public boolean isMetricsEnabled() {
- return metrics != null;
- }
-
- @Override
- public Metrics getMetrics() {
- return metrics;
- }
-
- public boolean cancelTimer(long id) {
- InternalTimerHandler handler = timeouts.remove(id);
- if (handler != null) {
- handler.context.removeCloseHook(handler);
- return handler.cancel();
- } else {
- return false;
- }
- }
-
- @Override
- public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) {
- return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl);
- }
-
- @Override
- public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config,
- ClassLoader tccl) {
- if (workerPool == null) {
- workerPool = this.workerPool;
- }
- if (multiThreaded) {
- return new MultiThreadedWorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl);
- } else {
- return new WorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl);
- }
- }
-
- @Override
- public DnsClient createDnsClient(int port, String host) {
- return createDnsClient(new DnsClientOptions().setHost(host).setPort(port));
- }
-
- @Override
- public DnsClient createDnsClient() {
- return createDnsClient(new DnsClientOptions());
- }
-
- @Override
- public DnsClient createDnsClient(DnsClientOptions options) {
- String host = options.getHost();
- int port = options.getPort();
- if (host == null || port < 0) {
- DnsResolverProvider provider = new DnsResolverProvider(this, addressResolverOptions);
- InetSocketAddress address = provider.nameServerAddresses().get(0);
- // provide the host and port
- options = new DnsClientOptions(options)
- .setHost(address.getAddress().getHostAddress())
- .setPort(address.getPort());
- }
- return new DnsClientImpl(this, options);
- }
-
- private VertxMetrics initialiseMetrics(VertxOptions options) {
- if (options.getMetricsOptions() != null && options.getMetricsOptions().isEnabled()) {
- VertxMetricsFactory factory = options.getMetricsOptions().getFactory();
- if (factory == null) {
- factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class);
- if (factory == null) {
- log.warn("Metrics has been set to enabled but no VertxMetricsFactory found on classpath");
- }
- }
- if (factory != null) {
- VertxMetrics metrics = factory.metrics(options);
- Objects.requireNonNull(metrics, "The metric instance created from " + factory + " cannot be null");
- return metrics;
- }
- }
- return null;
- }
-
- private ClusterManager getClusterManager(VertxOptions options) {
- ClusterManager mgr = options.getClusterManager();
- if (mgr == null) {
- String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass");
- if (clusterManagerClassName != null) {
- // We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader
- try {
- Class<?> clazz = Class.forName(clusterManagerClassName);
- mgr = (ClusterManager) clazz.newInstance();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e);
- }
- } else {
- mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class);
- if (mgr == null) {
- throw new IllegalStateException("No ClusterManagerFactory instances found on classpath");
- }
- }
- }
- return mgr;
- }
-
- private long scheduleTimeout(ContextImpl context, Handler<Long> handler, long delay, boolean periodic) {
- if (delay < 1) {
- throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms");
- }
- long timerId = timeoutCounter.getAndIncrement();
- InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context);
- timeouts.put(timerId, task);
- context.addCloseHook(task);
- return timerId;
- }
-
- public static Context context() {
- Thread current = Thread.currentThread();
- if (current instanceof VertxThread) {
- return ((VertxThread) current).getContext();
- }
- return null;
- }
-
- public ContextImpl getContext() {
- ContextImpl context = (ContextImpl) context();
- if (context != null && context.owner == this) {
- return context;
- }
- return null;
- }
-
- public ClusterManager getClusterManager() {
- return clusterManager;
- }
-
- @Override
- public void close() {
- close(null);
- }
-
- private void closeClusterManager(Handler<AsyncResult<Void>> completionHandler) {
- if (clusterManager != null) {
- clusterManager.leave(ar -> {
- if (ar.failed()) {
- log.error("Failed to leave cluster", ar.cause());
- }
- if (completionHandler != null) {
- runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
- }
- });
- } else if (completionHandler != null) {
- runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
- }
- }
-
- @Override
- public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
- if (closed || eventBus == null) {
- // Just call the handler directly since pools shutdown
- if (completionHandler != null) {
- completionHandler.handle(Future.succeededFuture());
- }
- return;
- }
- closed = true;
-
- closeHooks.run(ar -> {
- deploymentManager.undeployAll(ar1 -> {
- HAManager haManager = haManager();
- Future<Void> haFuture = Future.future();
- if (haManager != null) {
- this.executeBlocking(fut -> {
- haManager.stop();
- fut.complete();
- }, false, haFuture);
- } else {
- haFuture.complete();
- }
- haFuture.setHandler(ar2 -> {
- addressResolver.close(ar3 -> {
- eventBus.close(ar4 -> {
- closeClusterManager(ar5 -> {
- // Copy set to prevent ConcurrentModificationException
- Set<HttpServerImpl> httpServers = new HashSet<>(sharedHttpServers.values());
- Set<NetServerImpl> netServers = new HashSet<>(sharedNetServers.values());
- sharedHttpServers.clear();
- sharedNetServers.clear();
-
- int serverCount = httpServers.size() + netServers.size();
-
- AtomicInteger serverCloseCount = new AtomicInteger();
-
- Handler<AsyncResult<Void>> serverCloseHandler = res -> {
- if (res.failed()) {
- log.error("Failure in shutting down server", res.cause());
- }
- if (serverCloseCount.incrementAndGet() == serverCount) {
- deleteCacheDirAndShutdown(completionHandler);
- }
- };
-
- for (HttpServerImpl server : httpServers) {
- server.closeAll(serverCloseHandler);
- }
- for (NetServerImpl server : netServers) {
- server.closeAll(serverCloseHandler);
- }
- if (serverCount == 0) {
- deleteCacheDirAndShutdown(completionHandler);
- }
- });
- });
- });
- });
- });
- });
- }
-
- @Override
- public void deployVerticle(Verticle verticle) {
- deployVerticle(verticle, new DeploymentOptions(), null);
- }
-
- @Override
- public void deployVerticle(Verticle verticle, Handler<AsyncResult<String>> completionHandler) {
- deployVerticle(verticle, new DeploymentOptions(), completionHandler);
- }
-
- @Override
- public void deployVerticle(String name, Handler<AsyncResult<String>> completionHandler) {
- deployVerticle(name, new DeploymentOptions(), completionHandler);
- }
-
- @Override
- public void deployVerticle(Verticle verticle, DeploymentOptions options) {
- deployVerticle(verticle, options, null);
- }
-
- @Override
- public void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options) {
- deployVerticle(() -> {
- try {
- return verticleClass.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, options);
- }
-
- @Override
- public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options) {
- deployVerticle(verticleSupplier, options, null);
- }
-
- @Override
- public void deployVerticle(Verticle verticle, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- if (options.getInstances() != 1) {
- throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle");
- }
- deployVerticle(() -> verticle, options, completionHandler);
- }
-
- @Override
- public void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- deployVerticle(() -> {
- try {
- return verticleClass.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, options, completionHandler);
- }
-
- @Override
- public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- boolean closed;
- synchronized (this) {
- closed = this.closed;
- }
- if (closed) {
- if (completionHandler != null) {
- completionHandler.handle(Future.failedFuture("Vert.x closed"));
- }
- } else {
- deploymentManager.deployVerticle(verticleSupplier, options, completionHandler);
- }
- }
-
- @Override
- public void deployVerticle(String name) {
- deployVerticle(name, new DeploymentOptions(), null);
- }
-
- @Override
- public void deployVerticle(String name, DeploymentOptions options) {
- deployVerticle(name, options, null);
- }
-
- @Override
- public void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
- if (options.isHa() && haManager() != null && haManager().isEnabled()) {
- haManager().deployVerticle(name, options, completionHandler);
- } else {
- deploymentManager.deployVerticle(name, options, completionHandler);
- }
- }
-
- @Override
- public String getNodeID() {
- return clusterManager.getNodeID();
- }
-
- @Override
- public void undeploy(String deploymentID) {
- undeploy(deploymentID, res -> {
- });
- }
-
- @Override
- public void undeploy(String deploymentID, Handler<AsyncResult<Void>> completionHandler) {
- HAManager haManager = haManager();
- Future<Void> haFuture = Future.future();
- if (haManager != null && haManager.isEnabled()) {
- this.executeBlocking(fut -> {
- haManager.removeFromHA(deploymentID);
- fut.complete();
- }, false, haFuture);
- } else {
- haFuture.complete();
- }
- haFuture.compose(v -> {
- Future<Void> deploymentFuture = Future.future();
- deploymentManager.undeployVerticle(deploymentID, deploymentFuture);
- return deploymentFuture;
- }).setHandler(completionHandler);
- }
-
- @Override
- public Set<String> deploymentIDs() {
- return deploymentManager.deployments();
- }
-
- @Override
- public void registerVerticleFactory(VerticleFactory factory) {
- deploymentManager.registerVerticleFactory(factory);
- }
-
- @Override
- public void unregisterVerticleFactory(VerticleFactory factory) {
- deploymentManager.unregisterVerticleFactory(factory);
- }
-
- @Override
- public Set<VerticleFactory> verticleFactories() {
- return deploymentManager.verticleFactories();
- }
-
- @Override
- public <T> void executeBlockingInternal(Handler<Future<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
- ContextImpl context = getOrCreateContext();
-
- context.executeBlockingInternal(blockingCodeHandler, resultHandler);
- }
-
- @Override
- public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
- Handler<AsyncResult<T>> asyncResultHandler) {
- ContextImpl context = getOrCreateContext();
- context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
- }
-
- @Override
- public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
- Handler<AsyncResult<T>> asyncResultHandler) {
- executeBlocking(blockingCodeHandler, true, asyncResultHandler);
- }
-
- @Override
- public boolean isClustered() {
- return clusterManager != null;
- }
-
- @Override
- public EventLoopGroup nettyEventLoopGroup() {
- return eventLoopGroup;
- }
-
- // For testing
- public void simulateKill() {
- if (haManager() != null) {
- haManager().simulateKill();
- }
- }
-
- @Override
- public Deployment getDeployment(String deploymentID) {
- return deploymentManager.getDeployment(deploymentID);
- }
-
- @Override
- public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) {
- if (haManager() != null) {
- haManager().setFailoverCompleteHandler(failoverCompleteHandler);
- }
- }
-
- @Override
- public boolean isKilled() {
- return haManager().isKilled();
- }
-
- @Override
- public void failDuringFailover(boolean fail) {
- if (haManager() != null) {
- haManager().failDuringFailover(fail);
- }
- }
-
- @Override
- public VertxMetrics metricsSPI() {
- return metrics;
- }
-
- @Override
- public File resolveFile(String fileName) {
- return fileResolver.resolveFile(fileName);
- }
-
- @Override
- public void resolveAddress(String hostname, Handler<AsyncResult<InetAddress>> resultHandler) {
- addressResolver.resolveHostname(hostname, resultHandler);
- }
-
- @Override
- public AddressResolver addressResolver() {
- return addressResolver;
- }
-
- @Override
- public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
- return addressResolver.nettyAddressResolverGroup();
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) {
- executeBlockingInternal(fut -> {
- try {
- fileResolver.close();
- fut.complete();
- } catch (IOException e) {
- fut.tryFail(e);
- }
- }, ar -> {
-
- workerPool.close();
- internalBlockingPool.close();
- new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close);
-
- acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
- @Override
- public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
- if (!future.isSuccess()) {
- log.warn("Failure in shutting down acceptor event loop group", future.cause());
- }
- eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
- @Override
- public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
- if (!future.isSuccess()) {
- log.warn("Failure in shutting down event loop group", future.cause());
- }
- if (metrics != null) {
- metrics.close();
- }
-
- checker.close();
-
- if (completionHandler != null) {
- eventLoopThreadFactory.newThread(() -> {
- completionHandler.handle(Future.succeededFuture());
- }).start();
- }
- }
- });
- }
- });
- });
- }
-
- public HAManager haManager() {
- return haManager;
- }
-
- private class InternalTimerHandler implements Handler<Void>, Closeable {
- final Handler<Long> handler;
- final boolean periodic;
- final long timerID;
- final ContextImpl context;
- final java.util.concurrent.Future<?> future;
- final AtomicBoolean cancelled;
-
- boolean cancel() {
- if (cancelled.compareAndSet(false, true)) {
- if (metrics != null) {
- metrics.timerEnded(timerID, true);
- }
- future.cancel(false);
- return true;
- } else {
- return false;
- }
- }
-
- InternalTimerHandler(long timerID, Handler<Long> runnable, boolean periodic, long delay, ContextImpl context) {
- this.context = context;
- this.timerID = timerID;
- this.handler = runnable;
- this.periodic = periodic;
- this.cancelled = new AtomicBoolean();
- EventLoop el = context.nettyEventLoop();
- Runnable toRun = () -> context.runOnContext(this);
- if (periodic) {
- future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS);
- } else {
- future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS);
- }
- if (metrics != null) {
- metrics.timerCreated(timerID);
- }
- }
-
- public void handle(Void v) {
- if (!cancelled.get()) {
- try {
- handler.handle(timerID);
- } finally {
- if (!periodic) {
- // Clean up after it's fired
- cleanupNonPeriodic();
- }
- }
- }
- }
-
- private void cleanupNonPeriodic() {
- VertxImpl.this.timeouts.remove(timerID);
- if (metrics != null) {
- metrics.timerEnded(timerID, false);
- }
- ContextImpl context = getContext();
- if (context != null) {
- context.removeCloseHook(this);
- }
- }
-
- // Called via Context close hook when Verticle is undeployed
- public void close(Handler<AsyncResult<Void>> completionHandler) {
- VertxImpl.this.timeouts.remove(timerID);
- cancel();
- completionHandler.handle(Future.succeededFuture());
- }
-
- }
-
- /*
- *
- * This class is optimised for performance when used on the same event loop that is was passed to the handler with.
- * However it can be used safely from other threads.
- *
- * The internal state is protected using the synchronized keyword. If always used on the same event loop, then
- * we benefit from biased locking which makes the overhead of synchronized near zero.
- *
- */
- private class TimeoutStreamImpl implements TimeoutStream, Handler<Long> {
-
- private final long delay;
- private final boolean periodic;
-
- private Long id;
- private Handler<Long> handler;
- private Handler<Void> endHandler;
- private long demand;
-
- public TimeoutStreamImpl(long delay, boolean periodic) {
- this.delay = delay;
- this.periodic = periodic;
- this.demand = Long.MAX_VALUE;
- }
-
- @Override
- public synchronized void handle(Long event) {
- try {
- if (demand > 0) {
- demand--;
- handler.handle(event);
- }
- } finally {
- if (!periodic && endHandler != null) {
- endHandler.handle(null);
- }
- }
- }
-
- @Override
- public synchronized TimeoutStream fetch(long amount) {
- demand += amount;
- if (demand < 0) {
- demand = Long.MAX_VALUE;
- }
- return this;
- }
-
- @Override
- public TimeoutStream exceptionHandler(Handler<Throwable> handler) {
- return this;
- }
-
- @Override
- public void cancel() {
- if (id != null) {
- VertxImpl.this.cancelTimer(id);
- }
- }
-
- @Override
- public synchronized TimeoutStream handler(Handler<Long> handler) {
- if (handler != null) {
- if (id != null) {
- throw new IllegalStateException();
- }
- this.handler = handler;
- id = scheduleTimeout(getOrCreateContext(), this, delay, periodic);
- } else {
- cancel();
- }
- return this;
- }
-
- @Override
- public synchronized TimeoutStream pause() {
- demand = 0;
- return this;
- }
-
- @Override
- public synchronized TimeoutStream resume() {
- demand = Long.MAX_VALUE;
- return this;
- }
-
- @Override
- public synchronized TimeoutStream endHandler(Handler<Void> endHandler) {
- this.endHandler = endHandler;
- return this;
- }
- }
-
- class SharedWorkerPool extends WorkerPool {
-
- private final String name;
- private int refCount = 1;
-
- @SuppressWarnings("rawtypes")
- SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) {
- super(workerExec, workerMetrics);
- this.name = name;
- }
-
- @Override
- void close() {
- synchronized (VertxImpl.this) {
- if (refCount > 0) {
- refCount = 0;
- super.close();
- }
- }
- }
-
- void release() {
- synchronized (VertxImpl.this) {
- if (--refCount == 0) {
- namedWorkerPools.remove(name);
- super.close();
- }
- }
- }
- }
-
- @Override
- public WorkerExecutorImpl createSharedWorkerExecutor(String name) {
- return createSharedWorkerExecutor(name, defaultWorkerPoolSize);
- }
-
- @Override
- public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) {
- return createSharedWorkerExecutor(name, poolSize, defaultWorkerMaxExecTime);
- }
-
- @Override
- public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) {
- return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, TimeUnit.NANOSECONDS);
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) {
- if (poolSize < 1) {
- throw new IllegalArgumentException("poolSize must be > 0");
- }
- if (maxExecuteTime < 1) {
- throw new IllegalArgumentException("maxExecuteTime must be > 0");
- }
- SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name);
- if (sharedWorkerPool == null) {
- ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit));
- PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null;
- namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics));
- } else {
- sharedWorkerPool.refCount++;
- }
- ContextImpl context = getOrCreateContext();
- WorkerExecutorImpl namedExec = new WorkerExecutorImpl(context, sharedWorkerPool);
- context.addCloseHook(namedExec);
- return namedExec;
- }
-
- @Override
- public Vertx exceptionHandler(Handler<Throwable> handler) {
- exceptionHandler = handler;
- return this;
- }
-
- @Override
- public Handler<Throwable> exceptionHandler() {
- return exceptionHandler;
- }
-
- @Override
- public void addCloseHook(Closeable hook) {
- closeHooks.add(hook);
- }
-
- @Override
- public void removeCloseHook(Closeable hook) {
- closeHooks.remove(hook);
- }
-}
diff --git a/foundations/foundation-vertx/src/main/java/io/vertx/ext/web/impl/HttpServerRequestUtils.java b/foundations/foundation-vertx/src/main/java/io/vertx/ext/web/impl/HttpServerRequestUtils.java
deleted file mode 100644
index 257770e..0000000
--- a/foundations/foundation-vertx/src/main/java/io/vertx/ext/web/impl/HttpServerRequestUtils.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.vertx.ext.web.impl;
-
-import io.vertx.core.http.HttpServerRequest;
-
-public interface HttpServerRequestUtils {
- static void setPath(HttpServerRequest request, String path) {
- ((HttpServerRequestWrapper) request).setPath(path);
- }
-}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerRequestToHttpServletRequest.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerRequestToHttpServletRequest.java
index a670480..516419d 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerRequestToHttpServletRequest.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/VertxServerRequestToHttpServletRequest.java
@@ -95,10 +95,10 @@ public class VertxServerRequestToHttpServletRequest extends AbstractHttpServletR
@Override
public Cookie[] getCookies() {
if (cookies == null) {
- Set<io.vertx.ext.web.Cookie> vertxCookies = context.cookies();
+ Map<String, io.vertx.core.http.Cookie> vertxCookies = context.cookieMap();
Cookie tmpCookies[] = new Cookie[vertxCookies.size()];
int idx = 0;
- for (io.vertx.ext.web.Cookie oneVertxCookie : vertxCookies) {
+ for (io.vertx.core.http.Cookie oneVertxCookie : vertxCookies.values()) {
Cookie cookie = new Cookie(oneVertxCookie.getName(), oneVertxCookie.getValue());
tmpCookies[idx] = cookie;
idx++;
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
index 18ed274..6a0592e 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java
@@ -28,6 +28,7 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
+import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
@@ -120,7 +121,7 @@ public class InputStreamToReadStream implements ReadStream<Buffer> {
}
}
- private synchronized void readInWorker(Future<ReadResult> future) {
+ private synchronized void readInWorker(Promise<ReadResult> future) {
try {
ReadResult readResult = new ReadResult();
readResult.doRead();
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
index 9e69759..d449ddf 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/OutputStreamToWriteStream.java
@@ -24,9 +24,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.servicecomb.foundation.common.io.AsyncCloseable;
+import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
-import io.vertx.core.Future;
import io.vertx.core.Handler;
+import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.WriteStream;
@@ -89,19 +90,24 @@ public class OutputStreamToWriteStream implements WriteStream<Buffer>, AsyncClos
@Override
public synchronized WriteStream<Buffer> write(Buffer data) {
+ return write(data, ar -> {
+ if (ar.failed()) {
+ handleException(ar.cause());
+ }
+ });
+ }
+
+ @Override
+ public WriteStream<Buffer> write(Buffer data, Handler<AsyncResult<Void>> handler) {
currentBufferCount++;
buffers.add(data);
context.executeBlocking(this::writeInWorker,
true,
- ar -> {
- if (ar.failed()) {
- handleException(ar.cause());
- }
- });
+ handler);
return this;
}
- protected void writeInWorker(Future<Object> future) {
+ protected void writeInWorker(Promise<Void> future) {
while (true) {
Buffer buffer = buffers.poll();
if (buffer == null) {
@@ -127,6 +133,11 @@ public class OutputStreamToWriteStream implements WriteStream<Buffer>, AsyncClos
@Override
public void end() {
+ end((Handler<AsyncResult<Void>>)null);
+ }
+
+ @Override
+ public void end(Handler<AsyncResult<Void>> handler) {
close();
}
diff --git a/foundations/foundation-vertx/src/test/java/io/vertx/ext/web/impl/TestHttpServerRequestUtils.java b/foundations/foundation-vertx/src/test/java/io/vertx/ext/web/impl/TestHttpServerRequestUtils.java
index 7730bb6..0b4fdbc 100644
--- a/foundations/foundation-vertx/src/test/java/io/vertx/ext/web/impl/TestHttpServerRequestUtils.java
+++ b/foundations/foundation-vertx/src/test/java/io/vertx/ext/web/impl/TestHttpServerRequestUtils.java
@@ -28,14 +28,6 @@ import mockit.Mocked;
public class TestHttpServerRequestUtils {
@Test
- public void setPath(@Mocked HttpServerRequest request) {
- HttpServerRequestWrapper wrapper = new HttpServerRequestWrapper(request);
- HttpServerRequestUtils.setPath(wrapper, "abc");
-
- Assert.assertEquals("abc", wrapper.path());
- }
-
- @Test
public void VertxServerRequestToHttpServletRequest(@Mocked RoutingContext context,
@Mocked HttpServerRequest request) {
HttpServerRequestWrapper wrapper = new HttpServerRequestWrapper(request);
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestVertxUtils.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestVertxUtils.java
index 3da853d..8683b1d 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestVertxUtils.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/TestVertxUtils.java
@@ -28,7 +28,6 @@ import javax.xml.ws.Holder;
import org.apache.commons.io.FileUtils;
import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
import org.apache.servicecomb.foundation.vertx.stream.BufferInputStream;
-import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
@@ -60,7 +59,8 @@ public class TestVertxUtils {
public void testCreateVertxWithFileCPResolving() {
// Prepare
ArchaiusUtils.resetConfig();
- String cacheDirBase = System.getProperty(FileResolver.CACHE_DIR_BASE_PROP_NAME, ".vertx");
+ String cacheDirBase = System.getProperty(FileResolver.CACHE_DIR_BASE_PROP_NAME,
+ System.getProperty("java.io.tmpdir", ".") + File.separator + "vertx-cache");
File file = new File(cacheDirBase);
// create .vertx folder
@@ -92,7 +92,7 @@ public class TestVertxUtils {
@Test
public void testVertxUtilsInitWithOptions() {
VertxOptions oOptions = new VertxOptions();
- oOptions.setClustered(false);
+ oOptions.getEventBusOptions().setClustered(false);
Vertx vertx = VertxUtils.init(oOptions);
Assert.assertNotEquals(null, vertx);
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/TestClientPoolManager.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/TestClientPoolManager.java
index 6709ca5..dc86d9b 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/TestClientPoolManager.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/TestClientPoolManager.java
@@ -145,11 +145,11 @@ public class TestClientPoolManager {
HttpClientWithContext notMatchPool = new HttpClientWithContext(null, null);
pools.add(notMatchPool);
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
factory.createClientPool(context);
result = new HttpClientWithContext(null, null);
- VertxImpl.context();
+ Vertx.currentContext();
result = context;
}
};
@@ -167,9 +167,9 @@ public class TestClientPoolManager {
pools.add(pool1);
pools.add(pool2);
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
- VertxImpl.context();
+ Vertx.currentContext();
result = null;
}
};
@@ -188,9 +188,9 @@ public class TestClientPoolManager {
HttpClientWithContext pool = new HttpClientWithContext(null, null);
pools.add(pool);
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
- VertxImpl.context();
+ Vertx.currentContext();
result = null;
}
};
@@ -203,9 +203,9 @@ public class TestClientPoolManager {
HttpClientWithContext pool = new HttpClientWithContext(null, null);
pools.add(pool);
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
- VertxImpl.context();
+ Vertx.currentContext();
result = otherContext;
otherContext.owner();
result = otherVertx;
@@ -220,9 +220,9 @@ public class TestClientPoolManager {
HttpClientWithContext pool = new HttpClientWithContext(null, null);
pools.add(pool);
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
- VertxImpl.context();
+ Vertx.currentContext();
result = workerContext;
workerContext.owner();
result = vertx;
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
index 860e141..c2756e9 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnection.java
@@ -172,6 +172,7 @@ public class TestTcpClientConnection {
{
tcpClientPackage.getMsgId();
result = msgId;
+ tcpClientConnection.write((ByteBuf) any);
}
};
new MockUp<Context>(context) {
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
index 5080839..50c151d 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java
@@ -38,12 +38,13 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.FileSystemException;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpClientResponse;
-import io.vertx.core.impl.ContextInternal;
-import io.vertx.core.impl.SyncVertx;
+import io.vertx.core.impl.SyncContext;
+import io.vertx.core.impl.VertxInternal;
import io.vertx.core.streams.WriteStream;
import mockit.Expectations;
import mockit.Mock;
@@ -51,9 +52,9 @@ import mockit.MockUp;
import mockit.Mocked;
public class TestReadStreamPart {
- static SyncVertx vertx = new SyncVertx();
+ static Vertx vertx = Vertx.vertx();
- static ContextInternal context = vertx.getContext();
+ static SyncContext context = new SyncContext();
static String src = "src";
@@ -68,6 +69,7 @@ public class TestReadStreamPart {
@Before
public void setup() throws IOException {
+ context.setOwner((VertxInternal)vertx);
inputStream.reset();
}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerRequestToHttpServletRequest.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerRequestToHttpServletRequest.java
index 0ec60b8..bcec03f 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerRequestToHttpServletRequest.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerRequestToHttpServletRequest.java
@@ -20,9 +20,8 @@ package org.apache.servicecomb.foundation.vertx.http;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
import javax.servlet.AsyncContext;
import javax.servlet.ServletInputStream;
@@ -118,12 +117,12 @@ public class TestVertxServerRequestToHttpServletRequest {
@Test
public void testGetCookies() {
- Set<io.vertx.ext.web.Cookie> vertxCookies = new LinkedHashSet<>();
- vertxCookies.add(io.vertx.ext.web.Cookie.cookie("c1", "c1v"));
- vertxCookies.add(io.vertx.ext.web.Cookie.cookie("c2", "c2v"));
+ Map<String, io.vertx.core.http.Cookie> vertxCookies = new LinkedHashMap<>();
+ vertxCookies.put("c1", io.vertx.core.http.Cookie.cookie("c1", "c1v"));
+ vertxCookies.put("c2", io.vertx.core.http.Cookie.cookie("c2", "c2v"));
new Expectations() {
{
- context.cookies();
+ context.cookieMap();
result = vertxCookies;
}
};
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
index 215ea56..e82c662 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestVertxServerResponseToHttpServletResponse.java
@@ -42,14 +42,13 @@ import org.junit.rules.ExpectedException;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
-import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.impl.SyncContext;
-import io.vertx.core.impl.VertxImpl;
import io.vertx.core.streams.WriteStream;
import mockit.Deencapsulation;
import mockit.Expectations;
@@ -139,9 +138,9 @@ public class TestVertxServerResponseToHttpServletResponse {
}
}.getMockInstance();
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
- VertxImpl.context();
+ Vertx.currentContext();
result = context;
}
};
@@ -154,7 +153,7 @@ public class TestVertxServerResponseToHttpServletResponse {
}
@Mock
- <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
+ <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered,
Handler<AsyncResult<T>> resultHandler) {
SyncContext.syncExecuteBlocking(blockingCodeHandler, resultHandler);
}
@@ -170,9 +169,9 @@ public class TestVertxServerResponseToHttpServletResponse {
@Test
public void construct_invalid() throws IOException {
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
- VertxImpl.context();
+ Vertx.currentContext();
result = null;
}
};
@@ -270,9 +269,9 @@ public class TestVertxServerResponseToHttpServletResponse {
@Test
public void flushBuffer_diffContext() throws IOException {
- new Expectations(VertxImpl.class) {
+ new Expectations() {
{
- VertxImpl.context();
+ Vertx.currentContext();
result = null;
}
};
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
index 9ba8a28..befcc2e 100644
--- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFromPart.java
@@ -33,7 +33,7 @@ import org.junit.Assert;
import org.junit.Test;
import io.vertx.core.Context;
-import io.vertx.core.Future;
+import io.vertx.core.Promise;
import io.vertx.core.impl.SyncContext;
import mockit.Expectations;
import mockit.Mock;
@@ -121,7 +121,7 @@ public class TestPumpFromPart {
public void pump_read_error() throws IOException {
new MockUp<InputStreamToReadStream>() {
@Mock
- void readInWorker(Future<ReadResult> future) {
+ void readInWorker(Promise<ReadResult> future) {
future.fail(error);
}
};
diff --git a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java
index f02ee0e..2d8185c 100644
--- a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java
+++ b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestVertxRestDispatcher.java
@@ -42,11 +42,13 @@ import org.junit.Before;
import org.junit.Test;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.ErrorDataDecoderException;
+import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
@@ -292,12 +294,6 @@ public class TestVertxRestDispatcher {
}
}.getMockInstance();
- new Expectations(VertxImpl.class) {
- {
- VertxImpl.context();
- result = context;
- }
- };
Deencapsulation.invoke(dispatcher, "onRequest", routingContext);
Assert.assertEquals(VertxRestInvocation.class, map.get(RestConst.REST_PRODUCER_INVOCATION).getClass());
@@ -381,12 +377,22 @@ class MockHttpServerResponse implements HttpServerResponse {
}
@Override
+ public void end(Handler<AsyncResult<Void>> handler) {
+
+ }
+
+ @Override
public void end(String chunk) {
responseEnded = true;
responseChunk = chunk;
}
@Override
+ public void end(String s, Handler<AsyncResult<Void>> handler) {
+
+ }
+
+ @Override
public HttpServerResponse exceptionHandler(Handler<Throwable> handler) {
return null;
}
@@ -397,6 +403,11 @@ class MockHttpServerResponse implements HttpServerResponse {
}
@Override
+ public HttpServerResponse write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
+ return null;
+ }
+
+ @Override
public HttpServerResponse setWriteQueueMaxSize(int maxSize) {
return null;
}
@@ -492,11 +503,21 @@ class MockHttpServerResponse implements HttpServerResponse {
}
@Override
+ public HttpServerResponse write(String s, String s1, Handler<AsyncResult<Void>> handler) {
+ return null;
+ }
+
+ @Override
public HttpServerResponse write(String chunk) {
return null;
}
@Override
+ public HttpServerResponse write(String s, Handler<AsyncResult<Void>> handler) {
+ return null;
+ }
+
+ @Override
public HttpServerResponse writeContinue() {
return null;
}
@@ -507,11 +528,21 @@ class MockHttpServerResponse implements HttpServerResponse {
}
@Override
+ public void end(String s, String s1, Handler<AsyncResult<Void>> handler) {
+
+ }
+
+ @Override
public void end(Buffer chunk) {
}
@Override
+ public void end(Buffer buffer, Handler<AsyncResult<Void>> handler) {
+
+ }
+
+ @Override
public HttpServerResponse sendFile(String filename, long offset, long length) {
return null;
}
@@ -589,4 +620,14 @@ class MockHttpServerResponse implements HttpServerResponse {
public HttpServerResponse writeCustomFrame(int type, int flags, Buffer payload) {
return null;
}
+
+ @Override
+ public HttpServerResponse addCookie(Cookie cookie) {
+ return null;
+ }
+
+ @Override
+ public @Nullable Cookie removeCookie(String s, boolean b) {
+ return null;
+ }
}