Posted to commits@zeppelin.apache.org by al...@apache.org on 2020/05/01 11:30:37 UTC
[zeppelin] branch master updated: [ZEPPELIN-4602] Added initial
version of InfluxDB interpreter
This is an automated email from the ASF dual-hosted git repository.
alexott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 94300b3 [ZEPPELIN-4602] Added initial version of InfluxDB interpreter
94300b3 is described below
commit 94300b329bec22ed514f79f2a72de5105bdb4a28
Author: Robert Hajek <ro...@gmail.com>
AuthorDate: Mon Apr 20 15:09:10 2020 +0200
[ZEPPELIN-4602] Added initial version of InfluxDB interpreter
### What is this PR for?
The goal is to add support for querying InfluxDB 2.x with the Flux language from Zeppelin notebooks.
InfluxDB 2.0 (beta) docs:
https://v2.docs.influxdata.com/v2.0/
Flux language docs:
https://docs.influxdata.com/flux/
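For readers skimming the change, here is a minimal sketch of the rendering idea: each Flux result table becomes its own tab-separated `%table` block, and tabs or newlines inside values are replaced with spaces. It is an illustration under stated assumptions, not the interpreter code itself. `FluxTableSketch`, `Row`, `row`, `clean` and `render` are hypothetical names, and a plain ordered map stands in for the client library's record type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class FluxTableSketch {

  /** Hypothetical stand-in for a Flux record: a table index plus ordered column values. */
  static final class Row {
    final int table;
    final Map<String, Object> values;

    Row(int table, Map<String, Object> values) {
      this.table = table;
      this.values = values;
    }
  }

  /** Tabs and newlines delimit %table cells and rows, so replace them inside values. */
  static String clean(Object value) {
    return value == null ? "" : value.toString().replace("\t", " ").replace("\n", " ");
  }

  /** Starts a new %table block, with a header row, whenever the table index changes. */
  static String render(List<Row> rows) {
    StringBuilder out = new StringBuilder();
    int current = -1;
    for (Row row : rows) {
      if (row.table != current) {
        current = row.table;
        StringJoiner header = new StringJoiner("\t");
        row.values.keySet().forEach(c -> header.add(clean(c)));
        out.append("\n%table ").append(header).append("\n");
      }
      StringJoiner cells = new StringJoiner("\t");
      row.values.values().forEach(v -> cells.add(clean(v)));
      out.append(cells).append("\n");
    }
    return out.toString();
  }

  /** Builds a two-column sample row (assumed column names: cpu, _value). */
  static Row row(int table, String cpu, Object value) {
    Map<String, Object> values = new LinkedHashMap<>();
    values.put("cpu", cpu);
    values.put("_value", value);
    return new Row(table, values);
  }

  public static void main(String[] args) {
    List<Row> rows = new ArrayList<>();
    rows.add(row(0, "cpu-total", 12.5));
    rows.add(row(1, "cpu0", 23.5));
    // Two table indexes, so two %table blocks are printed.
    System.out.print(render(rows));
  }
}
```

Because a new block starts only when the table index changes, a query that pivots or keeps everything in one table yields a single `%table` block.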
### What type of PR is it?
Feature
### What is the Jira issue?
* Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN-4602
### How should this be tested?
* First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
* Strongly recommended: add automated unit tests for any new or changed behavior
* Outline any manual steps to test the PR here.
### Screenshots (if appropriate)
linked in docs/interpreter/influxdb.md
Author: Robert Hajek <ro...@gmail.com>
Closes #3640 from rhajek/master and squashes the following commits:
206848edc [Robert Hajek] [ZEPPELIN-4602] updated influxdb client libraries to 1.7.0, added support for InfluxDB 1.8, interpreter code cleanup
34cd9ed20 [Robert Hajek] [ZEPPELIN-4602] BaseZeppelinContext replaced
8955205c1 [Robert Hajek] [ZEPPELIN-4602] Updated README.md, removed specific maven-checkstyle-plugin configuration
04b7e28d8 [Robert Hajek] [ZEPPELIN-4602] Added licences
b6b9a4848 [Robert Hajek] [ZEPPELIN-4602] Added initial version of InfluxDB interpreter
---
.../themes/zeppelin/img/docs-img/influxdb1.png | Bin 0 -> 159175 bytes
.../themes/zeppelin/img/docs-img/influxdb2.png | Bin 0 -> 76681 bytes
docs/interpreter/influxdb.md | 111 +++++++
influxdb/README.md | 18 ++
influxdb/pom.xml | 79 +++++
.../zeppelin/influxdb/InfluxDBInterpreter.java | 203 +++++++++++++
.../src/main/resources/interpreter-setting.json | 42 +++
.../zeppelin/influxdb/InfluxDBInterpeterTest.java | 327 +++++++++++++++++++++
pom.xml | 1 +
zeppelin-distribution/src/bin_license/LICENSE | 3 +
.../src/bin_license/licenses/LICENSE-influxdb | 21 ++
11 files changed, 805 insertions(+)
diff --git a/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png b/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png
new file mode 100644
index 0000000..6731c56
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png b/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png
new file mode 100644
index 0000000..71104ae
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png differ
diff --git a/docs/interpreter/influxdb.md b/docs/interpreter/influxdb.md
new file mode 100644
index 0000000..64dd084
--- /dev/null
+++ b/docs/interpreter/influxdb.md
@@ -0,0 +1,111 @@
+---
+layout: page
+title: "InfluxDB Interpreter for Apache Zeppelin"
+description: "InfluxDB is an open-source time series database designed to handle high write and query loads."
+group: interpreter
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+{% include JB/setup %}
+
+# InfluxDB Interpreter for Apache Zeppelin
+
+<div id="toc"></div>
+
+## Overview
+[InfluxDB](https://v2.docs.influxdata.com/v2.0/) is an open-source time series database (TSDB) developed by InfluxData. It is written in Go and optimized for fast, high-availability storage and retrieval of time series data in fields such as operations monitoring, application metrics, Internet of Things sensor data, and real-time analytics.
+This interpreter lets you run queries written in the [Flux language](https://v2.docs.influxdata.com/v2.0/reference/flux/) in a Zeppelin notebook.
+
+### Notes
+* This interpreter is compatible with InfluxDB 1.8+ and InfluxDB 2.0+ (v2 API, Flux language)
+* Code completion and syntax highlighting are not supported for now
+
+### Example notebook
+
+![InfluxDB notebook]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/influxdb1.png)
+
+### Configuration
+<table class="table-configuration">
+ <tr>
+ <th>Property</th>
+ <th>Default</th>
+ <th>Description</th>
+ </tr>
+ <tr>
+ <td>influxdb.url</td>
+ <td>http://localhost:9999</td>
+ <td>InfluxDB API connection URL</td>
+ </tr>
+ <tr>
+ <td>influxdb.org</td>
+ <td>my-org</td>
+ <td>Organization name. Organizations are supported in InfluxDB 2.0+; use "-" as the org for InfluxDB 1.8.</td>
+ </tr>
+ <tr>
+ <td>influxdb.token</td>
+ <td>my-token</td>
+ <td>Authorization token for the InfluxDB API. Tokens are supported in InfluxDB 2.0+; for InfluxDB 1.8, use 'username:password' as the token.</td>
+ </tr>
+ <tr>
+ <td>influxdb.logLevel</td>
+ <td>NONE</td>
+ <td>InfluxDB client library verbosity level (NONE, BASIC, HEADERS, BODY), for debugging purposes</td>
+ </tr>
+</table>
+
+#### Example configuration
+
+![InfluxDB notebook]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/influxdb2.png)
+
+## How to use
+Start a paragraph with `%influxdb` and write a Flux query, for example:
+
+```
+%influxdb
+from(bucket: "my-bucket")
+ |> range(start: -1h)
+ |> filter(fn: (r) => r._measurement == "cpu")
+ |> filter(fn: (r) => r.cpu == "cpu-total")
+ |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+```
+In this example we use data collected by the `[[inputs.cpu]]` [Telegraf](https://github.com/influxdata/telegraf) input plugin.
+
+The result of a Flux query can contain one or more tables. When there are multiple tables, each
+table is rendered as a separate `%table` structure. This example uses the `pivot`
+function to collect values from multiple tables into a single table.
+
+## How to run InfluxDB 2.0 using Docker
+```bash
+docker pull quay.io/influxdb/influxdb:nightly
+docker run --name influxdb -p 9999:9999 quay.io/influxdb/influxdb:nightly
+
+# Post an onboarding request to set up the initial user (my-user / my-password), org (my-org) and bucket (my-bucket)
+curl -i -X POST http://localhost:9999/api/v2/setup -H 'accept: application/json' \
+ -d '{
+ "username": "my-user",
+ "password": "my-password",
+ "org": "my-org",
+ "bucket": "my-bucket",
+ "token": "my-token"
+ }'
+
+```
+
+
+
+
+
diff --git a/influxdb/README.md b/influxdb/README.md
new file mode 100644
index 0000000..06ba34f
--- /dev/null
+++ b/influxdb/README.md
@@ -0,0 +1,18 @@
+InfluxDB 2.0 interpreter for Apache Zeppelin
+============================================
+
+## Description:
+
+Provides the InfluxDB interpreter for Apache Zeppelin.
+
+## Build
+
+```
+mvn -pl influxdb -am -DskipTests package
+```
+
+## Test
+
+```
+mvn -pl influxdb -am -Dtest='org.apache.zeppelin.influxdb.*' -DfailIfNoTests=false test
+```
diff --git a/influxdb/pom.xml b/influxdb/pom.xml
new file mode 100644
index 0000000..8ab2dd9
--- /dev/null
+++ b/influxdb/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>zeppelin-interpreter-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>zeppelin-influxdb</artifactId>
+ <packaging>jar</packaging>
+ <version>0.9.0-SNAPSHOT</version>
+ <name>Zeppelin: InfluxDB interpreter</name>
+ <description>InfluxDB 2.0 timeseries database support</description>
+
+ <properties>
+ <interpreter.name>influxdb</interpreter.name>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <influxdb.client.version>1.7.0</influxdb.client.version>
+ <dependency.okhttp3.version>3.13.1</dependency.okhttp3.version>
+ <dependency.gson.version>2.8.5</dependency.gson.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.influxdb</groupId>
+ <artifactId>influxdb-client-java</artifactId>
+ <version>${influxdb.client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${dependency.gson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${dependency.okhttp3.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
new file mode 100644
index 0000000..1bbcfcd
--- /dev/null
+++ b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.influxdb;
+
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.influxdb.LogLevel;
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.InfluxDBClientOptions;
+import com.influxdb.client.QueryApi;
+import org.apache.zeppelin.interpreter.AbstractInterpreter;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+/**
+ * <a href="https://v2.docs.influxdata.com/v2.0/">InfluxDB 2.0</a> interpreter for Zeppelin.
+ * It uses the /api/v2/query API; queries are written in the Flux language.
+ * <p>
+ * How to use: <br/>
+ * <pre>
+ * {@code
+ * %influxdb
+ * from(bucket: "my-bucket")
+ * |> range(start: -5m)
+ * |> filter(fn: (r) => r._measurement == "cpu")
+ * |> filter(fn: (r) => r._field == "usage_user")
+ * |> filter(fn: (r) => r.cpu == "cpu-total")
+ * }
+ * </pre>
+ * </p>
+ */
+public class InfluxDBInterpreter extends AbstractInterpreter {
+
+ private static final String INFLUXDB_API_URL_PROPERTY = "influxdb.url";
+ private static final String INFLUXDB_TOKEN_PROPERTY = "influxdb.token";
+ private static final String INFLUXDB_ORG_PROPERTY = "influxdb.org";
+ private static final String INFLUXDB_LOGLEVEL_PROPERTY = "influxdb.logLevel";
+
+ private static final String TABLE_MAGIC_TAG = "%table ";
+ private static final String WHITESPACE = " ";
+ private static final String NEWLINE = "\n";
+ private static final String TAB = "\t";
+ private static final String EMPTY_COLUMN_VALUE = "";
+
+ private volatile InfluxDBClient client;
+ private volatile QueryApi queryApi;
+
+ public InfluxDBInterpreter(Properties properties) {
+ super(properties);
+ }
+
+ @Override
+ public ZeppelinContext getZeppelinContext() {
+ return null;
+ }
+
+ @Override
+ protected InterpreterResult internalInterpret(String query, InterpreterContext context)
+ throws InterpreterException {
+
+ logger.debug("Run Flux command '{}'", query);
+ query = query.trim();
+
+ QueryApi queryService = getInfluxDBClient(context);
+
+ final int[] actualIndex = {-1};
+
+ AtomicReference<InterpreterResult> resultRef = new AtomicReference<>();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ StringBuilder result = new StringBuilder();
+ queryService.query(
+ query,
+
+ //process record
+ (cancellable, fluxRecord) -> {
+
+ Integer tableIndex = fluxRecord.getTable();
+ if (actualIndex[0] != tableIndex) {
+ result.append(NEWLINE);
+ result.append(TABLE_MAGIC_TAG);
+ actualIndex[0] = tableIndex;
+
+ //add column names to table header
+ StringJoiner joiner = new StringJoiner(TAB);
+ fluxRecord.getValues().keySet().forEach(c -> joiner.add(replaceReservedChars(c)));
+ result.append(joiner.toString());
+ result.append(NEWLINE);
+ }
+
+ StringJoiner rowsJoiner = new StringJoiner(TAB);
+ for (Object value : fluxRecord.getValues().values()) {
+ if (value == null) {
+ value = EMPTY_COLUMN_VALUE;
+ }
+ rowsJoiner.add(replaceReservedChars(value.toString()));
+ }
+ result.append(rowsJoiner.toString());
+ result.append(NEWLINE);
+ },
+
+ throwable -> {
+
+ logger.error(throwable.getMessage(), throwable);
+ resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
+ throwable.getMessage()));
+
+ countDownLatch.countDown();
+
+ }, () -> {
+ //on complete
+ InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
+ intpResult.add(result.toString());
+ resultRef.set(intpResult);
+ countDownLatch.countDown();
+ }
+ );
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new InterpreterException(e);
+ }
+
+ return resultRef.get();
+ }
+
+
+ private QueryApi getInfluxDBClient(InterpreterContext context) {
+ if (queryApi == null) {
+ queryApi = this.client.getQueryApi();
+ }
+ return queryApi;
+ }
+
+
+ @Override
+ public void open() throws InterpreterException {
+
+ if (this.client == null) {
+ InfluxDBClientOptions opt = InfluxDBClientOptions.builder()
+ .url(getProperty(INFLUXDB_API_URL_PROPERTY))
+ .authenticateToken(getProperty(INFLUXDB_TOKEN_PROPERTY).toCharArray())
+ .logLevel(LogLevel.valueOf(
+ getProperty(INFLUXDB_LOGLEVEL_PROPERTY, LogLevel.NONE.toString())))
+ .org(getProperty(INFLUXDB_ORG_PROPERTY))
+ .build();
+
+ this.client = InfluxDBClientFactory.create(opt);
+ }
+ }
+
+ @Override
+ public void close() throws InterpreterException {
+ if (this.client != null) {
+ this.client.close();
+ this.client = null;
+ }
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
+
+ }
+
+ @Override
+ public FormType getFormType() throws InterpreterException {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ return 0;
+ }
+
+ /**
+ * Replaces tab and newline characters, which are reserved in %table output, with spaces.
+ */
+ private String replaceReservedChars(String str) {
+ if (str == null) {
+ return EMPTY_COLUMN_VALUE;
+ }
+ return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
+ }
+
+}
diff --git a/influxdb/src/main/resources/interpreter-setting.json b/influxdb/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..363b9ef
--- /dev/null
+++ b/influxdb/src/main/resources/interpreter-setting.json
@@ -0,0 +1,42 @@
+[
+ {
+ "group": "influxdb",
+ "name": "influxdb",
+ "className": "org.apache.zeppelin.influxdb.InfluxDBInterpreter",
+ "properties": {
+ "influxdb.url": {
+ "envName": null,
+ "propertyName": "influxdb.url",
+ "defaultValue": "http://localhost:9999",
+ "description": "The URL for InfluxDB 2.X API",
+ "type": "string"
+ },
+ "influxdb.token": {
+ "envName": null,
+ "propertyName": "influxdb.token",
+ "defaultValue": "my-token",
+ "description": "InfluxDB auth token",
+ "type": "password"
+ },
+ "influxdb.org": {
+ "envName": null,
+ "propertyName": "influxdb.org",
+ "defaultValue": "my-org",
+ "description": "InfluxDB org name",
+ "type": "string"
+ },
+ "influxdb.logLevel": {
+ "envName": null,
+ "propertyName": "influxdb.logLevel",
+ "defaultValue": "NONE",
+ "description": "InfluxDB http client library verbosity level (NONE, BASIC, HEADERS, BODY)",
+ "type": "string"
+ }
+ },
+ "editor": {
+ "language": "sql",
+ "editOnDblClick": false,
+ "completionSupport": false
+ }
+ }
+]
diff --git a/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java b/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java
new file mode 100644
index 0000000..3ed6385
--- /dev/null
+++ b/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.zeppelin.influxdb;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nonnull;
+
+import com.influxdb.LogLevel;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class InfluxDBInterpeterTest {
+
+ Properties properties;
+
+ static final String SINGLE_TABLE_RESPONSE =
+ "#datatype,string,long,dateTime:RFC3339,double,string\n" +
+ "#group,false,false,false,false,true\n" +
+ "#default,_result,,,,\n" +
+ ",result,table,_time,_value,_field\n" +
+ ",,0,2020-01-24T10:23:56Z,12.114014251781473,usage_user\n" +
+ ",,0,2020-01-24T10:23:57Z,12.048493938257717,usage_user\n" +
+ ",,0,2020-01-24T10:24:06Z,12.715678919729932,usage_user\n" +
+ ",,0,2020-01-24T10:24:07Z,11.876484560570072,usage_user\n" +
+ ",,0,2020-01-24T10:24:16Z,10.044977511244378,usage_user\n" +
+ ",,0,2020-01-24T10:24:17Z,10.594702648675662,usage_user\n" +
+ ",,0,2020-01-24T10:24:26Z,12.092034512942353,usage_user\n" +
+ ",,0,2020-01-24T10:24:27Z,12.131065532766383,usage_user\n" +
+ ",,0,2020-01-24T10:24:36Z,14.332125452955141,usage_user\n" +
+ ",,0,2020-01-24T10:24:37Z,15.153788447111777,usage_user";
+
+ static final String MULTI_TABLE_RESPONSE =
+ "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339," +
+ "string,string,string,string,double,dateTime:RFC3339\n" +
+ "#group,false,false,true,true,true,true,true,true,false,false\n" +
+ "#default,_result,,,,,,,,,\n" +
+ ",result,table,_start,_stop,_field,_measurement,cpu,host,_value,_time\n" +
+ ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+ "cpu-total,macek.local,12.381414297598637,2020-01-24T09:28:00Z\n" +
+ ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+ "cpu-total,macek.local,18.870254041431455,2020-01-24T09:29:00Z\n" +
+ ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+ "cpu-total,macek.local,26.64080311971415,2020-01-24T09:30:00Z\n" +
+ ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+ "cpu-total,macek.local,11.644120979499911,2020-01-24T09:31:00Z\n" +
+ ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+ "cpu-total,macek.local,16.046354351571846,2020-01-24T09:32:00Z\n" +
+ ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+ "macek.local,23.525686625686625,2020-01-24T09:28:00Z\n" +
+ ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+ "macek.local,31.582258129037516,2020-01-24T09:29:00Z\n" +
+ ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+ "macek.local,39.20349852756812,2020-01-24T09:30:00Z\n" +
+ ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+ "macek.local,23.533275499942164,2020-01-24T09:31:00Z\n" +
+ ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+ "macek.local,19.11247206247206,2020-01-24T09:32:00Z\n" +
+ ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+ "macek.local,3.775801800801801,2020-01-24T09:28:00Z\n" +
+ ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+ "macek.local,8.776226876226875,2020-01-24T09:29:00Z\n" +
+ ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+ "macek.local,16.15592568092568,2020-01-24T09:30:00Z\n" +
+ ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+ "macek.local,3.466367149700483,2020-01-24T09:31:00Z\n" +
+ ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+ "macek.local,10.123511023511023,2020-01-24T09:32:00Z\n" +
+ ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+ "macek.local,23.186861861861857,2020-01-24T09:28:00Z\n" +
+ ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+ "macek.local,30.502449226101927,2020-01-24T09:29:00Z\n" +
+ ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+ "macek.local,37.800263500263505,2020-01-24T09:30:00Z\n" +
+ ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+ "macek.local,21.04487655320989,2020-01-24T09:31:00Z\n" +
+ ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+ "macek.local,23.40988960155627,2020-01-24T09:32:00Z\n" +
+ ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+ "macek.local,3.7013513513513514,2020-01-24T09:28:00Z\n" +
+ ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+ "macek.local,8.669684156858507,2020-01-24T09:29:00Z\n" +
+ ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+ "macek.local,16.4761093606771,2020-01-24T09:30:00Z\n" +
+ ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+ "macek.local,3.416193908762379,2020-01-24T09:31:00Z\n" +
+ ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+ "macek.local,10.391479708146376,2020-01-24T09:32:00Z\n" +
+ ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+ "macek.local,20.520504495504497,2020-01-24T09:28:00Z\n" +
+ ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+ "macek.local,28.435828535828534,2020-01-24T09:29:00Z\n" +
+ ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+ "macek.local,35.76454396684968,2020-01-24T09:30:00Z\n" +
+ ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+ "macek.local,18.94977031643698,2020-01-24T09:31:00Z\n" +
+ ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+ "macek.local,22.81423008923009,2020-01-24T09:32:00Z\n" +
+ ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+ "macek.local,3.4502771752771753,2020-01-24T09:28:00Z\n" +
+ ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+ "macek.local,8.617365310885685,2020-01-24T09:29:00Z\n" +
+ ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+ "macek.local,16.5813353653174,2020-01-24T09:30:00Z\n" +
+ ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+ "macek.local,3.341634649967983,2020-01-24T09:31:00Z\n" +
+ ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+ "macek.local,10.489286880953548,2020-01-24T09:32:00Z\n" +
+ ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+ "macek.local,17.42073857073857,2020-01-24T09:28:00Z\n" +
+ ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+ "macek.local,25.555054526024517,2020-01-24T09:29:00Z\n" +
+ ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+ "macek.local,34.19774496441163,2020-01-24T09:30:00Z\n" +
+ ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+ "macek.local,15.985298393631725,2020-01-24T09:31:00Z\n" +
+ ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+ "macek.local,21.359203467536798,2020-01-24T09:32:00Z\n" +
+ ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+ "macek.local,3.4507517507517504,2020-01-24T09:28:00Z\n" +
+ ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+ "macek.local,8.817554700888033,2020-01-24T09:29:00Z\n" +
+ ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+ "macek.local,16.957243048909714,2020-01-24T09:30:00Z\n" +
+ ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+ "macek.local,3.408601950268617,2020-01-24T09:31:00Z\n" +
+ ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+ "macek.local,10.672760839427506,2020-01-24T09:32:00Z";
+
+ protected MockWebServer mockServer;
+
+ /**
+ * Start Mock server.
+ *
+ * @return the mock server URL
+ */
+ @Nonnull
+ protected String startMockServer() {
+
+ mockServer = new MockWebServer();
+ try {
+ mockServer.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return mockServer.url("/").url().toString();
+ }
+
+
+ @Nonnull
+ protected MockResponse createResponse(final String data) {
+ return createResponse(data, "text/csv", true);
+ }
+
+ @Nonnull
+ protected MockResponse createResponse(final String data, final String
+ contentType, final boolean chunked) {
+
+ MockResponse response = new MockResponse()
+ .setHeader("Content-Type", contentType + "; charset=utf-8")
+ .setHeader("Date", "Tue, 26 Jun 2018 13:15:01 GMT");
+
+ if (chunked) {
+ response.setChunkedBody(data, data.length());
+ } else {
+ response.setBody(data);
+ }
+
+ return response;
+ }
+
+
+ @Before
+ public void before() throws InterpreterException {
+ //properties for local influxdb2 server
+ properties = new Properties();
+ //properties.setProperty("influxdb.url", "http://localhost:9999");
+ properties.setProperty("influxdb.url", startMockServer());
+
+ properties.setProperty("influxdb.token", "my-token");
+ properties.setProperty("influxdb.org", "my-org");
+ properties.setProperty("influxdb.logLevel", LogLevel.BODY.toString());
+ }
+
+ @After
+ public void after() throws IOException {
+ if (mockServer != null) {
+ mockServer.shutdown();
+ }
+ }
+
+ @Test
+ public void testSingleTable() throws InterpreterException {
+
+ InfluxDBInterpreter t = new InfluxDBInterpreter(properties);
+ t.open();
+
+ //just for testing with real influxdb (not used in mock)
+ String flux = "from(bucket: \"my-bucket\")\n" +
+ " |> range(start:-1m)\n" +
+ " |> filter(fn: (r) => r._measurement == \"cpu\")\n" +
+ " |> filter(fn: (r) => r._field == \"usage_user\")\n" +
+ " |> filter(fn: (r) => r.cpu == \"cpu-total\")\n" +
+ " |> limit(n:5, offset: 0)" +
+ " |> keep(columns: [\"_field\", \"_value\", \"_time\"])";
+
+ InterpreterContext context = InterpreterContext.builder()
+ .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+ .build();
+
+ mockServer.enqueue(createResponse(SINGLE_TABLE_RESPONSE));
+
+ InterpreterResult interpreterResult = t.interpret(flux, context);
+
+ // the mocked query response should be rendered successfully
+ assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
+ List<InterpreterResultMessage> message = interpreterResult.message();
+ Assert.assertEquals(1, message.size());
+ Assert.assertEquals(InterpreterResult.Type.TABLE, message.get(0).getType());
+ Assert.assertEquals("result\ttable\t_time\t_value\t_field\n" +
+ "_result\t0\t2020-01-24T10:23:56Z\t12.114014251781473\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:23:57Z\t12.048493938257717\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:06Z\t12.715678919729932\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:07Z\t11.876484560570072\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:16Z\t10.044977511244378\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:17Z\t10.594702648675662\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:26Z\t12.092034512942353\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:27Z\t12.131065532766383\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:36Z\t14.332125452955141\tusage_user\n" +
+ "_result\t0\t2020-01-24T10:24:37Z\t15.153788447111777\tusage_user\n",
+ message.get(0).getData());
+
+ t.close();
+ }
+
+ @Test
+ public void testMultiTable() throws InterpreterException {
+
+ InfluxDBInterpreter t = new InfluxDBInterpreter(properties);
+ t.open();
+
+ //just for testing with real influxdb (not used in mock)
+ String flux = "from(bucket: \"my-bucket\")\n" +
+ " |> range(start: -1h)\n" +
+ " |> filter(fn: (r) => r._measurement == \"cpu\")\n" +
+ " |> filter(fn: (r) => r._field == \"usage_user\")\n" +
+ " |> aggregateWindow(every: 1m, fn: mean)\n" +
+ " |> limit(n:5, offset: 0)";
+
+ InterpreterContext context = InterpreterContext.builder()
+ .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+ .build();
+
+ mockServer.enqueue(createResponse(MULTI_TABLE_RESPONSE));
+ InterpreterResult interpreterResult = t.interpret(flux, context);
+
+ // fail early with the interpreter's message if the query returned ERROR
+ if (InterpreterResult.Code.ERROR.equals(interpreterResult.code())) {
+ Assert.fail(interpreterResult.toString());
+ }
+
+ assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+ List<InterpreterResultMessage> message = interpreterResult.message();
+
+ Assert.assertEquals(9, message.size());
+
+ message.forEach(m -> Assert.assertEquals(InterpreterResult.Type.TABLE, m.getType()));
+
+ Assert.assertEquals(
+ "result\ttable\t_start\t_stop\t_field\t_measurement\tcpu\thost\t_value\t_time\n" +
+ "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu-total\tmacek.local\t12.381414297598637\t2020-01-24T09:28:00Z\n" +
+ "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu-total\tmacek.local\t18.870254041431455\t2020-01-24T09:29:00Z\n" +
+ "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu-total\tmacek.local\t26.64080311971415\t2020-01-24T09:30:00Z\n" +
+ "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu-total\tmacek.local\t11.644120979499911\t2020-01-24T09:31:00Z\n" +
+ "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu-total\tmacek.local\t16.046354351571846\t2020-01-24T09:32:00Z\n",
+ message.get(0).getData());
+
+ Assert.assertEquals("result\ttable\t_start\t_stop\t_field\t_measurement\tcpu\thost\t_value" +
+ "\t_time\n" +
+ "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu7\tmacek.local\t3.4507517507517504\t2020-01-24T09:28:00Z\n" +
+ "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu7\tmacek.local\t8.817554700888033\t2020-01-24T09:29:00Z\n" +
+ "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu7\tmacek.local\t16.957243048909714\t2020-01-24T09:30:00Z\n" +
+ "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu7\tmacek.local\t3.408601950268617\t2020-01-24T09:31:00Z\n" +
+ "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+ "\tcpu\tcpu7\tmacek.local\t10.672760839427506\t2020-01-24T09:32:00Z\n",
+ message.get(8).getData());
+
+ t.close();
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 8c358f3..6e6e232 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<module>file</module>
<module>flink</module>
<module>ignite</module>
+ <module>influxdb</module>
<module>kylin</module>
<module>python</module>
<module>lens</module>
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 386398e..5a448ac 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -136,6 +136,7 @@ The following components are provided under Apache License.
(Apache 2.0) Google Guice - Core Library (com.google.inject:guice:3.0 - http://code.google.com/p/google-guice/guice/)
(Apache 2.0) OkHttp (com.squareup.okhttp:okhttp:2.5.0 - https://github.com/square/okhttp/okhttp)
(Apache 2.0) Okio (com.squareup.okio:okio:1.6.0 - https://github.com/square/okio/okio)
+ (Apache 2.0) OkHttp mockwebserver (com.squareup.okhttp3:mockwebserver:3.13.1) - https://github.com/square/okhttp/blob/master/LICENSE.txt
(Apache 2.0) config (com.typesafe:config:1.2.1 - https://github.com/typesafehub/config)
(Apache 2.0) akka-actor (com.typesafe.akka:akka-actor_2.10:2.3.7 - http://akka.io/)
(Apache 2.0) akka-remote (com.typesafe.akka:akka-remote_2.10:2.3.7 - http://akka.io/)
@@ -221,6 +222,7 @@ The following components are provided under Apache License.
(Apache 2.0) mongo-java-driver 3.4.1 (org.mongodb:mongo-java-driver:3.4.1) - https://github.com/mongodb/mongo-java-driver/blob/master/LICENSE.txt
(Apache 2.0) Neo4j Java Driver (https://github.com/neo4j/neo4j-java-driver) - https://github.com/neo4j/neo4j-java-driver/blob/1.4.3/LICENSE.txt
(Apache 2.0) Hazelcast Jet (http://jet.hazelcast.org) - https://github.com/hazelcast/hazelcast-jet/blob/master/LICENSE
+ (Apache 2.0) RxJava (io.reactivex.rxjava2:rxjava:2.2.17) - https://github.com/ReactiveX/RxJava/blob/2.x/LICENSE
========================================================================
MIT licenses
@@ -278,6 +280,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
(The MIT License) angular-viewport-watch 0.135 (https://github.com/wix/angular-viewport-watch) - https://github.com/wix/angular-viewport-watch/blob/master/LICENSE
(The MIT License) ansi-up 2.0.2 (https://github.com/drudru/ansi_up) - https://github.com/drudru/ansi_up#license
(The MIT License) bcpkix-jdk15on 1.60 (org.bouncycastle:bcpkix-jdk15on:1.60 https://github.com/bcgit/bc-java) - https://github.com/bcgit/bc-java/blob/master/LICENSE.html
+ (The MIT License) influxdb-client-java 1.4.0 (com.influxdb:influxdb-client-java:1.4.0 https://github.com/influxdata/influxdb-client-java) - https://github.com/influxdata/influxdb-client-java/blob/master/LICENSE
========================================================================
BSD-style licenses
diff --git a/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb b/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb
new file mode 100644
index 0000000..75f97cf
--- /dev/null
+++ b/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Influxdata, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file