You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/02/24 13:48:20 UTC

nifi git commit: NIFI-4289 - InfluxDB Put processor

Repository: nifi
Updated Branches:
  refs/heads/master 62732cbb8 -> f7fe2da10


NIFI-4289 - InfluxDB Put processor

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2101.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f7fe2da1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f7fe2da1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f7fe2da1

Branch: refs/heads/master
Commit: f7fe2da106d0110841796e5c0f59c078f40cefae
Parents: 62732cb
Author: mans2singh <ma...@yahoo.com>
Authored: Sat Aug 19 16:06:36 2017 -0700
Committer: Pierre Villard <pi...@gmail.com>
Committed: Sat Feb 24 14:46:35 2018 +0100

----------------------------------------------------------------------
 nifi-assembly/LICENSE                           |  27 +-
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-influxdb-nar/pom.xml                   |  44 +++
 .../src/main/resources/META-INF/LICENSE         | 231 ++++++++++++
 .../src/main/resources/META-INF/NOTICE          |  17 +
 .../nifi-influxdb-processors/pom.xml            |  66 ++++
 .../influxdb/AbstractInfluxDBProcessor.java     | 160 ++++++++
 .../nifi/processors/influxdb/PutInfluxDB.java   | 205 +++++++++++
 .../org.apache.nifi.processor.Processor         |  15 +
 .../nifi/processors/influxdb/ITPutInfluxDB.java | 221 +++++++++++
 .../processors/influxdb/TestPutInfluxDB.java    | 364 +++++++++++++++++++
 nifi-nar-bundles/nifi-influxdb-bundle/pom.xml   |  43 +++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |  11 +
 14 files changed, 1409 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-assembly/LICENSE
----------------------------------------------------------------------
diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE
index 69a03dc..d485c65 100644
--- a/nifi-assembly/LICENSE
+++ b/nifi-assembly/LICENSE
@@ -2193,4 +2193,29 @@ style license.
       PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
       LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
       NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-      SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
