You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/24 21:20:58 UTC
[camel] branch main updated: (chores) More of replacing inner classes with lambda (#9635)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2b8b7ca353c (chores) More of replacing inner classes with lambda (#9635)
2b8b7ca353c is described below
commit 2b8b7ca353cf2e395ad345257fdd1c20c118266f
Author: Gilvan Filho <gi...@gmail.com>
AuthorDate: Fri Mar 24 18:20:51 2023 -0300
(chores) More of replacing inner classes with lambda (#9635)
* camel-platform-http-vertx: replace inner class with lambda
* camel-pubnub: replace inner class with lambda
* camel-splunk: replace inner class with lambda
* camel-timer: replace inner class with lambda
* camel-undertow: replace inner class with lambda
* camel-vertx: replace inner class with lambda
---
.../http/vertx/VertxPlatformHttpServerSupport.java | 84 ++++++++++------------
.../camel/component/pubnub/PubNubProducer.java | 67 ++++++-----------
.../camel/component/splunk/SplunkConsumer.java | 19 ++---
.../camel/component/timer/TimerConsumer.java | 14 ++--
.../undertow/handlers/CamelWebSocketHandler.java | 10 +--
.../camel/component/vertx/VertxComponent.java | 22 +++---
.../camel/component/vertx/VertxConsumer.java | 6 +-
7 files changed, 88 insertions(+), 134 deletions(-)
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServerSupport.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServerSupport.java
index 5f8fb5b51bc..1964ef021af 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServerSupport.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpServerSupport.java
@@ -60,12 +60,9 @@ public final class VertxPlatformHttpServerSupport {
bodyHandler.setMergeFormAttributes(configuration.getBodyHandler().isMergeFormAttributes());
bodyHandler.setPreallocateBodyBuffer(configuration.getBodyHandler().isPreallocateBodyBuffer());
- return new Handler<RoutingContext>() {
- @Override
- public void handle(RoutingContext event) {
- event.request().resume();
- bodyHandler.handle(event);
- }
+ return (RoutingContext event) -> {
+ event.request().resume();
+ bodyHandler.handle(event);
};
}
@@ -78,51 +75,48 @@ public final class VertxPlatformHttpServerSupport {
static Handler<RoutingContext> createCorsHandler(VertxPlatformHttpServerConfiguration configuration) {
final VertxPlatformHttpServerConfiguration.Cors corsConfig = configuration.getCors();
- return new Handler<RoutingContext>() {
- @Override
- public void handle(RoutingContext event) {
- final HttpServerRequest request = event.request();
- final HttpServerResponse response = event.response();
- final String origin = request.getHeader(HttpHeaders.ORIGIN);
-
- if (origin == null) {
- event.next();
- } else {
- final String requestedMethods = request.getHeader(HttpHeaders.ACCESS_CONTROL_REQUEST_METHOD);
- if (requestedMethods != null) {
- processHeaders(response, HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestedMethods,
- corsConfig.getMethods());
- }
+ return (RoutingContext event) -> {
+ final HttpServerRequest request = event.request();
+ final HttpServerResponse response = event.response();
+ final String origin = request.getHeader(HttpHeaders.ORIGIN);
+
+ if (origin == null) {
+ event.next();
+ } else {
+ final String requestedMethods = request.getHeader(HttpHeaders.ACCESS_CONTROL_REQUEST_METHOD);
+ if (requestedMethods != null) {
+ processHeaders(response, HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestedMethods,
+ corsConfig.getMethods());
+ }
- final String requestedHeaders = request.getHeader(HttpHeaders.ACCESS_CONTROL_REQUEST_HEADERS);
- if (requestedHeaders != null) {
- processHeaders(response, HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, requestedHeaders,
- corsConfig.getHeaders());
- }
+ final String requestedHeaders = request.getHeader(HttpHeaders.ACCESS_CONTROL_REQUEST_HEADERS);
+ if (requestedHeaders != null) {
+ processHeaders(response, HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, requestedHeaders,
+ corsConfig.getHeaders());
+ }
- final boolean allowsOrigin
- = ObjectHelper.isEmpty(corsConfig.getOrigins()) || corsConfig.getOrigins().contains(origin);
- if (allowsOrigin) {
- response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
- }
+ final boolean allowsOrigin
+ = ObjectHelper.isEmpty(corsConfig.getOrigins()) || corsConfig.getOrigins().contains(origin);
+ if (allowsOrigin) {
+ response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
+ }
- response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
+ response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
- if (ObjectHelper.isNotEmpty(corsConfig.getExposedHeaders())) {
- response.headers().set(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS,
- String.join(",", corsConfig.getExposedHeaders()));
- }
+ if (ObjectHelper.isNotEmpty(corsConfig.getExposedHeaders())) {
+ response.headers().set(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS,
+ String.join(",", corsConfig.getExposedHeaders()));
+ }
- if (request.method().equals(HttpMethod.OPTIONS)) {
- if ((requestedHeaders != null || requestedMethods != null)
- && corsConfig.getAccessControlMaxAge() != null) {
- response.putHeader(HttpHeaders.ACCESS_CONTROL_MAX_AGE,
- String.valueOf(corsConfig.getAccessControlMaxAge().getSeconds()));
- }
- response.end();
- } else {
- event.next();
+ if (request.method().equals(HttpMethod.OPTIONS)) {
+ if ((requestedHeaders != null || requestedMethods != null)
+ && corsConfig.getAccessControlMaxAge() != null) {
+ response.putHeader(HttpHeaders.ACCESS_CONTROL_MAX_AGE,
+ String.valueOf(corsConfig.getAccessControlMaxAge().getSeconds()));
}
+ response.end();
+ } else {
+ event.next();
}
}
};
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java
index 3d6107d4421..853161ea257 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.pubnub;
import java.util.Arrays;
import com.pubnub.api.PubNubException;
-import com.pubnub.api.callbacks.PNCallback;
import com.pubnub.api.models.consumer.PNErrorData;
import com.pubnub.api.models.consumer.PNPublishResult;
import com.pubnub.api.models.consumer.PNStatus;
@@ -112,14 +111,11 @@ public class PubNubProducer extends DefaultAsyncProducer {
.message(body)
.channel(getChannel(exchange))
.usePOST(true)
- .async(new PNCallback<PNPublishResult>() {
- @Override
- public void onResponse(PNPublishResult result, PNStatus status) {
- if (!status.isError()) {
- exchange.getIn().setHeader(PubNubConstants.TIMETOKEN, result.getTimetoken());
- }
- processMessage(exchange, callback, status, null);
+ .async((PNPublishResult result, PNStatus status) -> {
+ if (!status.isError()) {
+ exchange.getIn().setHeader(PubNubConstants.TIMETOKEN, result.getTimetoken());
}
+ processMessage(exchange, callback, status, null);
});
}
@@ -134,14 +130,11 @@ public class PubNubProducer extends DefaultAsyncProducer {
.fire()
.message(body)
.channel(getChannel(exchange))
- .async(new PNCallback<PNPublishResult>() {
- @Override
- public void onResponse(PNPublishResult result, PNStatus status) {
- if (!status.isError()) {
- exchange.getIn().setHeader(PubNubConstants.TIMETOKEN, result.getTimetoken());
- }
- processMessage(exchange, callback, status, null);
+ .async((PNPublishResult result, PNStatus status) -> {
+ if (!status.isError()) {
+ exchange.getIn().setHeader(PubNubConstants.TIMETOKEN, result.getTimetoken());
}
+ processMessage(exchange, callback, status, null);
});
}
@@ -149,12 +142,9 @@ public class PubNubProducer extends DefaultAsyncProducer {
endpoint.getPubnub()
.history()
.channel(getChannel(exchange))
- .async(new PNCallback<PNHistoryResult>() {
- @Override
- public void onResponse(PNHistoryResult result, PNStatus status) {
- LOG.debug("Got history message [{}]", result);
- processMessage(exchange, callback, status, result.getMessages());
- }
+ .async((PNHistoryResult result, PNStatus status) -> {
+ LOG.debug("Got history message [{}]", result);
+ processMessage(exchange, callback, status, result.getMessages());
});
}
@@ -170,11 +160,9 @@ public class PubNubProducer extends DefaultAsyncProducer {
.channels(Arrays.asList(getChannel(exchange)))
.state(body)
.uuid(getUUID(exchange))
- .async(new PNCallback<PNSetStateResult>() {
- public void onResponse(PNSetStateResult result, PNStatus status) {
- LOG.debug("Got setState responsee [{}]", result);
- processMessage(exchange, callback, status, result);
- }
+ .async((PNSetStateResult result, PNStatus status) -> {
+ LOG.debug("Got setState responsee [{}]", result);
+ processMessage(exchange, callback, status, result);
});
}
@@ -183,12 +171,9 @@ public class PubNubProducer extends DefaultAsyncProducer {
.getPresenceState()
.channels(Arrays.asList(getChannel(exchange)))
.uuid(getUUID(exchange))
- .async(new PNCallback<PNGetStateResult>() {
- @Override
- public void onResponse(PNGetStateResult result, PNStatus status) {
- LOG.debug("Got state [{}]", result.getStateByUUID());
- processMessage(exchange, callback, status, result);
- }
+ .async((PNGetStateResult result, PNStatus status) -> {
+ LOG.debug("Got state [{}]", result.getStateByUUID());
+ processMessage(exchange, callback, status, result);
});
}
@@ -198,12 +183,9 @@ public class PubNubProducer extends DefaultAsyncProducer {
.channels(Arrays.asList(getChannel(exchange)))
.includeState(true)
.includeUUIDs(true)
- .async(new PNCallback<PNHereNowResult>() {
- @Override
- public void onResponse(PNHereNowResult result, PNStatus status) {
- LOG.debug("Got herNow message [{}]", result);
- processMessage(exchange, callback, status, result);
- }
+ .async((PNHereNowResult result, PNStatus status) -> {
+ LOG.debug("Got herNow message [{}]", result);
+ processMessage(exchange, callback, status, result);
});
}
@@ -211,12 +193,9 @@ public class PubNubProducer extends DefaultAsyncProducer {
endpoint.getPubnub()
.whereNow()
.uuid(getUUID(exchange))
- .async(new PNCallback<PNWhereNowResult>() {
- @Override
- public void onResponse(PNWhereNowResult result, PNStatus status) {
- LOG.debug("Got whereNow message [{}]", result.getChannels());
- processMessage(exchange, callback, status, result.getChannels());
- }
+ .async((PNWhereNowResult result, PNStatus status) -> {
+ LOG.debug("Got whereNow message [{}]", result.getChannels());
+ processMessage(exchange, callback, status, result.getChannels());
});
}
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
index 0077aabde24..492b82fb469 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
@@ -27,7 +27,6 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.splunk.event.SplunkEvent;
import org.apache.camel.component.splunk.support.SplunkDataReader;
-import org.apache.camel.component.splunk.support.SplunkResultProcessor;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -63,18 +62,14 @@ public class SplunkConsumer extends ScheduledBatchPollingConsumer {
protected int poll() throws Exception {
try {
if (endpoint.getConfiguration().isStreaming()) {
- dataReader.read(new SplunkResultProcessor() {
+ dataReader.read(splunkEvent -> {
+ final Exchange exchange = createExchange(true);
+ Message message = exchange.getIn();
+ message.setBody(splunkEvent);
- @Override
- public void process(SplunkEvent splunkEvent) {
- final Exchange exchange = createExchange(true);
- Message message = exchange.getIn();
- message.setBody(splunkEvent);
-
- // use default consumer callback
- AsyncCallback cb = defaultConsumerCallback(exchange, true);
- getAsyncProcessor().process(exchange, cb);
- }
+ // use default consumer callback
+ AsyncCallback cb = defaultConsumerCallback(exchange, true);
+ getAsyncProcessor().process(exchange, cb);
});
// Return 0: no exchanges returned by poll, as exchanges have been returned asynchronously
return 0;
diff --git a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 4576fd23896..d14e68a7bbe 100644
--- a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -110,14 +110,12 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S
executorService = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
endpoint.getEndpointUri());
- executorService.execute(new Runnable() {
- public void run() {
- final AtomicLong counter = new AtomicLong();
- long count = counter.incrementAndGet();
- while ((endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount()) && isRunAllowed()) {
- sendTimerExchange(count);
- count = counter.incrementAndGet();
- }
+ executorService.execute(() -> {
+ final AtomicLong counter = new AtomicLong();
+ long count = counter.incrementAndGet();
+ while ((endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount()) && isRunAllowed()) {
+ sendTimerExchange(count);
+ count = counter.incrementAndGet();
}
});
}
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
index b50e2fe3867..a47a229d032 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/handlers/CamelWebSocketHandler.java
@@ -80,13 +80,9 @@ public class CamelWebSocketHandler implements HttpHandler {
public CamelWebSocketHandler() {
this.receiveListener = new UndertowReceiveListener();
this.callback = new UndertowWebSocketConnectionCallback();
- this.closeListener = new ChannelListener<WebSocketChannel>() {
- @Override
- public void handleEvent(WebSocketChannel channel) {
- sendEventNotificationIfNeeded((String) channel.getAttribute(UndertowConstants.CONNECTION_KEY),
- null, channel, EventType.ONCLOSE);
- }
- };
+ this.closeListener = (WebSocketChannel channel) -> sendEventNotificationIfNeeded(
+ (String) channel.getAttribute(UndertowConstants.CONNECTION_KEY), null, channel, EventType.ONCLOSE);
+
this.delegate = Handlers.websocket(callback);
}
diff --git a/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java b/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
index 5f61eb24376..259e08e6fdb 100644
--- a/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
+++ b/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
@@ -21,7 +21,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import io.vertx.core.AsyncResult;
-import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.VertxBuilder;
@@ -170,19 +169,16 @@ public class VertxComponent extends DefaultComponent {
LOG.info("Creating Clustered Vertx {}:{}", vertxOptions.getEventBusOptions().getHost(),
vertxOptions.getEventBusOptions().getPort());
// use the async api as we want to wait for the eventbus to be ready before we are in started state
- vertxFactory.clusteredVertx(new Handler<AsyncResult<Vertx>>() {
- @Override
- public void handle(AsyncResult<Vertx> event) {
- if (event.cause() != null) {
- LOG.warn("Error creating Clustered Vertx {}:{} due {}", host, port,
- event.cause().getMessage(), event.cause());
- } else if (event.succeeded()) {
- vertx = event.result();
- LOG.info("EventBus is ready: {}", vertx);
- }
-
- latch.countDown();
+ vertxFactory.clusteredVertx((AsyncResult<Vertx> event) -> {
+ if (event.cause() != null) {
+ LOG.warn("Error creating Clustered Vertx {}:{} due {}", host, port,
+ event.cause().getMessage(), event.cause());
+ } else if (event.succeeded()) {
+ vertx = event.result();
+ LOG.info("EventBus is ready: {}", vertx);
}
+
+ latch.countDown();
});
} else {
LOG.info("Creating Non-Clustered Vertx");
diff --git a/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
index 2dcddc22ef0..55c5a61f528 100644
--- a/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
+++ b/components/camel-vertx/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
@@ -34,11 +34,7 @@ public class VertxConsumer extends DefaultConsumer {
private final VertxEndpoint endpoint;
private transient MessageConsumer messageConsumer;
- private Handler<Message<Object>> handler = new Handler<Message<Object>>() {
- public void handle(Message event) {
- onEventBusEvent(event);
- }
- };
+ private Handler<Message<Object>> handler = VertxConsumer.this::onEventBusEvent;
public VertxConsumer(VertxEndpoint endpoint, Processor processor) {
super(endpoint, processor);