You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/01/11 10:35:39 UTC

camel git commit: CAMEL-10690: Camel-InfluxDB: Support Querying

Repository: camel
Updated Branches:
  refs/heads/master 14095d0e7 -> c92845358


CAMEL-10690: Camel-InfluxDB: Support Querying


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c9284535
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c9284535
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c9284535

Branch: refs/heads/master
Commit: c92845358d9f1a4e81416d3c3b560a1a474186af
Parents: 14095d0
Author: Andrea Cosentino <an...@gmail.com>
Authored: Wed Jan 11 11:33:57 2017 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Wed Jan 11 11:33:57 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/influxdb-component.adoc       |  4 +-
 .../component/influxdb/InfluxDbConstants.java   |  1 +
 .../component/influxdb/InfluxDbEndpoint.java    | 26 ++++++
 .../component/influxdb/InfluxDbOperations.java  | 22 +++++
 .../component/influxdb/InfluxDbProducer.java    | 39 +++++++++
 .../influxdb/InfluxDbProducerQueryTest.java     | 87 ++++++++++++++++++++
 .../influxdb/MockedInfluxDbConfiguration.java   |  1 +
 .../src/test/resources/log4j2.properties        |  2 +-
 8 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/docs/influxdb-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/docs/influxdb-component.adoc b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
index ff0a5f3..d171c5f 100644
--- a/components/camel-influxdb/src/main/docs/influxdb-component.adoc
+++ b/components/camel-influxdb/src/main/docs/influxdb-component.adoc
@@ -52,7 +52,7 @@ The InfluxDB component has no options.
 
 
 // endpoint options: START
-The InfluxDB component supports 5 endpoint options which are listed below:
+The InfluxDB component supports 7 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -61,6 +61,8 @@ The InfluxDB component supports 5 endpoint options which are listed below:
 | connectionBean | producer |  | String | *Required* Connection to the influx database of class InfluxDB.class
 | batch | producer | false | boolean | Define if this operation is a batch operation or not
 | databaseName | producer |  | String | The name of the database where the time series will be stored