+      SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+  The binary distribution of this product bundles 'influxdb-java' under an MIT 
+    style license.
+
+    Copyright (c) 2014-2017 Stefan Majer
+    
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"), to deal
+    in the Software without restriction, including without limitation the rights
+    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+    copies of the Software, and to permit persons to whom the Software is
+    furnished to do so, subject to the following conditions:
+    
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
+    
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+    SOFTWARE.
+
+      
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index d576261..89c760c 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -323,6 +323,11 @@ language governing permissions and limitations under the License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-influxdb-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-avro-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml
new file mode 100644
index 0000000..2782992
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-influxdb-bundle</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-influxdb-nar</artifactId>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-influxdb-processors</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..37b7cf7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,231 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses. 
+
+This product bundles 'influxdb-java' which is available under an MIT license.
+
+    Copyright (c) 2014-2017 Stefan Majer
+    
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"), to deal
+    in the Software without restriction, including without limitation the rights
+    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+    copies of the Software, and to permit persons to whom the Software is
+    furnished to do so, subject to the following conditions:
+    
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
+    
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+    SOFTWARE.

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..9dbfdd3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,17 @@
+nifi-influxdb-nar
+Copyright 2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml
new file mode 100644
index 0000000..f678921
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-influxdb-bundle</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-influxdb-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+	    <dependency>
+	        <groupId>org.influxdb</groupId>
+  	        <artifactId>influxdb-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java
new file mode 100644
index 0000000..13838de
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+
+import okhttp3.OkHttpClient;
+import okhttp3.OkHttpClient.Builder;
+
+/**
+ * Abstract base class for InfluxDB processors
+ */
+abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
+
+    protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("influxdb-charset")
+            .displayName("Character Set")
+            .description("Specifies the character set of the document data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INFLUX_DB_URL = new PropertyDescriptor.Builder()
+            .name("influxdb-url")
+            .displayName("InfluxDB connection URL")
+            .description("InfluxDB URL to connect to. Eg: http://influxdb:8086")
+            .defaultValue("http://localhost:8086")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INFLUX_DB_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("InfluxDB Max Connection Time Out (seconds)")
+            .description("The maximum time for establishing connection to the InfluxDB")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
+            .name("influxdb-dbname")
+            .displayName("Database Name")
+            .description("InfluxDB database to connect to")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("influxdb-username")
+            .displayName("Username")
+            .required(false)
+            .description("Username for accessing InfluxDB")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("influxdb-password")
+            .displayName("Password")
+            .required(false)
+            .description("Password for user")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    protected static final PropertyDescriptor MAX_RECORDS_SIZE = new PropertyDescriptor.Builder()
+            .name("influxdb-max-records-size")
+            .displayName("Max size of records")
+            .description("Maximum size of records allowed to be posted in one batch")
+            .expressionLanguageSupported(true)
+            .defaultValue("1 MB")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final String INFLUX_DB_ERROR_MESSAGE = "influxdb.error.message";
+
+    protected AtomicReference<InfluxDB> influxDB = new AtomicReference<>();
+    protected long maxRecordsSize;
+
+    /**
+     * Helper method to create InfluxDB instance
+     * @return InfluxDB instance
+     */
+    protected synchronized InfluxDB getInfluxDB(ProcessContext context) {
+        if ( influxDB.get() == null ) {
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+            long connectionTimeout = context.getProperty(INFLUX_DB_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS);
+            String influxDbUrl = context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue();
+
+            try {
+                influxDB.set(makeConnection(username, password, influxDbUrl, connectionTimeout));
+            } catch(Exception e) {
+                getLogger().error("Error while getting connection {}", new Object[] { e.getLocalizedMessage() },e);
+                throw new RuntimeException("Error while getting connection" + e.getLocalizedMessage(),e);
+            }
+            getLogger().info("InfluxDB connection created for host {}",
+                    new Object[] {influxDbUrl});
+        }
+        return influxDB.get();
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordsSize = context.getProperty(MAX_RECORDS_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
+    }
+
+    protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
+        Builder builder = new OkHttpClient.Builder().connectTimeout(connectionTimeout, TimeUnit.SECONDS);
+        if ( StringUtils.isBlank(username) || StringUtils.isBlank(password) ) {
+            return InfluxDBFactory.connect(influxDbUrl, builder);
+        } else {
+            return InfluxDBFactory.connect(influxDbUrl, username, password, builder);
+        }
+    }
+
+    @OnStopped
+    public void close() {
+        if (getLogger().isDebugEnabled()) {
+            getLogger().info("Closing connection");
+        }
+        if ( influxDB.get() != null ) {
+            influxDB.get().close();
+            influxDB.set(null);;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
new file mode 100644
index 0000000..ed45025
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBIOException;
+
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","insert", "write", "put", "timeseries"})
+@CapabilityDescription("Processor to write the content of a FlowFile in 'line protocol'.  Please check details of the 'line protocol' in InfluxDB documentation (https://www.influxdb.com/). "
+        + "  The flow file can contain single measurement point or multiple measurement points separated by line seperator.  The timestamp (last field) should be in nano-seconds resolution.")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"),
+    })
+public class PutInfluxDB extends AbstractInfluxDBProcessor {
+
+    public static AllowableValue CONSISTENCY_LEVEL_ALL = new AllowableValue("ALL", "All", "Return success when all nodes have responded with write success");
+    public static AllowableValue CONSISTENCY_LEVEL_ANY = new AllowableValue("ANY", "Any", "Return success when any nodes have responded with write success");
+    public static AllowableValue CONSISTENCY_LEVEL_ONE = new AllowableValue("ONE", "One", "Return success when one node has responded with write success");
+    public static AllowableValue CONSISTENCY_LEVEL_QUORUM = new AllowableValue("QUORUM", "Quorum", "Return success when a majority of nodes have responded with write success");
+
+    public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
+            .name("influxdb-consistency-level")
+            .displayName("Consistency Level")
+            .description("InfluxDB consistency level")
+            .required(true)
+            .defaultValue(CONSISTENCY_LEVEL_ONE.getValue())
+            .expressionLanguageSupported(true)
+            .allowableValues(CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM)
+            .build();
+
+    public static final PropertyDescriptor RETENTION_POLICY = new PropertyDescriptor.Builder()
+            .name("influxdb-retention-policy")
+            .displayName("Retention Policy")
+            .description("Retention policy for the saving the records")
+            .defaultValue("autogen")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("Successful FlowFiles that are saved to InfluxDB are routed to this relationship").build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles were not saved to InfluxDB are routed to this relationship").build();
+
+    static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("FlowFiles were not saved to InfluxDB due to retryable exception are routed to this relationship").build();
+
+    static final Relationship REL_MAX_SIZE_EXCEEDED = new Relationship.Builder().name("failure-max-size")
+            .description("FlowFiles exceeding max records size are routed to this relationship").build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    static {
+        final Set<Relationship> tempRelationships = new HashSet<>();
+        tempRelationships.add(REL_SUCCESS);
+        tempRelationships.add(REL_FAILURE);
+        tempRelationships.add(REL_RETRY);
+        tempRelationships.add(REL_MAX_SIZE_EXCEEDED);
+        relationships = Collections.unmodifiableSet(tempRelationships);
+
+        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
+        tempDescriptors.add(DB_NAME);
+        tempDescriptors.add(INFLUX_DB_URL);
+        tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT);
+        tempDescriptors.add(USERNAME);
+        tempDescriptors.add(PASSWORD);
+        tempDescriptors.add(CHARSET);
+        tempDescriptors.add(CONSISTENCY_LEVEL);
+        tempDescriptors.add(RETENTION_POLICY);
+        tempDescriptors.add(MAX_RECORDS_SIZE);
+        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if ( flowFile.getSize() == 0) {
+            getLogger().error("Empty measurements");
+            flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, "Empty measurement size " + flowFile.getSize());
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        if ( flowFile.getSize() > maxRecordsSize) {
+            getLogger().error("Message size of records exceeded {} max allowed is {}", new Object[] { flowFile.getSize(), maxRecordsSize});
+            flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, "Max records size exceeded " + flowFile.getSize());
+            session.transfer(flowFile, REL_MAX_SIZE_EXCEEDED);
+            return;
+        }
+
+        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+        String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).evaluateAttributeExpressions(flowFile).getValue();
+        String database = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        String retentionPolicy = context.getProperty(RETENTION_POLICY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            long startTimeMillis = System.currentTimeMillis();
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            session.exportTo(flowFile, baos);
+            String records = new String(baos.toByteArray(), charset);
+
+            writeToInfluxDB(context, consistencyLevel, database, retentionPolicy, records);
+
+            final long endTimeMillis = System.currentTimeMillis();
+            getLogger().debug("Records {} inserted", new Object[] {records});
+
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile,
+                    new StringBuilder("influxdb://").append(context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue()).append("/").append(database).toString(),
+                    (endTimeMillis - startTimeMillis));
+        } catch (InfluxDBIOException exception) {
+            flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
+            if ( exception.getCause() instanceof SocketTimeoutException ) {
+                getLogger().error("Failed to insert into influxDB due SocketTimeoutException to {} and retrying",
+                        new Object[]{exception.getLocalizedMessage()}, exception);
+                session.transfer(flowFile, REL_RETRY);
+            } else {
+                getLogger().error("Failed to insert into influxDB due to {}",
+                        new Object[]{exception.getLocalizedMessage()}, exception);
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            context.yield();
+        } catch (Exception exception) {
+            getLogger().error("Failed to insert into influxDB due to {}",
+                    new Object[]{exception.getLocalizedMessage()}, exception);
+            flowFile = session.putAttribute(flowFile, INFLUX_DB_ERROR_MESSAGE, String.valueOf(exception.getMessage()));
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy, String records) {
+        getInfluxDB(context).write(database, retentionPolicy, InfluxDB.ConsistencyLevel.valueOf(consistencyLevel), records);
+    }
+
+    @OnStopped
+    public void close() {
+        super.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..008a00a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.influxdb.PutInfluxDB

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java
new file mode 100644
index 0000000..8db743d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITPutInfluxDB.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import static org.junit.Assert.assertEquals;
+import java.util.List;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for InfluxDB. Please ensure that the InfluxDB is running
+ * on local host with default port and has database test with table test. Please set user
+ * and password if applicable before running the integration tests.
+ */
+public class ITPutInfluxDB {
+
+    private TestRunner runner;
+    private InfluxDB influxDB;
+    private String dbName = "test";
+    private String dbUrl = "http://localhost:8086";
+    private String user = "admin";
+    private String password = "admin";
+
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(PutInfluxDB.class);
+        runner.setProperty(PutInfluxDB.DB_NAME, dbName);
+        runner.setProperty(PutInfluxDB.USERNAME, user);
+        runner.setProperty(PutInfluxDB.PASSWORD, password);
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, dbUrl);
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+        influxDB = InfluxDBFactory.connect(dbUrl,user,password);
+        if ( influxDB.databaseExists(dbName) ) {
+            QueryResult result = influxDB.query(new Query("DROP measurement water", dbName));
+            checkError(result);
+            result = influxDB.query(new Query("DROP measurement testm", dbName));
+            checkError(result);
+            result = influxDB.query(new Query("DROP database " + dbName, dbName));
+            Thread.sleep(1000);
+        }
+        influxDB.createDatabase(dbName);
+        int max = 10;
+        while (!influxDB.databaseExists(dbName) && (max-- < 0)) {
+            Thread.sleep(5);
+        }
+        if ( ! influxDB.databaseExists(dbName) ) {
+            throw new Exception("unable to create database " + dbName);
+        }
+    }
+
+    protected void checkError(QueryResult result) {
+        if ( result.hasError() ) {
+            throw new IllegalStateException("Error while dropping measurements " + result.getError());
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        runner = null;
+        if ( influxDB != null ) {
+            influxDB.close();
+        }
+    }
+
+    @Test
+    public void testValidSinglePoint() {
+        String message = "water,country=US,city=newark rain=1,humidity=0.6";
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
+        assertEquals("Value should be equal", 1, flowFiles.size());
+        assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE));
+        QueryResult result = influxDB.query(new Query("select * from water", dbName));
+        assertEquals("size should be same", 1, result.getResults().iterator().next().getSeries().size());
+        List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
+        assertEquals("size should be same", 1, values.size());
+   }
+
+    @Test
+    public void testValidSinglePointWithTime() {
+        QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
+        assertEquals("Should have no results", null, result.getResults().iterator().next().getSeries());
+        String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
+        assertEquals("Value should be equal", 1, flowFiles.size());
+        assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE));
+        result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
+        assertEquals("size should be same", 1, result.getResults().iterator().next().getSeries().size());
+        List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
+        assertEquals("size should be same", 1, values.size());
+    }
+
+    @Test
+    public void testValidSinglePointWithTimeAndUrlExpression() {
+        runner.setVariable("influxDBUrl", "http://localhost:8086");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "${influxDBUrl}");
+        QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
+        assertEquals("Should have no results", null, result.getResults().iterator().next().getSeries());
+        String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
+        assertEquals("Value should be equal", 1, flowFiles.size());
+        assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE));
+        result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
+        assertEquals("size should be same", 1, result.getResults().iterator().next().getSeries().size());
+        List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
+        assertEquals("size should be same", 1, values.size());
+   }
+
+    @Test
+    public void testValidSinglePointWithUsernameEL() {
+        runner.setVariable("influxdb.username", "admin");
+        runner.setProperty(PutInfluxDB.USERNAME, "${influxdb.username}");
+        QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
+        assertEquals("Should have no results", null, result.getResults().iterator().next().getSeries());
+        String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+   }
+
+    @Test
+    public void testValidSinglePointWithPasswordEL() {
+        runner.setVariable("influxdb.password", "admin");
+        runner.setProperty(PutInfluxDB.PASSWORD, "${influxdb.password}");
+        QueryResult result = influxDB.query(new Query("select * from water where time = 1501002274856668652", dbName));
+        assertEquals("Should have no results", null, result.getResults().iterator().next().getSeries());
+        String message = "water,country=US,city=sf rain=1,humidity=0.6 1501002274856668652";
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+   }
+
+   @Test
+    public void testValidTwoPointWithSameMeasurement() {
+        String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator()
+                + "water,country=US,city=nyc rain=2,humidity=0.7" + System.lineSeparator();
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
+        assertEquals("Value should be equal", 1, flowFiles.size());
+        assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE));
+        QueryResult result = influxDB.query(new Query("select * from water", dbName));
+        assertEquals("size should be same", 1, result.getResults().iterator().next().getSeries().size());
+        List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
+        assertEquals("size should be same", 2, values.size());
+    }
+
+    @Test
+    public void testValidTwoPointWithSameMeasurementBadFormat() {
+        String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator()
+                + "water,country=US,city=nyc,rain=2,humidity=0.7" + System.lineSeparator();
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
+        assertEquals("Value should be equal", 1, flowFiles.size());
+        assertEquals("Value should be equal","{\"error\":\"partial write: unable to parse 'water,country=US,city=nyc,rain=2,humidity=0.7': missing fields dropped=0\"}\n",
+            flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE));
+        QueryResult result = influxDB.query(new Query("select * from water", dbName));
+        assertEquals("size should be same", 1, result.getResults().iterator().next().getSeries().size());
+        List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
+        assertEquals("size should be same", 1, values.size());
+    }
+
+    @Test
+    public void testValidTwoPointWithDifferentMeasurement() {
+        String message = "water,country=US,city=newark rain=1,humidity=0.6" + System.lineSeparator()
+                + "testm,country=US,city=chicago rain=10,humidity=0.9" + System.lineSeparator();
+        byte [] bytes = message.getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
+        assertEquals("Value should be equal", 1, flowFiles.size());
+        assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE));
+        QueryResult result = influxDB.query(new Query("select * from water, testm", dbName));
+        assertEquals("size should be same", 2, result.getResults().iterator().next().getSeries().size());
+        List<List<Object>> values = result.getResults().iterator().next().getSeries().iterator().next().getValues();
+        assertEquals("size should be same", 1, values.size());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java
new file mode 100644
index 0000000..c14769f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/TestPutInfluxDB.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.EOFException;
+import java.net.SocketTimeoutException;
+import java.util.List;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBIOException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPutInfluxDB {
+    private TestRunner runner;
+    private PutInfluxDB mockPutInfluxDB;
+
+    @Before
+    public void setUp() throws Exception {
+        mockPutInfluxDB = new PutInfluxDB() {
+            @Override
+            protected InfluxDB makeConnection(String username, String password, String influxDbUrl, long connectionTimeout) {
+                return null;
+            }
+
+            @Override
+            protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
+                    String records) {
+            }
+        };
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "user");
+        runner.setProperty(PutInfluxDB.PASSWORD, "password");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        runner = null;
+    }
+
+    @Test
+    public void testDefaultValid() {
+        runner.assertValid();
+    }
+
+    @Test
+    public void testBlankDBUrl() {
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testEmptyDBName() {
+        runner.setProperty(PutInfluxDB.DB_NAME, "");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testEmptyConnectionTimeout() {
+        runner.setProperty(PutInfluxDB.INFLUX_DB_CONNECTION_TIMEOUT, "");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testEmptyUsername() {
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.PASSWORD, "password");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+        runner.setProperty(PutInfluxDB.USERNAME, "");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testEmptyPassword() {
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "username");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+        runner.setProperty(PutInfluxDB.PASSWORD, "");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testPasswordEL() {
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setVariable("influxdb.password", "password");
+        runner.setProperty(PutInfluxDB.PASSWORD, "${influxdb.password}");
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "username");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testUsernameEL() {
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setVariable("influxdb.username", "username");
+        runner.setProperty(PutInfluxDB.PASSWORD, "password");
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "${influxdb.username}");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testCharsetUTF8() {
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testEmptyConsistencyLevel() {
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL, "");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testCharsetBlank() {
+        runner.setProperty(PutInfluxDB.CHARSET, "");
+        runner.assertNotValid();
+    }
+    @Test
+    public void testZeroMaxDocumentSize() {
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "0");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testSizeGreaterThanThresholdUsingEL() {
+        runner.setVariable("max.record.size", "1 B");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "${max.record.size}");
+        runner.assertValid();
+        byte [] bytes = new byte[2];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED);
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length);
+    }
+
+    @Test
+    public void testSizeGreaterThanThreshold() {
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 B");
+        runner.assertValid();
+        byte [] bytes = new byte[2];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_MAX_SIZE_EXCEEDED, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_MAX_SIZE_EXCEEDED);
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"Max records size exceeded " + bytes.length);
+    }
+
+    @Test
+    public void testValidSingleMeasurement() {
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB");
+        runner.assertValid();
+        byte [] bytes = "test".getBytes();
+
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
+
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null);
+    }
+
+    @Test
+    public void testWriteThrowsException() {
+        mockPutInfluxDB = new PutInfluxDB() {
+            @Override
+            protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
+                    String records) {
+                throw new RuntimeException("WriteException");
+            }
+        };
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "u1");
+        runner.setProperty(PutInfluxDB.PASSWORD, "p1");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+
+        byte [] bytes = "test".getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
+
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"WriteException");
+    }
+
+    @Test
+    public void testWriteThrowsIOException() {
+        mockPutInfluxDB = new PutInfluxDB() {
+            @Override
+            protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
+                    String records) {
+                throw new InfluxDBIOException(new EOFException("EOFException"));
+            }
+        };
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "u1");
+        runner.setProperty(PutInfluxDB.PASSWORD, "p1");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+
+        byte [] bytes = "test".getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
+
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.io.EOFException: EOFException");
+    }
+
+    @Test
+    public void testWriteThrowsSocketTimeoutException() {
+        mockPutInfluxDB = new PutInfluxDB() {
+            @Override
+            protected void writeToInfluxDB(ProcessContext context, String consistencyLevel, String database, String retentionPolicy,
+                    String records) {
+                throw new InfluxDBIOException(new SocketTimeoutException("SocketTimeoutException"));
+            }
+        };
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "u1");
+        runner.setProperty(PutInfluxDB.PASSWORD, "p1");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+
+        byte [] bytes = "test".getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_RETRY, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_RETRY);
+
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"java.net.SocketTimeoutException: SocketTimeoutException");
+    }
+
+    @Test
+    public void testTriggerThrowsException() {
+        mockPutInfluxDB = new PutInfluxDB() {
+            @Override
+            protected InfluxDB getInfluxDB(ProcessContext context) {
+                throw new RuntimeException("testException");
+            }
+        };
+        runner = TestRunners.newTestRunner(mockPutInfluxDB);
+        runner.setProperty(PutInfluxDB.DB_NAME, "test");
+        runner.setProperty(PutInfluxDB.USERNAME, "u1");
+        runner.setProperty(PutInfluxDB.PASSWORD, "p1");
+        runner.setProperty(PutInfluxDB.CHARSET, "UTF-8");
+        runner.setProperty(PutInfluxDB.INFLUX_DB_URL, "http://dbUrl");
+        runner.setProperty(PutInfluxDB.CONSISTENCY_LEVEL,PutInfluxDB.CONSISTENCY_LEVEL_ONE.getValue());
+        runner.setProperty(PutInfluxDB.RETENTION_POLICY,"autogen");
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 KB");
+        runner.assertValid();
+
+        byte [] bytes = "test".getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
+
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE),"testException");
+    }
+
+    @Test
+    public void testValidArrayMeasurement() {
+        runner.setProperty(PutInfluxDB.MAX_RECORDS_SIZE, "1 MB");
+        runner.assertValid();
+
+        runner.enqueue("test rain=2\ntest rain=3".getBytes());
+        runner.run(1,true,true);
+
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_SUCCESS);
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), null);
+    }
+
+    @Test
+    public void testInvalidEmptySingleMeasurement() {
+        byte [] bytes = "".getBytes();
+        runner.enqueue(bytes);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(PutInfluxDB.REL_FAILURE, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutInfluxDB.REL_FAILURE);
+        assertEquals(flowFiles.get(0).getAttribute(PutInfluxDB.INFLUX_DB_ERROR_MESSAGE), "Empty measurement size 0");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/nifi-influxdb-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/pom.xml b/nifi-nar-bundles/nifi-influxdb-bundle/pom.xml
new file mode 100644
index 0000000..aa68705
--- /dev/null
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-influxdb-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-influxdb-processors</module>
+        <module>nifi-influxdb-nar</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-influxdb-processors</artifactId>
+                <version>1.6.0-SNAPSHOT</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 2632a4a..a1d6eac 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -74,6 +74,7 @@
         <module>nifi-windows-event-log-bundle</module>
         <module>nifi-ignite-bundle</module>
         <module>nifi-rethinkdb-bundle</module>
+        <module>nifi-influxdb-bundle</module>
         <module>nifi-email-bundle</module>
         <module>nifi-groovyx-bundle</module>
     	<module>nifi-ranger-bundle</module>

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7fe2da1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 173c8b6..f7b2495 100644
--- a/pom.xml
+++ b/pom.xml
@@ -251,6 +251,11 @@
                 <version>2.3.3</version>
             </dependency>
             <dependency>
+                <groupId>org.influxdb</groupId>
+                <artifactId>influxdb-java</artifactId>
+                <version>2.7</version>
+            </dependency>
+            <dependency>
                <groupId>org.apache.ignite</groupId>
                 <artifactId>ignite-core</artifactId>
                 <version>1.6.0</version>
@@ -1100,6 +1105,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-influxdb-nar</artifactId>
+                <version>1.6.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-ignite-nar</artifactId>
                 <version>1.6.0-SNAPSHOT</version>
                 <type>nar</type>