You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/18 13:21:38 UTC
[3/3] camel git commit: CAMEL-9683: Kubernetes can lookup service
using client, env or dns.
CAMEL-9683: Kubernetes can lookup service using client, env or dns.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b3e148a8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b3e148a8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b3e148a8
Branch: refs/heads/kube-lb
Commit: b3e148a8fe9941c043f0ff8c4a3d99a55701cc4c
Parents: c746ad7
Author: Claus Ibsen <da...@apache.org>
Authored: Wed May 18 15:15:19 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed May 18 15:15:19 2016 +0200
----------------------------------------------------------------------
.../support/ServiceCallExpressionSupport.java | 2 +-
.../KubernetesDnsServiceCallExpression.java | 81 ++++++++++
.../KubernetesDnsServiceCallProcessor.java | 152 +++++++++++++++++++
.../processor/KubernetesProcessorFactory.java | 12 +-
4 files changed, 242 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b3e148a8/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java b/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java
index 73e0e72..9901038 100644
--- a/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/ServiceCallExpressionSupport.java
@@ -94,7 +94,7 @@ public abstract class ServiceCallExpressionSupport extends ExpressionAdapter {
}
}
- LOG.debug("Camel endpoint uri: {} for calling service: {} + on server {}:{}", answer, name, ip, port);
+ LOG.debug("Camel endpoint uri: {} for calling service: {} on server {}:{}", answer, name, ip, port);
return answer;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/b3e148a8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java
new file mode 100644
index 0000000..d5e3751
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallExpression.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.ExpressionAdapter;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Expression that computes the Camel endpoint uri to use when calling a Kubernetes service
+ * that is looked up using DNS, i.e. using the host name <tt>name.namespace.svc.dnsDomain</tt>.
+ * <p/>
+ * The computed uri only depends on the values given to the constructor, and not on the {@link Exchange}.
+ */
+public class KubernetesDnsServiceCallExpression extends ExpressionAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesDnsServiceCallExpression.class);
+
+    private final String name;
+    private final String namespace;
+    private final String scheme;
+    private final String contextPath;
+    private final String uri;
+    private final String dnsDomain;
+
+    /**
+     * Creates the expression.
+     *
+     * @param name        the service name
+     * @param namespace   the namespace which becomes part of the DNS name
+     * @param scheme      the uri scheme, or <tt>null</tt> to use <tt>http</tt> (only used when no uri is given)
+     * @param contextPath optional context-path to append (only used when no uri is given)
+     * @param uri         optional existing uri in which the service name is replaced with the DNS name
+     * @param dnsDomain   the DNS domain which becomes the last part of the DNS name
+     */
+    public KubernetesDnsServiceCallExpression(String name, String namespace, String scheme, String contextPath, String uri, String dnsDomain) {
+        this.name = name;
+        this.namespace = namespace;
+        this.scheme = scheme;
+        this.contextPath = contextPath;
+        this.uri = uri;
+        this.dnsDomain = dnsDomain;
+    }
+
+    /**
+     * Computes the Camel endpoint uri for calling the service.
+     *
+     * @param exchange the current exchange (not used when computing the uri)
+     * @return the endpoint uri as a {@link String}
+     */
+    @Override
+    public Object evaluate(Exchange exchange) {
+        try {
+            return buildCamelEndpointUri(name, namespace, uri, contextPath, scheme, dnsDomain);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    /**
+     * Builds the Camel endpoint uri.
+     * <p/>
+     * If no uri is given, then <tt>scheme://name.namespace.svc.dnsDomain[/contextPath]</tt> is returned.
+     * Otherwise the first occurrence of the service name in the given uri is replaced with
+     * <tt>name.namespace.svc.dnsDomain</tt>, and the uri is returned as-is if it does not contain the name.
+     *
+     * @param name        the service name
+     * @param namespace   the namespace
+     * @param uri         optional existing uri
+     * @param contextPath optional context-path (only used when uri is <tt>null</tt>)
+     * @param scheme      optional scheme, <tt>http</tt> is used if <tt>null</tt> (only used when uri is <tt>null</tt>)
+     * @param dnsDomain   the DNS domain
+     * @return the endpoint uri
+     */
+    protected static String buildCamelEndpointUri(String name, String namespace, String uri, String contextPath, String scheme, String dnsDomain) {
+        String dnsName = asKubernetesDnsServicePart(name, namespace, dnsDomain);
+
+        String answer = uri;
+        if (answer == null) {
+            // build basic uri if none provided, and use http by default if no scheme has been configured
+            answer = (scheme != null ? scheme : "http") + "://" + dnsName;
+            if (contextPath != null) {
+                answer += "/" + contextPath;
+            }
+        } else {
+            // we have existing uri, then replace the serviceName with name.namespace.svc.dnsDomain
+            // a plain text search is used instead of String.replaceFirst, as the name must not be
+            // interpreted as a regular expression (eg a dot would match any character), nor must
+            // the DNS name be interpreted as a replacement template (eg $ and \ are special)
+            int pos = answer.indexOf(name);
+            if (pos >= 0) {
+                answer = answer.substring(0, pos) + dnsName + answer.substring(pos + name.length());
+            }
+        }
+
+        LOG.debug("Camel endpoint uri: {} for calling service: {}", answer, name);
+        return answer;
+    }
+
+    /**
+     * Builds the DNS name of the service as <tt>name.namespace.svc.dnsDomain</tt>.
+     *
+     * @param name      the service name
+     * @param namespace the namespace
+     * @param dnsDomain the DNS domain
+     * @return the DNS name of the service
+     */
+    protected static String asKubernetesDnsServicePart(String name, String namespace, String dnsDomain) {
+        return name + "." + namespace + ".svc." + dnsDomain;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b3e148a8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java
new file mode 100644
index 0000000..46a4c7f
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesDnsServiceCallProcessor.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.processor;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Traceable;
+import org.apache.camel.component.kubernetes.KubernetesConstants;
+import org.apache.camel.processor.SendDynamicProcessor;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kubernetes based implementation of the ServiceCall EIP where the service lookup is DNS based.
+ * <p/>
+ * The service is called using the host name <tt>name.namespace.svc.dnsDomain</tt>, where the endpoint uri
+ * is computed by {@link KubernetesDnsServiceCallExpression}, and the call itself is delegated to a
+ * {@link SendDynamicProcessor} which is created when this processor is started.
+ */
+public class KubernetesDnsServiceCallProcessor extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesDnsServiceCallProcessor.class);
+
+    private CamelContext camelContext;
+    private String id;
+    private final String name;
+    private final String scheme;
+    private final String contextPath;
+    private final String namespace;
+    private final String uri;
+    private final String dnsDomain;
+    private final ExchangePattern exchangePattern;
+    private final KubernetesDnsServiceCallExpression serviceCallExpression;
+    private SendDynamicProcessor processor;
+
+    /**
+     * Creates the processor.
+     *
+     * @param name            the service name, which can be prefixed with a scheme (separated by <tt>:</tt>)
+     *                        and followed by a context-path (separated by <tt>/</tt> or <tt>?</tt>)
+     * @param namespace       the namespace of the service (validated as not empty on start)
+     * @param uri             optional uri in which the service name is replaced with the DNS name of the service
+     * @param exchangePattern optional exchange pattern to use when calling the service
+     * @param dnsDomain       the DNS domain (validated as not empty on start)
+     */
+    public KubernetesDnsServiceCallProcessor(String name, String namespace, String uri, ExchangePattern exchangePattern, String dnsDomain) {
+        // setup from the provided name which can contain scheme and context-path information as well
+        String serviceName;
+        if (name.contains("/")) {
+            serviceName = ObjectHelper.before(name, "/");
+            this.contextPath = ObjectHelper.after(name, "/");
+        } else if (name.contains("?")) {
+            serviceName = ObjectHelper.before(name, "?");
+            this.contextPath = ObjectHelper.after(name, "?");
+        } else {
+            serviceName = name;
+            this.contextPath = null;
+        }
+        if (serviceName.contains(":")) {
+            this.scheme = ObjectHelper.before(serviceName, ":");
+            this.name = ObjectHelper.after(serviceName, ":");
+        } else {
+            this.scheme = null;
+            this.name = serviceName;
+        }
+
+        this.namespace = namespace;
+        this.uri = uri;
+        this.exchangePattern = exchangePattern;
+        this.dnsDomain = dnsDomain;
+        // must use the DNS based expression so the namespace and DNS domain become part of the endpoint uri
+        this.serviceCallExpression = new KubernetesDnsServiceCallExpression(this.name, this.namespace, this.scheme, this.contextPath, this.uri, this.dnsDomain);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // this processor does not select a server ip and port, as the host name is computed by the
+        // expression, so there is no server to log or set as headers on the message
+
+        // use the dynamic send processor to call the service
+        return processor.process(exchange, callback);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getTraceLabel() {
+        return "kubernetes";
+    }
+
+    /**
+     * Validates that name, namespace and dnsDomain are configured, and then creates and starts
+     * the dynamic send processor that is used for calling the service.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notEmpty(name, "name", this);
+        ObjectHelper.notEmpty(namespace, "namespace", this);
+        ObjectHelper.notEmpty(dnsDomain, "dnsDomain", this);
+
+        LOG.info("KubernetesServiceCall at namespace: {} with service name: {} using DNS domain {}", namespace, name, dnsDomain);
+
+        processor = new SendDynamicProcessor(uri, serviceCallExpression);
+        processor.setCamelContext(getCamelContext());
+        if (exchangePattern != null) {
+            processor.setPattern(exchangePattern);
+        }
+        ServiceHelper.startServices(processor);
+    }
+
+    /**
+     * Stops the dynamic send processor.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(processor);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b3e148a8/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
index 4be66c6..6fb695b 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/processor/KubernetesProcessorFactory.java
@@ -126,11 +126,15 @@ public class KubernetesProcessorFactory implements ProcessorFactory {
processor.setLoadBalancer(lb);
processor.setServerListStrategy(sl);
return processor;
- } else if ("environment".equals(lookup)) {
- return new KubernetesEnvironmentServiceCallProcessor(name, namespace, uri, mep);
+ } else if ("dns".equals(lookup)) {
+ String dnsDomain = config != null ? config.getDnsDomain() : null;
+ if (dnsDomain == null && configRef != null) {
+ dnsDomain = configRef.getDnsDomain();
+ }
+ return new KubernetesDnsServiceCallProcessor(name, namespace, uri, mep, dnsDomain);
} else {
-// return new KubernetesDnsServiceCallProcessor(name, namespace, uri, mep);
- return null;
+ // environment is default
+ return new KubernetesEnvironmentServiceCallProcessor(name, namespace, uri, mep);
}
} else {
return null;