You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/07/30 09:03:56 UTC
[3/5] camel git commit: [CAMEL-11597] Adding support for
authentication with x-pack
[CAMEL-11597] Adding support for authentication with x-pack
- Upgrade ElasticSearch to 5.5
- Adding support for overriding the client for advanced use cases
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc56d620
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc56d620
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc56d620
Branch: refs/heads/master
Commit: dc56d620de66a76cb131e1fa3172c9de46dcd312
Parents: 57a3a3d
Author: fharms <fl...@gmail.com>
Authored: Wed Jul 26 20:16:26 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jul 30 10:47:26 2017 +0200
----------------------------------------------------------------------
components/camel-elasticsearch5/pom.xml | 19 ++++
.../elasticsearch5/ElasticsearchComponent.java | 19 +++-
.../ElasticsearchConfiguration.java | 93 ++++++++++++++++++++
.../elasticsearch5/ElasticsearchConstants.java | 3 +
.../elasticsearch5/ElasticsearchEndpoint.java | 9 +-
.../elasticsearch5/ElasticsearchProducer.java | 23 +++--
.../elasticsearch5/ElasticsearchBaseTest.java | 4 +-
.../ElasticsearchClusterBaseTest.java | 2 +-
parent/pom.xml | 2 +-
9 files changed, 161 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/pom.xml b/components/camel-elasticsearch5/pom.xml
index 8342eaf..e7014cd 100644
--- a/components/camel-elasticsearch5/pom.xml
+++ b/components/camel-elasticsearch5/pom.xml
@@ -38,6 +38,20 @@
<camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=elasticsearch5</camel.osgi.export.service>
</properties>
+ <repositories>
+ <!-- add the elasticsearch repo -->
+ <repository>
+ <id>elasticsearch-releases</id>
+ <url>https://artifacts.elastic.co/maven</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -49,6 +63,11 @@
<version>${elasticsearch5-version}</version>
</dependency>
<dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>x-pack-transport</artifactId>
+ <version>${elasticsearch5-version}</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson2-version}</version>
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
index cb3cd6c..ed8a060 100644
--- a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
+++ b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
@@ -35,6 +35,9 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
*/
public class ElasticsearchComponent extends UriEndpointComponent {
+ @Metadata(label = "advanced")
+ private TransportClient client;
+
public ElasticsearchComponent() {
super(ElasticsearchEndpoint.class);
}
@@ -50,7 +53,7 @@ public class ElasticsearchComponent extends UriEndpointComponent {
config.setTransportAddressesList(parseTransportAddresses(config.getTransportAddresses(), config));
- Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config);
+ Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config, client);
return endpoint;
}
@@ -73,4 +76,16 @@ public class ElasticsearchComponent extends UriEndpointComponent {
}
return addressesTrAd;
}
-}
\ No newline at end of file
+
+ public TransportClient getClient() {
+ return client;
+ }
+
+ /**
+ * To use an existing configured Elasticsearch client, instead of creating a client per endpoint.
+ * This allows the client to be customized with specific settings.
+ */
+ public void setClient(TransportClient client) {
+ this.client = client;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
index ca09954..b24c662 100644
--- a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
+++ b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
@@ -47,6 +47,21 @@ public class ElasticsearchConfiguration {
private int port = ElasticsearchConstants.DEFAULT_PORT;
@UriParam(defaultValue = "true")
private Boolean clientTransportSniff = true;
+ @UriParam(defaultValue = "" +ElasticsearchConstants.DEFAULT_PING_SCHEDULE)
+ private String pingSchedule = ElasticsearchConstants.DEFAULT_PING_SCHEDULE;
+ @UriParam(defaultValue = "" +ElasticsearchConstants.DEFAULT_PING_TIMEOUT)
+ private String pingTimeout = ElasticsearchConstants.DEFAULT_PING_TIMEOUT;
+ @UriParam(defaultValue = "" +ElasticsearchConstants.DEFAULT_TCP_CONNECT_TIMEOUT)
+ private String tcpConnectTimeout = ElasticsearchConstants.DEFAULT_TCP_CONNECT_TIMEOUT;
+ @UriParam(defaultValue = "false")
+ private Boolean tcpCompress = false;
+ @UriParam
+ private String user;
+ @UriParam(secret = true)
+ private String password;
+ @UriParam(defaultValue = "false")
+ private Boolean enabledSSL = false;
+
/**
* Name of the cluster
@@ -155,4 +170,82 @@ public class ElasticsearchConfiguration {
public void setTransportAddressesList(List<InetSocketTransportAddress> transportAddressesList) {
this.transportAddressesList = transportAddressesList;
}
+
+ /**
+ * The interval (including time unit, e.g. "5s") at which the client pings the cluster.
+ */
+ public String getPingSchedule() {
+ return pingSchedule;
+ }
+
+ public void setPingSchedule(String pingSchedule) {
+ this.pingSchedule = pingSchedule;
+ }
+
+ /**
+ * The time (including time unit, e.g. "30s") to wait before a connection attempt times out.
+ */
+ public String getTcpConnectTimeout() {
+ return tcpConnectTimeout;
+ }
+
+ public void setTcpConnectTimeout(String tcpConnectTimeout) {
+ this.tcpConnectTimeout = tcpConnectTimeout;
+ }
+
+ /**
+ * Set to true to enable compression (LZF) between all nodes.
+ */
+ public Boolean getTcpCompress() {
+ return tcpCompress;
+ }
+
+ public void setTcpCompress(Boolean tcpCompress) {
+ this.tcpCompress = tcpCompress;
+ }
+
+ /**
+ * User for authenticating against the cluster. Requires the "transport_client" role
+ * to access the cluster.
+ */
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * Password for authenticating against the cluster.
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * Enable SSL
+ */
+ public Boolean getEnabledSSL() {
+ return enabledSSL;
+ }
+
+ public void setEnabledSSL(Boolean enabledSSL) {
+ this.enabledSSL = enabledSSL;
+ }
+
+ /**
+ * The time (including time unit, e.g. "5s") to wait for a ping response from a node.
+ */
+ public String getPingTimeout() {
+ return pingTimeout;
+ }
+
+ public void setPingTimeout(String pingTimeout) {
+ this.pingTimeout = pingTimeout;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
index 6c31173..c77f32f 100644
--- a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
+++ b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
@@ -32,6 +32,9 @@ public interface ElasticsearchConstants {
String PORT = "port";
int DEFAULT_PORT = 9300;
int DEFAULT_FOR_WAIT_ACTIVE_SHARDS = 1; // Meaning only wait for the primary shard
+ String DEFAULT_PING_SCHEDULE = "5s"; // Meaning how often it should ping the cluster
+ String DEFAULT_PING_TIMEOUT = "5s"; // Meaning how long to wait for ping before timeout
+ String DEFAULT_TCP_CONNECT_TIMEOUT = "30s"; // Meaning how many seconds before it timeout when establish connection
String TRANSPORT_ADDRESSES_SEPARATOR_REGEX = ",";
String IP_PORT_SEPARATOR_REGEX = ":";
String ES_QUERY_DSL_PREFIX = "query";
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
index 8a845e9..a75634b 100644
--- a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
+++ b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.elasticsearch.client.transport.TransportClient;
/**
* The elasticsearch component is used for interfacing with ElasticSearch server using 5.x API.
@@ -33,12 +34,14 @@ public class ElasticsearchEndpoint extends DefaultEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchEndpoint.class);
+ private TransportClient client;
@UriParam
protected final ElasticsearchConfiguration configuration;
- public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config) throws Exception {
+ public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config, TransportClient client) throws Exception {
super(uri, component);
this.configuration = config;
+ this.client = client;
}
public Producer createProducer() throws Exception {
@@ -52,4 +55,8 @@ public class ElasticsearchEndpoint extends DefaultEndpoint {
public boolean isSingleton() {
return true;
}
+
+ public TransportClient getClient() {
+ return client;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
index c0aaff9..a6b0c91 100644
--- a/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch5/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
@@ -39,6 +39,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class ElasticsearchProducer extends DefaultProducer {
public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) {
super(endpoint);
this.configuration = configuration;
+ this.client = endpoint.getClient();
}
private ElasticsearchOperation resolveOperation(Exchange exchange) {
@@ -218,7 +220,7 @@ public class ElasticsearchProducer extends DefaultProducer {
LOG.info("Connecting to the ElasticSearch cluster: " + configuration.getClusterName());
if (configuration.getIp() != null) {
- client = new PreBuiltTransportClient(getSettings())
+ client = new PreBuiltXPackTransportClient(getSettings())
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(configuration.getIp()), configuration.getPort()));
} else if (configuration.getTransportAddressesList() != null
&& !configuration.getTransportAddressesList().isEmpty()) {
@@ -234,11 +236,20 @@ public class ElasticsearchProducer extends DefaultProducer {
}
private Settings getSettings() {
- return Settings.builder()
- .put("cluster.name", configuration.getClusterName())
- .put("client.transport.ignore_cluster_name", false)
- .put("client.transport.sniff", configuration.getClientTransportSniff())
- .build();
+ final Settings.Builder settings = Settings.builder()
+ .put("cluster.name", configuration.getClusterName())
+ .put("client.transport.ignore_cluster_name", false)
+ .put("client.transport.sniff", configuration.getClientTransportSniff())
+ .put("transport.ping_schedule", configuration.getPingSchedule())
+ .put("client.transport.ping_timeout", configuration.getPingTimeout())
+ .put("client.transport.sniff", configuration.getClientTransportSniff())
+ .put("request.headers.X-Found-Cluster", configuration.getClusterName());//according to the documentation this should be the same as cluster name
+
+ if (configuration.getUser() != null && configuration.getPassword() != null) {
+ settings.put("xpack.security.user", configuration.getUser() +":"+ configuration.getPassword())
+ .put("xpack.security.transport.ssl.enabled", configuration.getEnabledSSL());
+ }
+ return settings.build();
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java b/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
index 99885ff..4209393 100644
--- a/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
+++ b/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
@@ -33,7 +33,7 @@ import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.Netty4Plugin;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -62,7 +62,7 @@ public class ElasticsearchBaseTest extends CamelTestSupport {
.put("path.home", "target/home")
.put("transport.profiles.default.port", ES_TRANSPORT_PORT)
.build(), Arrays.asList(Netty4Plugin.class)).start();
- client = new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), ES_TRANSPORT_PORT));
+ client = new PreBuiltXPackTransportClient(Settings.EMPTY).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), ES_TRANSPORT_PORT));
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java b/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
index ac6d323..89245bf 100644
--- a/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
+++ b/components/camel-elasticsearch5/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
@@ -64,7 +64,7 @@ public class ElasticsearchClusterBaseTest extends CamelTestSupport {
.baseTransportPort(ES_BASE_TRANSPORT_PORT)
.baseHttpPort(ES_BASE_HTTP_PORT)
.basePath("target/testcluster/")
- .useLogger());
+ .disableESLogger());
// wait for green status
runner.ensureGreen();
http://git-wip-us.apache.org/repos/asf/camel/blob/dc56d620/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index feceb87..ad0e4f9 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -197,7 +197,7 @@
<elasticsearch-bundle-version>2.4.4_1</elasticsearch-bundle-version>
<elasticsearch-guava-version>18.0</elasticsearch-guava-version>
<elasticsearch-version>2.4.4</elasticsearch-version>
- <elasticsearch5-version>5.4.3</elasticsearch5-version>
+ <elasticsearch5-version>5.5.0</elasticsearch5-version>
<elasticsearch5-bundle-version>5.4.3_1</elasticsearch5-bundle-version>
<elasticsearch-cluster-runner-version>2.4.0.0</elasticsearch-cluster-runner-version>
<elasticsearch5-cluster-runner-version>5.4.2.0</elasticsearch5-cluster-runner-version>