Posted to commits@zeppelin.apache.org by al...@apache.org on 2020/05/01 11:30:37 UTC

[zeppelin] branch master updated: [ZEPPELIN-4602] Added initial version of InfluxDB interpreter

This is an automated email from the ASF dual-hosted git repository.

alexott pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 94300b3  [ZEPPELIN-4602] Added initial version of InfluxDB interpreter
94300b3 is described below

commit 94300b329bec22ed514f79f2a72de5105bdb4a28
Author: Robert Hajek <ro...@gmail.com>
AuthorDate: Mon Apr 20 15:09:10 2020 +0200

    [ZEPPELIN-4602] Added initial version of InfluxDB interpreter
    
    ### What is this PR for?
    The goal is to add support for querying InfluxDB 2.x using Flux language in Zeppelin notebook.
    
    InfluxDB 2.0 (beta) docs
    https://v2.docs.influxdata.com/v2.0/
    
    Flux language docs:
    https://docs.influxdata.com/flux/
    
    ### What type of PR is it?
    Feature
    
    ### What is the Jira issue?
    * Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN-4602
    
    ### How should this be tested?
    * First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration
    * Strongly recommended: add automated unit tests for any new or changed behavior
    * Outline any manual steps to test the PR here.
    
    ### Screenshots (if appropriate)
    linked in docs/interpreter/influxdb.md
    
    Author: Robert Hajek <ro...@gmail.com>
    
    Closes #3640 from rhajek/master and squashes the following commits:
    
    206848edc [Robert Hajek] [ZEPPELIN-4602] updated influxdb client libraries to 1.7.0, added support for InfluxDB 1.8, interpreter code cleanup
    34cd9ed20 [Robert Hajek] [ZEPPELIN-4602] BaseZeppelinContext replaced
    8955205c1 [Robert Hajek] [ZEPPELIN-4602] Updated README.md, removed specific maven-checkstyle-plugin configuration
    04b7e28d8 [Robert Hajek] [ZEPPELIN-4602] Added licences
    b6b9a4848 [Robert Hajek] [ZEPPELIN-4602] Added initial version of InfluxDB interpreter
---
 .../themes/zeppelin/img/docs-img/influxdb1.png     | Bin 0 -> 159175 bytes
 .../themes/zeppelin/img/docs-img/influxdb2.png     | Bin 0 -> 76681 bytes
 docs/interpreter/influxdb.md                       | 111 +++++++
 influxdb/README.md                                 |  18 ++
 influxdb/pom.xml                                   |  79 +++++
 .../zeppelin/influxdb/InfluxDBInterpreter.java     | 203 +++++++++++++
 .../src/main/resources/interpreter-setting.json    |  42 +++
 .../zeppelin/influxdb/InfluxDBInterpeterTest.java  | 327 +++++++++++++++++++++
 pom.xml                                            |   1 +
 zeppelin-distribution/src/bin_license/LICENSE      |   3 +
 .../src/bin_license/licenses/LICENSE-influxdb      |  21 ++
 11 files changed, 805 insertions(+)
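
Note on the rendering logic: the interpreter added in this commit starts a new `%table` block each time the table index of the incoming Flux records changes, and replaces tabs and newlines inside values with spaces. The following is a minimal stand-alone sketch of that idea, not code from the commit. The class `TableRenderSketch`, its `Row` type, and the helpers `clean`, `render`, and `row` are hypothetical names introduced for illustration, and the InfluxDB client is deliberately left out so the block runs with the JDK alone.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical stand-alone sketch; not part of the commit. */
final class TableRenderSketch {

  /** Hypothetical stand-in for a Flux record: a table index plus ordered column values. */
  static final class Row {
    final int table;
    final Map<String, Object> values;

    Row(int table, Map<String, Object> values) {
      this.table = table;
      this.values = values;
    }
  }

  /** Tabs and newlines delimit %table cells and rows, so replace them with spaces. */
  static String clean(Object value) {
    return value == null ? "" : value.toString().replace("\t", " ").replace("\n", " ");
  }

  /** Starts a new %table block (with a header row) whenever the table index changes. */
  static String render(List<Row> rows) {
    StringBuilder out = new StringBuilder();
    int current = -1;
    for (Row row : rows) {
      if (row.table != current) {
        current = row.table;
        StringJoiner header = new StringJoiner("\t");
        row.values.keySet().forEach(c -> header.add(clean(c)));
        out.append("\n%table ").append(header).append("\n");
      }
      StringJoiner cells = new StringJoiner("\t");
      row.values.values().forEach(v -> cells.add(clean(v)));
      out.append(cells).append("\n");
    }
    return out.toString();
  }

  /** Builds a two-column row; the column names are illustrative only. */
  static Row row(int table, String cpu, Object value) {
    Map<String, Object> values = new LinkedHashMap<>();
    values.put("cpu", cpu);
    values.put("_value", value);
    return new Row(table, values);
  }

  public static void main(String[] args) {
    List<Row> rows = new ArrayList<>();
    rows.add(row(0, "cpu-total", 12.5));
    rows.add(row(1, "cpu0", null));
    // Two table indexes, so two %table blocks are printed.
    System.out.print(render(rows));
  }
}
```

Because each table index yields its own block, a query that returns many tables produces many separate tables in the notebook, which is why the documentation example ends with `pivot`.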

diff --git a/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png b/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png
new file mode 100644
index 0000000..6731c56
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png b/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png
new file mode 100644
index 0000000..71104ae
Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png differ
diff --git a/docs/interpreter/influxdb.md b/docs/interpreter/influxdb.md
new file mode 100644
index 0000000..64dd084
--- /dev/null
+++ b/docs/interpreter/influxdb.md
@@ -0,0 +1,111 @@
+---
+layout: page
+title: "InfluxDB Interpreter for Apache Zeppelin"
+description: "InfluxDB is an open-source time series database designed to handle high write and query loads."
+group: interpreter
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+{% include JB/setup %}
+
+# InfluxDB Interpreter for Apache Zeppelin
+
+<div id="toc"></div>
+
+## Overview
+[InfluxDB](https://v2.docs.influxdata.com/v2.0/) is an open-source time series database (TSDB) developed by InfluxData. It is written in Go and optimized for fast, high-availability storage and retrieval of time series data in fields such as operations monitoring, application metrics, Internet of Things sensor data, and real-time analytics.
+This interpreter lets you run queries written in the [Flux language](https://v2.docs.influxdata.com/v2.0/reference/flux/) from a Zeppelin notebook.
+
+### Notes
+* This interpreter is compatible with InfluxDB 1.8+ and InfluxDB 2.0+ (v2 API, Flux language)
+* Code completion and syntax highlighting are not supported yet
+
+### Example notebook
+
+![InfluxDB notebook]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/influxdb1.png)
+
+### Configuration
+<table class="table-configuration">
+  <tr>
+    <th>Property</th>
+    <th>Default</th>
+    <th>Value</th>
+  </tr>
+  <tr>
+    <td>influxdb.url</td>
+    <td>http://localhost:9999</td>
+    <td>InfluxDB API connection URL</td>
+  </tr>
+  <tr>
+    <td>influxdb.org</td>
+    <td>my-org</td>
+    <td>Organization name. Organizations are supported in InfluxDB 2.0+; use "-" as the org for InfluxDB 1.8.</td>
+  </tr>
+  <tr>
+    <td>influxdb.token</td>
+    <td>my-token</td>
+    <td>Authorization token for the InfluxDB API. Tokens are supported in InfluxDB 2.0+; for InfluxDB 1.8, use 'username:password' as the token.</td>
+  </tr>
+  <tr>
+    <td>influxdb.logLevel</td>
+    <td>NONE</td>
+    <td>InfluxDB client library verbosity level (for debugging purposes)</td>
+  </tr>
+</table>
+
+#### Example configuration
+
+![InfluxDB notebook]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/influxdb2.png)
+
+## Overview
+
+
+## How to use
+A basic query looks like this:
+
+```
+%influxdb
+from(bucket: "my-bucket")
+  |> range(start: -1h)
+  |> filter(fn: (r) => r._measurement == "cpu")
+  |> filter(fn: (r) => r.cpu == "cpu-total")
+  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
+```
+This example uses data collected by the `[[inputs.cpu]]` [Telegraf](https://github.com/influxdata/telegraf) input plugin.
+
+The result of a Flux query can contain one or more tables. In the case of multiple tables, each
+table is rendered as a separate %table structure. This example uses the `pivot`
+function to collect values from multiple tables into a single table.
+
+## How to run InfluxDB 2.0 using Docker
+```bash
+docker pull quay.io/influxdb/influxdb:nightly
+docker run --name influxdb -p 9999:9999 quay.io/influxdb/influxdb:nightly
+
+## Post onboarding request to set up the initial user (my-user / my-password), org (my-org) and bucket (my-bucket)
+curl -i -X POST http://localhost:9999/api/v2/setup -H 'accept: application/json' \
+    -d '{
+            "username": "my-user",
+            "password": "my-password",
+            "org": "my-org",
+            "bucket": "my-bucket",
+            "token": "my-token"
+        }'
+
+```
+
+
+ 
+    
+
diff --git a/influxdb/README.md b/influxdb/README.md
new file mode 100644
index 0000000..06ba34f
--- /dev/null
+++ b/influxdb/README.md
@@ -0,0 +1,18 @@
+InfluxDB 2.0 interpreter for Apache Zeppelin
+============================================
+
+## Description:
+
+Provide InfluxDB Interpreter for Zeppelin.
+
+## Build
+
+```
+mvn -pl influxdb -am -DskipTests package
+```
+
+## Test
+
+```
+mvn -pl influxdb -am -Dtest='org.apache.zeppelin.influxdb.*' -DfailIfNoTests=false test
+```
diff --git a/influxdb/pom.xml b/influxdb/pom.xml
new file mode 100644
index 0000000..8ab2dd9
--- /dev/null
+++ b/influxdb/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>zeppelin-interpreter-parent</artifactId>
+        <groupId>org.apache.zeppelin</groupId>
+        <version>0.9.0-SNAPSHOT</version>
+        <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>zeppelin-influxdb</artifactId>
+    <packaging>jar</packaging>
+    <version>0.9.0-SNAPSHOT</version>
+    <name>Zeppelin: InfluxDB interpreter</name>
+    <description>InfluxDB 2.0 timeseries database support</description>
+
+    <properties>
+        <interpreter.name>influxdb</interpreter.name>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <influxdb.client.version>1.7.0</influxdb.client.version>
+        <dependency.okhttp3.version>3.13.1</dependency.okhttp3.version>
+        <dependency.gson.version>2.8.5</dependency.gson.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.influxdb</groupId>
+            <artifactId>influxdb-client-java</artifactId>
+            <version>${influxdb.client.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${dependency.gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${dependency.okhttp3.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
new file mode 100644
index 0000000..1bbcfcd
--- /dev/null
+++ b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.influxdb;
+
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.influxdb.LogLevel;
+import com.influxdb.client.InfluxDBClient;
+import com.influxdb.client.InfluxDBClientFactory;
+import com.influxdb.client.InfluxDBClientOptions;
+import com.influxdb.client.QueryApi;
+import org.apache.zeppelin.interpreter.AbstractInterpreter;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+
+/**
+ * <a href="https://v2.docs.influxdata.com/v2.0/">InfluxDB 2.0</a> interpreter for Zeppelin.
+ * It uses the /v2/query API; queries are written in the Flux language.
+ * <p>
+ * How to use: <br/>
+ * <pre>
+ * {@code
+ * %influxdb
+ * from(bucket: "my-bucket")
+ *   |> range(start: -5m)
+ *   |> filter(fn: (r) => r._measurement == "cpu")
+ *   |> filter(fn: (r) => r._field == "usage_user")
+ *   |> filter(fn: (r) => r.cpu == "cpu-total")
+ * }
+ * </pre>
+ * </p>
+ */
+public class InfluxDBInterpreter extends AbstractInterpreter {
+
+  private static final String INFLUXDB_API_URL_PROPERTY = "influxdb.url";
+  private static final String INFLUXDB_TOKEN_PROPERTY = "influxdb.token";
+  private static final String INFLUXDB_ORG_PROPERTY = "influxdb.org";
+  private static final String INFLUXDB_LOGLEVEL_PROPERTY = "influxdb.logLevel";
+
+  private static final String TABLE_MAGIC_TAG = "%table ";
+  private static final String WHITESPACE = " ";
+  private static final String NEWLINE = "\n";
+  private static final String TAB = "\t";
+  private static final String EMPTY_COLUMN_VALUE = "";
+
+  private volatile InfluxDBClient client;
+  private volatile QueryApi queryApi;
+
+  public InfluxDBInterpreter(Properties properties) {
+    super(properties);
+  }
+
+  @Override
+  public ZeppelinContext getZeppelinContext() {
+    return null;
+  }
+
+  @Override
+  protected InterpreterResult internalInterpret(String query, InterpreterContext context)
+      throws InterpreterException {
+
+    logger.debug("Run Flux command '{}'", query);
+    query = query.trim();
+
+    QueryApi queryService = getInfluxDBClient(context);
+
+    final int[] actualIndex = {-1};
+
+    AtomicReference<InterpreterResult> resultRef = new AtomicReference<>();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+
+    StringBuilder result = new StringBuilder();
+    queryService.query(
+        query,
+
+        //process record
+        (cancellable, fluxRecord) -> {
+
+          Integer tableIndex = fluxRecord.getTable();
+          if (actualIndex[0] != tableIndex) {
+            result.append(NEWLINE);
+            result.append(TABLE_MAGIC_TAG);
+            actualIndex[0] = tableIndex;
+
+            //add column names to table header
+            StringJoiner joiner = new StringJoiner(TAB);
+            fluxRecord.getValues().keySet().forEach(c -> joiner.add(replaceReservedChars(c)));
+            result.append(joiner.toString());
+            result.append(NEWLINE);
+          }
+
+          StringJoiner rowsJoiner = new StringJoiner(TAB);
+          for (Object value : fluxRecord.getValues().values()) {
+            if (value == null) {
+              value = EMPTY_COLUMN_VALUE;
+            }
+            rowsJoiner.add(replaceReservedChars(value.toString()));
+          }
+          result.append(rowsJoiner.toString());
+          result.append(NEWLINE);
+        },
+
+        throwable -> {
+
+          logger.error(throwable.getMessage(), throwable);
+          resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
+              throwable.getMessage()));
+
+          countDownLatch.countDown();
+
+        }, () -> {
+          //on complete
+          InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
+          intpResult.add(result.toString());
+          resultRef.set(intpResult);
+          countDownLatch.countDown();
+        }
+    );
+    try {
+      countDownLatch.await();
+    } catch (InterruptedException e) {
+      throw new InterpreterException(e);
+    }
+
+    return resultRef.get();
+  }
+
+
+  private QueryApi getInfluxDBClient(InterpreterContext context) {
+    if (queryApi == null) {
+      queryApi = this.client.getQueryApi();
+    }
+    return queryApi;
+  }
+
+
+  @Override
+  public void open() throws InterpreterException {
+
+    if (this.client == null) {
+      InfluxDBClientOptions opt = InfluxDBClientOptions.builder()
+          .url(getProperty(INFLUXDB_API_URL_PROPERTY))
+          .authenticateToken(getProperty(INFLUXDB_TOKEN_PROPERTY).toCharArray())
+          .logLevel(LogLevel.valueOf(
+              getProperty(INFLUXDB_LOGLEVEL_PROPERTY, LogLevel.NONE.toString())))
+          .org(getProperty(INFLUXDB_ORG_PROPERTY))
+          .build();
+
+      this.client = InfluxDBClientFactory.create(opt);
+    }
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+    if (this.client != null) {
+      this.client.close();
+      this.client = null;
+    }
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+
+  }
+
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return 0;
+  }
+
+  /**
+   * For %table response replace Tab and Newline.
+   */
+  private String replaceReservedChars(String str) {
+    if (str == null) {
+      return EMPTY_COLUMN_VALUE;
+    }
+    return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
+  }
+
+}
diff --git a/influxdb/src/main/resources/interpreter-setting.json b/influxdb/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..363b9ef
--- /dev/null
+++ b/influxdb/src/main/resources/interpreter-setting.json
@@ -0,0 +1,42 @@
+[
+  {
+    "group": "influxdb",
+    "name": "influxdb",
+    "className": "org.apache.zeppelin.influxdb.InfluxDBInterpreter",
+    "properties": {
+      "influxdb.url": {
+        "envName": null,
+        "propertyName": "influxdb.url",
+        "defaultValue": "http://localhost:9999",
+        "description": "The URL for InfluxDB 2.X API",
+        "type": "string"
+      },
+      "influxdb.token": {
+        "envName": null,
+        "propertyName": "influxdb.token",
+        "defaultValue": "my-token",
+        "description": "InfluxDB auth token",
+        "type": "password"
+      },
+      "influxdb.org": {
+        "envName": null,
+        "propertyName": "influxdb.org",
+        "defaultValue": "my-org",
+        "description": "InfluxDB org name",
+        "type": "string"
+      },
+      "influxdb.logLevel": {
+        "envName": null,
+        "propertyName": "influxdb.logLevel",
+        "defaultValue": "NONE",
+        "description": "InfluxDB http client library verbosity level (NONE, BASIC, HEADERS, BODY)",
+        "type": "string"
+      }
+    },
+    "editor": {
+      "language": "sql",
+      "editOnDblClick": false,
+      "completionSupport": false
+    }
+  }
+]
diff --git a/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java b/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java
new file mode 100644
index 0000000..3ed6385
--- /dev/null
+++ b/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.zeppelin.influxdb;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nonnull;
+
+import com.influxdb.LogLevel;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class InfluxDBInterpeterTest {
+
+  Properties properties;
+
+  static final String SINGLE_TABLE_RESPONSE =
+      "#datatype,string,long,dateTime:RFC3339,double,string\n" +
+          "#group,false,false,false,false,true\n" +
+          "#default,_result,,,,\n" +
+          ",result,table,_time,_value,_field\n" +
+          ",,0,2020-01-24T10:23:56Z,12.114014251781473,usage_user\n" +
+          ",,0,2020-01-24T10:23:57Z,12.048493938257717,usage_user\n" +
+          ",,0,2020-01-24T10:24:06Z,12.715678919729932,usage_user\n" +
+          ",,0,2020-01-24T10:24:07Z,11.876484560570072,usage_user\n" +
+          ",,0,2020-01-24T10:24:16Z,10.044977511244378,usage_user\n" +
+          ",,0,2020-01-24T10:24:17Z,10.594702648675662,usage_user\n" +
+          ",,0,2020-01-24T10:24:26Z,12.092034512942353,usage_user\n" +
+          ",,0,2020-01-24T10:24:27Z,12.131065532766383,usage_user\n" +
+          ",,0,2020-01-24T10:24:36Z,14.332125452955141,usage_user\n" +
+          ",,0,2020-01-24T10:24:37Z,15.153788447111777,usage_user";
+
+  static final String MULTI_TABLE_RESPONSE =
+      "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339," +
+          "string,string,string,string,double,dateTime:RFC3339\n" +
+          "#group,false,false,true,true,true,true,true,true,false,false\n" +
+          "#default,_result,,,,,,,,,\n" +
+          ",result,table,_start,_stop,_field,_measurement,cpu,host,_value,_time\n" +
+          ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+          "cpu-total,macek.local,12.381414297598637,2020-01-24T09:28:00Z\n" +
+          ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+          "cpu-total,macek.local,18.870254041431455,2020-01-24T09:29:00Z\n" +
+          ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+          "cpu-total,macek.local,26.64080311971415,2020-01-24T09:30:00Z\n" +
+          ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+          "cpu-total,macek.local,11.644120979499911,2020-01-24T09:31:00Z\n" +
+          ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," +
+          "cpu-total,macek.local,16.046354351571846,2020-01-24T09:32:00Z\n" +
+          ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+          "macek.local,23.525686625686625,2020-01-24T09:28:00Z\n" +
+          ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+          "macek.local,31.582258129037516,2020-01-24T09:29:00Z\n" +
+          ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+          "macek.local,39.20349852756812,2020-01-24T09:30:00Z\n" +
+          ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+          "macek.local,23.533275499942164,2020-01-24T09:31:00Z\n" +
+          ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," +
+          "macek.local,19.11247206247206,2020-01-24T09:32:00Z\n" +
+          ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+          "macek.local,3.775801800801801,2020-01-24T09:28:00Z\n" +
+          ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+          "macek.local,8.776226876226875,2020-01-24T09:29:00Z\n" +
+          ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+          "macek.local,16.15592568092568,2020-01-24T09:30:00Z\n" +
+          ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+          "macek.local,3.466367149700483,2020-01-24T09:31:00Z\n" +
+          ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," +
+          "macek.local,10.123511023511023,2020-01-24T09:32:00Z\n" +
+          ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+          "macek.local,23.186861861861857,2020-01-24T09:28:00Z\n" +
+          ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+          "macek.local,30.502449226101927,2020-01-24T09:29:00Z\n" +
+          ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+          "macek.local,37.800263500263505,2020-01-24T09:30:00Z\n" +
+          ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+          "macek.local,21.04487655320989,2020-01-24T09:31:00Z\n" +
+          ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," +
+          "macek.local,23.40988960155627,2020-01-24T09:32:00Z\n" +
+          ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+          "macek.local,3.7013513513513514,2020-01-24T09:28:00Z\n" +
+          ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+          "macek.local,8.669684156858507,2020-01-24T09:29:00Z\n" +
+          ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+          "macek.local,16.4761093606771,2020-01-24T09:30:00Z\n" +
+          ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+          "macek.local,3.416193908762379,2020-01-24T09:31:00Z\n" +
+          ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," +
+          "macek.local,10.391479708146376,2020-01-24T09:32:00Z\n" +
+          ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+          "macek.local,20.520504495504497,2020-01-24T09:28:00Z\n" +
+          ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+          "macek.local,28.435828535828534,2020-01-24T09:29:00Z\n" +
+          ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+          "macek.local,35.76454396684968,2020-01-24T09:30:00Z\n" +
+          ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+          "macek.local,18.94977031643698,2020-01-24T09:31:00Z\n" +
+          ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," +
+          "macek.local,22.81423008923009,2020-01-24T09:32:00Z\n" +
+          ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+          "macek.local,3.4502771752771753,2020-01-24T09:28:00Z\n" +
+          ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+          "macek.local,8.617365310885685,2020-01-24T09:29:00Z\n" +
+          ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+          "macek.local,16.5813353653174,2020-01-24T09:30:00Z\n" +
+          ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+          "macek.local,3.341634649967983,2020-01-24T09:31:00Z\n" +
+          ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," +
+          "macek.local,10.489286880953548,2020-01-24T09:32:00Z\n" +
+          ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+          "macek.local,17.42073857073857,2020-01-24T09:28:00Z\n" +
+          ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+          "macek.local,25.555054526024517,2020-01-24T09:29:00Z\n" +
+          ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+          "macek.local,34.19774496441163,2020-01-24T09:30:00Z\n" +
+          ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+          "macek.local,15.985298393631725,2020-01-24T09:31:00Z\n" +
+          ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," +
+          "macek.local,21.359203467536798,2020-01-24T09:32:00Z\n" +
+          ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+          "macek.local,3.4507517507517504,2020-01-24T09:28:00Z\n" +
+          ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+          "macek.local,8.817554700888033,2020-01-24T09:29:00Z\n" +
+          ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+          "macek.local,16.957243048909714,2020-01-24T09:30:00Z\n" +
+          ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+          "macek.local,3.408601950268617,2020-01-24T09:31:00Z\n" +
+          ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," +
+          "macek.local,10.672760839427506,2020-01-24T09:32:00Z";
+
+  protected MockWebServer mockServer;
+
+  /**
+   * Start Mock server.
+   *
+   * @return the mock server URL
+   */
+  @Nonnull
+  protected String startMockServer() {
+
+    mockServer = new MockWebServer();
+    try {
+      mockServer.start();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return mockServer.url("/").url().toString();
+  }
+
+
+  @Nonnull
+  protected MockResponse createResponse(final String data) {
+    return createResponse(data, "text/csv", true);
+  }
+
+  @Nonnull
+  protected MockResponse createResponse(final String data, final String
+      contentType, final boolean chunked) {
+
+    MockResponse response = new MockResponse()
+        .setHeader("Content-Type", contentType + "; charset=utf-8")
+        .setHeader("Date", "Tue, 26 Jun 2018 13:15:01 GMT");
+
+    if (chunked) {
+      response.setChunkedBody(data, data.length());
+    } else {
+      response.setBody(data);
+    }
+
+    return response;
+  }
+
+
+  @Before
+  public void before() throws InterpreterException {
+    //properties for local influxdb2 server
+    properties = new Properties();
+    //properties.setProperty("influxdb.url", "http://localhost:9999");
+    properties.setProperty("influxdb.url", startMockServer());
+
+    properties.setProperty("influxdb.token", "my-token");
+    properties.setProperty("influxdb.org", "my-org");
+    properties.setProperty("influxdb.logLevel", LogLevel.BODY.toString());
+  }
+
+  @After
+  public void after() throws IOException {
+    if (mockServer != null) {
+      mockServer.shutdown();
+    }
+  }
+
+  @Test
+  public void testSingleTable() throws InterpreterException {
+
+    InfluxDBInterpreter t = new InfluxDBInterpreter(properties);
+    t.open();
+
+    // the query text only matters against a real InfluxDB; the mock server ignores it
+    String flux = "from(bucket: \"my-bucket\")\n" +
+        "  |> range(start:-1m)\n" +
+        "  |> filter(fn: (r) => r._measurement == \"cpu\")\n" +
+        "  |> filter(fn: (r) => r._field == \"usage_user\")\n" +
+        "  |> filter(fn: (r) => r.cpu == \"cpu-total\")\n" +
+        "  |> limit(n:5, offset: 0)" +
+        "  |> keep(columns: [\"_field\", \"_value\", \"_time\"])";
+
+    InterpreterContext context = InterpreterContext.builder()
+        .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+        .build();
+
+    mockServer.enqueue(createResponse(SINGLE_TABLE_RESPONSE));
+
+    InterpreterResult interpreterResult = t.interpret(flux, context);
+
+    // the mocked response should be interpreted successfully
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
+    List<InterpreterResultMessage> message = interpreterResult.message();
+    Assert.assertEquals(1, message.size());
+    Assert.assertEquals(InterpreterResult.Type.TABLE, message.get(0).getType());
+    Assert.assertEquals("result\ttable\t_time\t_value\t_field\n" +
+            "_result\t0\t2020-01-24T10:23:56Z\t12.114014251781473\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:23:57Z\t12.048493938257717\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:06Z\t12.715678919729932\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:07Z\t11.876484560570072\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:16Z\t10.044977511244378\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:17Z\t10.594702648675662\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:26Z\t12.092034512942353\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:27Z\t12.131065532766383\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:36Z\t14.332125452955141\tusage_user\n" +
+            "_result\t0\t2020-01-24T10:24:37Z\t15.153788447111777\tusage_user\n",
+        message.get(0).getData());
+
+    t.close();
+  }
+
+  @Test
+  public void testMultiTable() throws InterpreterException {
+
+    InfluxDBInterpreter t = new InfluxDBInterpreter(properties);
+    t.open();
+
+    // the query text only matters against a real InfluxDB; the mock server ignores it
+    String flux = "from(bucket: \"my-bucket\")\n" +
+        "  |> range(start: -1h)\n" +
+        "  |> filter(fn: (r) => r._measurement == \"cpu\")\n" +
+        "  |> filter(fn: (r) => r._field == \"usage_user\")\n" +
+        "  |> aggregateWindow(every: 1m, fn: mean)\n" +
+        "  |> limit(n:5, offset: 0)";
+
+    InterpreterContext context = InterpreterContext.builder()
+        .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+        .build();
+
+    mockServer.enqueue(createResponse(MULTI_TABLE_RESPONSE));
+    InterpreterResult interpreterResult = t.interpret(flux, context);
+
+    // on ERROR, fail with the interpreter's own message so the cause is visible
+    if (InterpreterResult.Code.ERROR.equals(interpreterResult.code())) {
+      Assert.fail(interpreterResult.toString());
+    }
+
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    List<InterpreterResultMessage> message = interpreterResult.message();
+
+    Assert.assertEquals(9, message.size());
+
+    message.forEach(m -> Assert.assertEquals(InterpreterResult.Type.TABLE, m.getType()));
+
+    Assert.assertEquals(
+        "result\ttable\t_start\t_stop\t_field\t_measurement\tcpu\thost\t_value\t_time\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu-total\tmacek.local\t12.381414297598637\t2020-01-24T09:28:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+            "\tcpu\tcpu-total\tmacek.local\t18.870254041431455\t2020-01-24T09:29:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+            "\tcpu\tcpu-total\tmacek.local\t26.64080311971415\t2020-01-24T09:30:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+            "\tcpu\tcpu-total\tmacek.local\t11.644120979499911\t2020-01-24T09:31:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+            "\tcpu\tcpu-total\tmacek.local\t16.046354351571846\t2020-01-24T09:32:00Z\n",
+        message.get(0).getData());
+
+    Assert.assertEquals("result\ttable\t_start\t_stop\t_field\t_measurement\tcpu\thost\t_value" +
+        "\t_time\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t3.4507517507517504\t2020-01-24T09:28:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t8.817554700888033\t2020-01-24T09:29:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t16.957243048909714\t2020-01-24T09:30:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t3.408601950268617\t2020-01-24T09:31:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t10.672760839427506\t2020-01-24T09:32:00Z\n",
+        message.get(8).getData());
+
+    t.close();
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 8c358f3..6e6e232 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
     <module>file</module>
     <module>flink</module>
     <module>ignite</module>
+    <module>influxdb</module>
     <module>kylin</module>
     <module>python</module>
     <module>lens</module>
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 386398e..5a448ac 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -136,6 +136,7 @@ The following components are provided under Apache License.
     (Apache 2.0) Google Guice - Core Library (com.google.inject:guice:3.0 - http://code.google.com/p/google-guice/guice/)
     (Apache 2.0) OkHttp (com.squareup.okhttp:okhttp:2.5.0 - https://github.com/square/okhttp/okhttp)
     (Apache 2.0) Okio (com.squareup.okio:okio:1.6.0 - https://github.com/square/okio/okio)
+    (Apache 2.0) OkHttp mockwebserver (com.squareup.okhttp3:mockwebserver:3.13.1) - https://github.com/square/okhttp/blob/master/LICENSE.txt
     (Apache 2.0) config (com.typesafe:config:1.2.1 - https://github.com/typesafehub/config)
     (Apache 2.0) akka-actor (com.typesafe.akka:akka-actor_2.10:2.3.7 - http://akka.io/)
     (Apache 2.0) akka-remote (com.typesafe.akka:akka-remote_2.10:2.3.7 - http://akka.io/)
@@ -221,6 +222,7 @@ The following components are provided under Apache License.
     (Apache 2.0) mongo-java-driver 3.4.1 (org.mongodb:mongo-java-driver:3.4.1) - https://github.com/mongodb/mongo-java-driver/blob/master/LICENSE.txt
     (Apache 2.0) Neo4j Java Driver (https://github.com/neo4j/neo4j-java-driver) - https://github.com/neo4j/neo4j-java-driver/blob/1.4.3/LICENSE.txt
     (Apache 2.0) Hazelcast Jet (http://jet.hazelcast.org) - https://github.com/hazelcast/hazelcast-jet/blob/master/LICENSE
+    (Apache 2.0) RxJava (io.reactivex.rxjava2:rxjava:2.2.17) - https://github.com/ReactiveX/RxJava/blob/2.x/LICENSE
 
 ========================================================================
 MIT licenses
@@ -278,6 +280,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
     (The MIT License) angular-viewport-watch 0.135 (https://github.com/wix/angular-viewport-watch) - https://github.com/wix/angular-viewport-watch/blob/master/LICENSE
     (The MIT License) ansi-up 2.0.2 (https://github.com/drudru/ansi_up) - https://github.com/drudru/ansi_up#license
     (The MIT License) bcpkix-jdk15on 1.60 (org.bouncycastle:bcpkix-jdk15on:1.60 https://github.com/bcgit/bc-java) - https://github.com/bcgit/bc-java/blob/master/LICENSE.html
+    (The MIT License) influxdb-client-java 1.4.0 (com.influxdb:influxdb-client-java:1.4.0 https://github.com/influxdata/influxdb-client-java) - https://github.com/influxdata/influxdb-client-java/blob/master/LICENSE
 
 ========================================================================
 BSD-style licenses
diff --git a/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb b/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb
new file mode 100644
index 0000000..75f97cf
--- /dev/null
+++ b/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Influxdata, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file