You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/01/11 10:35:39 UTC
camel git commit: CAMEL-10690: Camel-InfluxDB: Support Querying
Repository: camel
Updated Branches:
refs/heads/master 14095d0e7 -> c92845358
CAMEL-10690: Camel-InfluxDB: Support Querying
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c9284535
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c9284535
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c9284535
Branch: refs/heads/master
Commit: c92845358d9f1a4e81416d3c3b560a1a474186af
Parents: 14095d0
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Jan 11 11:33:57 2017 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Jan 11 11:33:57 2017 +0100
----------------------------------------------------------------------
.../src/main/docs/influxdb-component.adoc | 4 +-
.../component/influxdb/InfluxDbConstants.java | 1 +
.../component/influxdb/InfluxDbEndpoint.java | 26 ++++++
.../component/influxdb/InfluxDbOperations.java | 22 +++++
.../component/influxdb/InfluxDbProducer.java | 39 +++++++++
.../influxdb/InfluxDbProducerQueryTest.java | 87 ++++++++++++++++++++
.../influxdb/MockedInfluxDbConfiguration.java | 1 +
.../src/test/resources/log4j2.properties | 2 +-
8 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/docs/influxdb-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/docs/influxdb-component.adoc b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
index ff0a5f3..d171c5f 100644
--- a/components/camel-influxdb/src/main/docs/influxdb-component.adoc
+++ b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
@@ -52,7 +52,7 @@ The InfluxDB component has no options.
// endpoint options: START
-The InfluxDB component supports 5 endpoint options which are listed below:
+The InfluxDB component supports 7 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -61,6 +61,8 @@ The InfluxDB component supports 5 endpoint options which are listed below:
| connectionBean | producer | | String | *Required* Connection to the influx database of class InfluxDB.class
| batch | producer | false | boolean | Define if this operation is a batch operation or not
| databaseName | producer | | String | The name of the database where the time series will be stored
+| operation | producer | insert | String | Define if this operation is an insert or a query
+| query | producer | | String | Define the query in case of operation query
| retentionPolicy | producer | default | String | The string that defines the retention policy to the data created by the endpoint
| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
|=======================================================================
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
index 06a7124..867d4f6 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
@@ -21,6 +21,7 @@ public final class InfluxDbConstants {
public static final String MEASUREMENT_NAME = "camelInfluxDB.MeasurementName";
public static final String RETENTION_POLICY_HEADER = "camelInfluxDB.RetentionPolicy";
public static final String DBNAME_HEADER = "camelInfluxDB.databaseName";
+ public static final String INFLUXDB_QUERY = "camelInfluxDB.query";
private InfluxDbConstants() {
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
index efd6d09..1dac976 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
@@ -48,6 +48,10 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
private String retentionPolicy = "default";
@UriParam(defaultValue = "false")
private boolean batch;
+ @UriParam(defaultValue = InfluxDbOperations.INSERT)
+ private String operation = InfluxDbOperations.INSERT;
+ @UriParam
+ private String query;
public InfluxDbEndpoint(String uri, InfluxDbComponent component) {
super(uri, component);
@@ -134,4 +138,26 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
public void setBatch(boolean batch) {
this.batch = batch;
}
+
+ public String getOperation() {
+ return operation;
+ }
+
+ /**
+ * Define if this operation is an insert or a query
+ */
+ public void setOperation(String operation) {
+ this.operation = operation;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ /**
+ * Define the query in case of operation query
+ */
+ public void setQuery(String query) {
+ this.query = query;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
new file mode 100644
index 0000000..3a6f114
--- /dev/null
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.influxdb;
+
+public interface InfluxDbOperations {
+ String INSERT = "insert";
+ String QUERY = "query";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
index b071c13..569784f 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
@@ -17,10 +17,14 @@
package org.apache.camel.component.influxdb;
import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +64,19 @@ public class InfluxDbProducer extends DefaultProducer {
String dataBaseName = calculateDatabaseName(exchange);
String retentionPolicy = calculateRetentionPolicy(exchange);
+ switch (endpoint.getOperation()) {
+ case InfluxDbOperations.INSERT:
+ doInsert(exchange, dataBaseName, retentionPolicy);
+ break;
+ case InfluxDbOperations.QUERY:
+ doQuery(exchange, dataBaseName, retentionPolicy);
+ break;
+ default:
+ throw new IllegalArgumentException("The operation " + endpoint.getOperation() + " is not supported");
+ }
+ }
+
+ private void doInsert(Exchange exchange, String dataBaseName, String retentionPolicy) throws InvalidPayloadException {
if (!endpoint.isBatch()) {
Point p = exchange.getIn().getMandatoryBody(Point.class);
@@ -83,6 +100,13 @@ public class InfluxDbProducer extends DefaultProducer {
}
}
+ private void doQuery(Exchange exchange, String dataBaseName, String retentionPolicy) {
+ String query = calculateQuery(exchange);
+ Query influxdbQuery = new Query(query, dataBaseName);
+ QueryResult resultSet = connection.query(influxdbQuery);
+ exchange.getOut().setBody(resultSet);
+ }
+
private String calculateRetentionPolicy(Exchange exchange) {
String retentionPolicy = exchange.getIn().getHeader(InfluxDbConstants.RETENTION_POLICY_HEADER, String.class);
@@ -102,5 +126,20 @@ public class InfluxDbProducer extends DefaultProducer {
return endpoint.getDatabaseName();
}
+
+ private String calculateQuery(Exchange exchange) {
+ String query = exchange.getIn().getHeader(InfluxDbConstants.INFLUXDB_QUERY, String.class);
+
+ if (query != null) {
+ return query;
+ } else {
+ query = endpoint.getQuery();
+ }
+
+ if (ObjectHelper.isEmpty(query)) {
+ throw new IllegalArgumentException("The query option must be set if you want to run a query operation");
+ }
+ return query;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
new file mode 100644
index 0000000..b35e662
--- /dev/null
+++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.influxdb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InfluxDbProducerQueryTest extends AbstractInfluxDbTest {
+
+ @EndpointInject(uri = "mock:test")
+ MockEndpoint successEndpoint;
+
+ @EndpointInject(uri = "mock:error")
+ MockEndpoint errorEndpoint;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+
+ errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(0));
+
+ //test route
+ from("direct:test")
+ .to("influxdb:influxDbBean?databaseName={{influxdb.testDb}}")
+ .process(new Processor() {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(InfluxDbConstants.INFLUXDB_QUERY, "select * from cpu");
+ }
+ })
+ .to("influxdb:influxDbBean?databaseName={{influxdb.testDb}}&operation=query")
+ .to("mock:test");
+ }
+ };
+ }
+
+ @Before
+ public void resetEndpoints() {
+ errorEndpoint.reset();
+ successEndpoint.reset();
+ }
+
+ @Test
+ public void writePointFromMapAndStaticDbName() throws InterruptedException {
+
+ errorEndpoint.expectedMessageCount(0);
+ successEndpoint.expectedMessageCount(1);
+
+ Map<String, Object> pointMap = createMapPoint();
+ sendBody("direct:test", pointMap);
+
+ errorEndpoint.assertIsSatisfied();
+ successEndpoint.assertIsSatisfied();
+ }
+
+ private Map<String, Object> createMapPoint() {
+ Map<String, Object> pointMap = new HashMap<>();
+ pointMap.put(InfluxDbConstants.MEASUREMENT_NAME, "MyTestMeasurement");
+ pointMap.put("CPU", 1);
+ return pointMap;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
index 1705aa2..632200d 100644
--- a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
+++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
import static junit.framework.TestCase.assertNotNull;
import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/resources/log4j2.properties b/components/camel-influxdb/src/test/resources/log4j2.properties
index aa36063..58c249f 100644
--- a/components/camel-influxdb/src/test/resources/log4j2.properties
+++ b/components/camel-influxdb/src/test/resources/log4j2.properties
@@ -24,5 +24,5 @@ appender.out.type = Console
appender.out.name = out
appender.out.layout.type = PatternLayout
appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-rootLogger.level = INFO
+rootLogger.level = DEBUG
rootLogger.appenderRef.file.ref = file