+| operation | producer | insert | String | Define if this operation is an insert or a query
+| query | producer |  | String | Define the query in case of operation query
 | retentionPolicy | producer | default | String | The string that defines the retention policy to the data created by the endpoint
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
index 06a7124..867d4f6 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbConstants.java
@@ -21,6 +21,7 @@ public final class InfluxDbConstants {
     public static final String MEASUREMENT_NAME = "camelInfluxDB.MeasurementName";
     public static final String RETENTION_POLICY_HEADER = "camelInfluxDB.RetentionPolicy";
     public static final String DBNAME_HEADER = "camelInfluxDB.databaseName";
+    public static final String INFLUXDB_QUERY = "camelInfluxDB.query";
     
     private InfluxDbConstants() {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
index efd6d09..1dac976 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbEndpoint.java
@@ -48,6 +48,10 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
     private String retentionPolicy = "default";
     @UriParam(defaultValue = "false")
     private boolean batch;
+    @UriParam(defaultValue = InfluxDbOperations.INSERT)
+    private String operation = InfluxDbOperations.INSERT;
+    @UriParam
+    private String query;
     
     public InfluxDbEndpoint(String uri, InfluxDbComponent component) {
         super(uri, component);
@@ -134,4 +138,26 @@ public class InfluxDbEndpoint extends DefaultEndpoint {
     public void setBatch(boolean batch) {
         this.batch = batch;
     }
+
+    public String getOperation() {
+        return operation;
+    }
+
+    /**
+     * Define if this operation is an insert or a query
+     */
+    public void setOperation(String operation) {
+        this.operation = operation;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    /**
+     * Define the query in case of operation query
+     */
+    public void setQuery(String query) {
+        this.query = query;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
new file mode 100644
index 0000000..3a6f114
--- /dev/null
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbOperations.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.influxdb;
+
+public interface InfluxDbOperations {
+    String INSERT = "insert";
+    String QUERY = "query";
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
index b071c13..569784f 100644
--- a/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
+++ b/components/camel-influxdb/src/main/java/org/apache/camel/component/influxdb/InfluxDbProducer.java
@@ -17,10 +17,14 @@
 package org.apache.camel.component.influxdb;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.influxdb.InfluxDB;
 import org.influxdb.dto.BatchPoints;
 import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +64,19 @@ public class InfluxDbProducer extends DefaultProducer {
 
         String dataBaseName = calculateDatabaseName(exchange);
         String retentionPolicy = calculateRetentionPolicy(exchange);
+        switch (endpoint.getOperation()) {
+        case InfluxDbOperations.INSERT:
+            doInsert(exchange, dataBaseName, retentionPolicy);
+            break;
+        case InfluxDbOperations.QUERY:
+            doQuery(exchange, dataBaseName, retentionPolicy);
+            break;
+        default:
+            throw new IllegalArgumentException("The operation " + endpoint.getOperation() + " is not supported");
+        }
+    }
+
+    private void doInsert(Exchange exchange, String dataBaseName, String retentionPolicy) throws InvalidPayloadException {
         if (!endpoint.isBatch()) {
             Point p = exchange.getIn().getMandatoryBody(Point.class);
 
@@ -83,6 +100,13 @@ public class InfluxDbProducer extends DefaultProducer {
         }
     }
 
+    private void doQuery(Exchange exchange, String dataBaseName, String retentionPolicy) {
+        String query = calculateQuery(exchange);
+        Query influxdbQuery = new Query(query, dataBaseName);
+        QueryResult resultSet = connection.query(influxdbQuery);
+        exchange.getOut().setBody(resultSet);
+    }
+
     private String calculateRetentionPolicy(Exchange exchange) {
         String retentionPolicy = exchange.getIn().getHeader(InfluxDbConstants.RETENTION_POLICY_HEADER, String.class);
 
@@ -102,5 +126,20 @@ public class InfluxDbProducer extends DefaultProducer {
 
         return endpoint.getDatabaseName();
     }
+    
+    private String calculateQuery(Exchange exchange) {
+        String query = exchange.getIn().getHeader(InfluxDbConstants.INFLUXDB_QUERY, String.class);
+
+        if (query != null) {
+            return query;
+        } else {
+            query = endpoint.getQuery();
+        }
+        
+        if (ObjectHelper.isEmpty(query)) {
+            throw new IllegalArgumentException("The query option must be set if you want to run a query operation");
+        }
+        return query;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
new file mode 100644
index 0000000..b35e662
--- /dev/null
+++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/InfluxDbProducerQueryTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.influxdb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InfluxDbProducerQueryTest extends AbstractInfluxDbTest {
+
+    @EndpointInject(uri = "mock:test")
+    MockEndpoint successEndpoint;
+
+    @EndpointInject(uri = "mock:error")
+    MockEndpoint errorEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+
+                errorHandler(deadLetterChannel("mock:error").redeliveryDelay(0).maximumRedeliveries(0));
+
+                //test route
+                from("direct:test")
+                        .to("influxdb:influxDbBean?databaseName={{influxdb.testDb}}")
+                        .process(new Processor() {
+
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                exchange.getIn().setHeader(InfluxDbConstants.INFLUXDB_QUERY, "select * from cpu");
+                            }
+                        })
+                        .to("influxdb:influxDbBean?databaseName={{influxdb.testDb}}&operation=query")
+                        .to("mock:test");
+            }
+        };
+    }
+
+    @Before
+    public void resetEndpoints() {
+        errorEndpoint.reset();
+        successEndpoint.reset();
+    }
+
+    @Test
+    public void writePointFromMapAndStaticDbName() throws InterruptedException {
+
+        errorEndpoint.expectedMessageCount(0);
+        successEndpoint.expectedMessageCount(1);
+
+        Map<String, Object> pointMap = createMapPoint();
+        sendBody("direct:test", pointMap);
+
+        errorEndpoint.assertIsSatisfied();
+        successEndpoint.assertIsSatisfied();
+    }
+
+    private Map<String, Object> createMapPoint() {
+        Map<String, Object> pointMap = new HashMap<>();
+        pointMap.put(InfluxDbConstants.MEASUREMENT_NAME, "MyTestMeasurement");
+        pointMap.put("CPU", 1);
+        return pointMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
index 1705aa2..632200d 100644
--- a/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
+++ b/components/camel-influxdb/src/test/java/org/apache/camel/component/influxdb/MockedInfluxDbConfiguration.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
 import static junit.framework.TestCase.assertNotNull;
 
 import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Bean;

http://git-wip-us.apache.org/repos/asf/camel/blob/c9284535/components/camel-influxdb/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-influxdb/src/test/resources/log4j2.properties b/components/camel-influxdb/src/test/resources/log4j2.properties
index aa36063..58c249f 100644
--- a/components/camel-influxdb/src/test/resources/log4j2.properties
+++ b/components/camel-influxdb/src/test/resources/log4j2.properties
@@ -24,5 +24,5 @@ appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
-rootLogger.level = INFO
+rootLogger.level = DEBUG
 rootLogger.appenderRef.file.ref = file