You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/18 11:46:59 UTC
[camel-k-runtime] 01/02: loaders: simplify source loader
interceptors
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 55c843177abf732a0156b9046cb6cdea55142e69
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Fri Sep 18 12:53:18 2020 +0200
loaders: simplify source loader interceptors
---
.../org/apache/camel/k/support/RuntimeSupport.java | 41 ---------------
.../org/apache/camel/k/support/SourcesSupport.java | 34 +++++++++++++
.../camel/k/cron/CronSourceLoaderInterceptor.java | 58 ++++++++++++----------
.../knative/KnativeSourceLoaderInterceptor.java | 43 +++++++++-------
4 files changed, 89 insertions(+), 87 deletions(-)
diff --git a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java
index 4c9da08..72e9796 100644
--- a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java
+++ b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/RuntimeSupport.java
@@ -19,20 +19,15 @@ package org.apache.camel.k.support;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.RoutesBuilder;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.builder.RouteBuilderLifecycleStrategy;
import org.apache.camel.k.Constants;
import org.apache.camel.k.ContextCustomizer;
import org.apache.camel.k.Source;
@@ -252,40 +247,4 @@ public final class RuntimeSupport {
return answer;
}
- public static Optional<RoutesBuilder> beforeConfigure(Optional<RoutesBuilder> builder, Consumer<RouteBuilder> consumer) {
- return builder.map(b -> {
- if (b instanceof RouteBuilder) {
- ((RouteBuilder) b).addLifecycleInterceptor(beforeConfigure(consumer));
- }
- return b;
- });
- }
-
- public static RouteBuilderLifecycleStrategy beforeConfigure(Consumer<RouteBuilder> consumer) {
- return new RouteBuilderLifecycleStrategy() {
- @Override
- public void beforeConfigure(RouteBuilder builder) {
- consumer.accept(builder);
- }
- };
- }
-
- public static Optional<RoutesBuilder> afterConfigure(Optional<RoutesBuilder> builder, Consumer<RouteBuilder> consumer) {
- return builder.map(b -> {
- if (b instanceof RouteBuilder) {
- ((RouteBuilder) b).addLifecycleInterceptor(afterConfigure(consumer));
- }
- return b;
- });
- }
-
- public static RouteBuilderLifecycleStrategy afterConfigure(Consumer<RouteBuilder> consumer) {
- return new RouteBuilderLifecycleStrategy() {
- @Override
- public void afterConfigure(RouteBuilder builder) {
- consumer.accept(builder);
- }
- };
- }
-
}
diff --git a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java
index 3a87ef2..0482ec7 100644
--- a/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java
+++ b/camel-k-runtime-core/src/main/java/org/apache/camel/k/support/SourcesSupport.java
@@ -17,7 +17,9 @@
package org.apache.camel.k.support;
import java.util.List;
+import java.util.function.Consumer;
+import org.apache.camel.RoutesBuilder;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.RouteBuilderLifecycleStrategy;
@@ -161,4 +163,36 @@ public final class SourcesSupport {
}
);
}
+
+ public static RoutesBuilder beforeConfigure(RoutesBuilder builder, Consumer<RouteBuilder> consumer) {
+ if (builder instanceof RouteBuilder) {
+ ((RouteBuilder) builder).addLifecycleInterceptor(beforeConfigure(consumer));
+ }
+ return builder;
+ }
+
+ public static RouteBuilderLifecycleStrategy beforeConfigure(Consumer<RouteBuilder> consumer) {
+ return new RouteBuilderLifecycleStrategy() {
+ @Override
+ public void beforeConfigure(RouteBuilder builder) {
+ consumer.accept(builder);
+ }
+ };
+ }
+
+ public static RoutesBuilder afterConfigure(RoutesBuilder builder, Consumer<RouteBuilder> consumer) {
+ if (builder instanceof RouteBuilder) {
+ ((RouteBuilder) builder).addLifecycleInterceptor(afterConfigure(consumer));
+ }
+ return builder;
+ }
+
+ public static RouteBuilderLifecycleStrategy afterConfigure(Consumer<RouteBuilder> consumer) {
+ return new RouteBuilderLifecycleStrategy() {
+ @Override
+ public void afterConfigure(RouteBuilder builder) {
+ consumer.accept(builder);
+ }
+ };
+ }
}
diff --git a/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java b/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java
index 3c965ac..cdbf409 100644
--- a/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java
+++ b/camel-k-runtime-cron/src/main/java/org/apache/camel/k/cron/CronSourceLoaderInterceptor.java
@@ -20,12 +20,13 @@ import java.util.Optional;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.k.Runtime;
import org.apache.camel.k.RuntimeAware;
import org.apache.camel.k.Source;
import org.apache.camel.k.SourceLoader;
import org.apache.camel.k.annotation.LoaderInterceptor;
-import org.apache.camel.k.support.RuntimeSupport;
+import org.apache.camel.k.support.SourcesSupport;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.Configurer;
@@ -84,33 +85,10 @@ public class CronSourceLoaderInterceptor implements SourceLoader.Interceptor, Ru
return new SourceLoader.Result() {
@Override
public Optional<RoutesBuilder> builder() {
- return RuntimeSupport.afterConfigure(result.builder(), builder -> {
- if (ObjectHelper.isEmpty(overridableComponents)) {
- return;
- }
-
- final CamelContext context = runtime.getCamelContext();
- final String[] components = overridableComponents.split(",", -1);
-
- for (RouteDefinition def : builder.getRouteCollection().getRoutes()) {
- String uri = def.getInput() != null ? def.getInput().getUri() : null;
- if (shouldBeOverridden(uri, components)) {
- def.getInput().setUri(timerUri);
-
- //
- // Don't install the shutdown strategy more than once.
- //
- if (context.getManagementStrategy().getEventNotifiers().stream().noneMatch(CronShutdownStrategy.class::isInstance)) {
- CronShutdownStrategy strategy = new CronShutdownStrategy(runtime);
- ServiceHelper.startService(strategy);
-
- context.getManagementStrategy().addEventNotifier(strategy);
- }
- }
- }
- });
+ return result.builder().map(
+ builder -> SourcesSupport.afterConfigure(builder, CronSourceLoaderInterceptor.this::afterConfigure)
+ );
}
-
@Override
public Optional<Object> configuration() {
return result.configuration();
@@ -118,6 +96,32 @@ public class CronSourceLoaderInterceptor implements SourceLoader.Interceptor, Ru
};
}
+ private void afterConfigure(RouteBuilder builder) {
+ if (ObjectHelper.isEmpty(overridableComponents)) {
+ return;
+ }
+
+ final CamelContext context = runtime.getCamelContext();
+ final String[] components = overridableComponents.split(",", -1);
+
+ for (RouteDefinition def : builder.getRouteCollection().getRoutes()) {
+ String uri = def.getInput() != null ? def.getInput().getUri() : null;
+ if (shouldBeOverridden(uri, components)) {
+ def.getInput().setUri(timerUri);
+
+ //
+ // Don't install the shutdown strategy more than once.
+ //
+ if (context.getManagementStrategy().getEventNotifiers().stream().noneMatch(CronShutdownStrategy.class::isInstance)) {
+ CronShutdownStrategy strategy = new CronShutdownStrategy(runtime);
+ ServiceHelper.startService(strategy);
+
+ context.getManagementStrategy().addEventNotifier(strategy);
+ }
+ }
+ }
+ }
+
private static boolean shouldBeOverridden(String uri, String... components) {
if (uri == null) {
return false;
diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
index 160399c..979fae7 100644
--- a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
+++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java
@@ -21,10 +21,11 @@ import java.util.Optional;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.k.Source;
import org.apache.camel.k.SourceLoader;
import org.apache.camel.k.annotation.LoaderInterceptor;
-import org.apache.camel.k.support.RuntimeSupport;
+import org.apache.camel.k.support.SourcesSupport;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.ToDefinition;
import org.slf4j.Logger;
@@ -44,24 +45,9 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor
return new SourceLoader.Result() {
@Override
public Optional<RoutesBuilder> builder() {
- return RuntimeSupport.afterConfigure(result.builder(), builder -> {
- final CamelContext camelContext = builder.getContext();
- final List<RouteDefinition> definitions = builder.getRouteCollection().getRoutes();
-
- if (definitions.size() == 1) {
- final String sinkName = camelContext.resolvePropertyPlaceholders("{{knative.sink:sink}}");
- final String sinkUri = String.format("knative://endpoint/%s", sinkName);
- final RouteDefinition definition = definitions.get(0);
-
- LOGGER.info("Add sink:{} to route:{}", sinkUri, definition.getId());
-
- // assuming that route is linear like there's no content based routing
- // or ant other EIP that would branch the flow
- definition.getOutputs().add(new ToDefinition(sinkUri));
- } else {
- LOGGER.warn("Cannot determine route to enrich. the knative enpoint need to explicitly be defined");
- }
- });
+ return result.builder().map(
+ bulider -> SourcesSupport.afterConfigure(bulider, KnativeSourceLoaderInterceptor::afterConfigure)
+ );
}
@Override
@@ -71,4 +57,23 @@ public class KnativeSourceLoaderInterceptor implements SourceLoader.Interceptor
};
}
+ private static void afterConfigure(RouteBuilder builder) {
+ final CamelContext camelContext = builder.getContext();
+ final List<RouteDefinition> definitions = builder.getRouteCollection().getRoutes();
+
+ if (definitions.size() == 1) {
+ final String sinkName = camelContext.resolvePropertyPlaceholders("{{knative.sink:sink}}");
+ final String sinkUri = String.format("knative://endpoint/%s", sinkName);
+ final RouteDefinition definition = definitions.get(0);
+
+ LOGGER.info("Add sink:{} to route:{}", sinkUri, definition.getId());
+
+ // assuming that route is linear like there's no content based routing
+ // or ant other EIP that would branch the flow
+ definition.getOutputs().add(new ToDefinition(sinkUri));
+ } else {
+ LOGGER.warn("Cannot determine route to enrich. the knative enpoint need to explicitly be defined");
+ }
+ }
+
}