You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/19 16:45:40 UTC

[nifi] branch master updated (ab01136 -> 3015ee3)

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from ab01136  NIFI-6997 This closes #3971. connection closing fixed
     new de2a286  NIFI-818: This closes #3926. Initial implementation of NiFi-Accumulo ( https://github.com/phrocker/nifi-accumulo ) with connectors to Apache Accumulo 2.x
     new 3015ee3  NIFI-6944 This closes #3928. fixed NiFiClientUtil.getUrl and modified NiFiClientUtilSpec to test non-default WEB_HTTP_PORT

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 nifi-assembly/pom.xml                              |  27 +
 nifi-nar-bundles/nifi-accumulo-bundle/README.md    |  23 +
 .../nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml |  52 ++
 .../nifi-accumulo-processors/pom.xml               | 101 ++++
 .../accumulo/data/AccumuloRecordConfiguration.java | 159 +++++
 .../org/apache/nifi/accumulo/data/KeySchema.java   | 115 ++++
 .../accumulo/processors/BaseAccumuloProcessor.java |  76 +++
 .../accumulo/processors/PutAccumuloRecord.java     | 657 +++++++++++++++++++++
 .../nifi/accumulo/processors/ScanAccumulo.java     | 349 +++++++++++
 .../services/org.apache.nifi.processor.Processor   |  16 +
 .../controllerservices/MockAccumuloService.java    |  42 ++
 .../nifi/accumulo/processors/TestPutRecord.java    | 218 +++++++
 .../nifi/accumulo/processors/TestScanAccumulo.java | 236 ++++++++
 .../nifi-accumulo-services-api-nar/pom.xml         |  52 ++
 .../nifi-accumulo-services-api/pom.xml             |  61 ++
 .../controllerservices/BaseAccumuloService.java    |  32 +
 .../nifi-accumulo-services-nar/pom.xml             |  53 ++
 .../nifi-accumulo-services/pom.xml                 |  70 +++
 .../controllerservices/AccumuloService.java        | 210 +++++++
 .../org.apache.nifi.controller.ControllerService   |  15 +
 nifi-nar-bundles/nifi-accumulo-bundle/pom.xml      |  50 ++
 .../reporting/datadog/DataDogReportingTask.java    |   2 +-
 nifi-nar-bundles/pom.xml                           |   3 +-
 .../toolkit/admin/client/NiFiClientUtil.groovy     |   2 +-
 .../toolkit/admin/client/NiFiClientUtilSpec.groovy |   5 +-
 25 files changed, 2621 insertions(+), 5 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/README.md
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 create mode 100644 nifi-nar-bundles/nifi-accumulo-bundle/pom.xml


[nifi] 02/02: NIFI-6944 This closes #3928. fixed NiFiClientUtil.getUrl and modified NiFiClientUtilSpec to test non-default WEB_HTTP_PORT

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 3015ee39da293938c8119e6f48b2d32548b14b96
Author: Drew Kerrigan <an...@issgovernance.com>
AuthorDate: Wed Dec 11 19:35:51 2019 -0500

    NIFI-6944 This closes #3928. fixed NiFiClientUtil.getUrl and modified NiFiClientUtilSpec to test non-default WEB_HTTP_PORT
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../org/apache/nifi/toolkit/admin/client/NiFiClientUtil.groovy       | 2 +-
 .../org/apache/nifi/toolkit/admin/client/NiFiClientUtilSpec.groovy   | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtil.groovy b/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtil.groovy
index 89ab169..9bb773c 100644
--- a/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtil.groovy
+++ b/nifi-toolkit/nifi-toolkit-admin/src/main/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtil.groovy
@@ -56,7 +56,7 @@ public class NiFiClientUtil {
             urlBuilder.append("http://")
             urlBuilder.append(StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTP_HOST)) ? "localhost": niFiProperties.getProperty(NiFiProperties.WEB_HTTP_HOST))
             urlBuilder.append(":")
-            urlBuilder.append(StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT)) ? "8080": niFiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT))
+            urlBuilder.append(StringUtils.isEmpty(niFiProperties.getProperty(NiFiProperties.WEB_HTTP_PORT)) ? "8080": niFiProperties.getProperty(NiFiProperties.WEB_HTTP_PORT))
         }
 
         if(!StringUtils.isEmpty(endpoint)) {
diff --git a/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtilSpec.groovy b/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtilSpec.groovy
index 2441e94..0b00c08 100644
--- a/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtilSpec.groovy
+++ b/nifi-toolkit/nifi-toolkit-admin/src/test/groovy/org/apache/nifi/toolkit/admin/client/NiFiClientUtilSpec.groovy
@@ -47,8 +47,9 @@ class NiFiClientUtilSpec extends Specification{
 
         then:
 
-        3 * niFiProperties.getProperty(_)
-        url == "http://localhost:8080/nifi-api/controller/cluster/nodes/1"
+        2 * niFiProperties.getProperty(_) 
+        niFiProperties.getProperty(NiFiProperties.WEB_HTTP_PORT) >> "8000"
+        url == "http://localhost:8000/nifi-api/controller/cluster/nodes/1"
     }
 
 


[nifi] 01/02: NIFI-818: This closes #3926. Initial implementation of NiFi-Accumulo ( https://github.com/phrocker/nifi-accumulo ) with connectors to Apache Accumulo 2.x

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit de2a286a7a01725ee11d839ead7811a4135f8b41
Author: Marc Parisi <ph...@apache.org>
AuthorDate: Wed Dec 11 11:58:56 2019 -0500

    NIFI-818: This closes #3926. Initial implementation of NiFi-Accumulo ( https://github.com/phrocker/nifi-accumulo ) with connectors to Apache Accumulo 2.x
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 nifi-assembly/pom.xml                              |  27 +
 nifi-nar-bundles/nifi-accumulo-bundle/README.md    |  23 +
 .../nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml |  52 ++
 .../nifi-accumulo-processors/pom.xml               | 101 ++++
 .../accumulo/data/AccumuloRecordConfiguration.java | 159 +++++
 .../org/apache/nifi/accumulo/data/KeySchema.java   | 115 ++++
 .../accumulo/processors/BaseAccumuloProcessor.java |  76 +++
 .../accumulo/processors/PutAccumuloRecord.java     | 657 +++++++++++++++++++++
 .../nifi/accumulo/processors/ScanAccumulo.java     | 349 +++++++++++
 .../services/org.apache.nifi.processor.Processor   |  16 +
 .../controllerservices/MockAccumuloService.java    |  42 ++
 .../nifi/accumulo/processors/TestPutRecord.java    | 218 +++++++
 .../nifi/accumulo/processors/TestScanAccumulo.java | 236 ++++++++
 .../nifi-accumulo-services-api-nar/pom.xml         |  52 ++
 .../nifi-accumulo-services-api/pom.xml             |  61 ++
 .../controllerservices/BaseAccumuloService.java    |  32 +
 .../nifi-accumulo-services-nar/pom.xml             |  53 ++
 .../nifi-accumulo-services/pom.xml                 |  70 +++
 .../controllerservices/AccumuloService.java        | 210 +++++++
 .../org.apache.nifi.controller.ControllerService   |  15 +
 nifi-nar-bundles/nifi-accumulo-bundle/pom.xml      |  50 ++
 .../reporting/datadog/DataDogReportingTask.java    |   2 +-
 nifi-nar-bundles/pom.xml                           |   3 +-
 23 files changed, 2617 insertions(+), 2 deletions(-)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index b69aa64..7c9c7d8 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -897,6 +897,33 @@ language governing permissions and limitations under the License. -->
                     <type>nar</type>
                 </dependency>
             </dependencies>
+    	</profile>
+	<profile>
+            <id>include-accumulo</id>
+            <!-- This profile handles the inclusion of nifi-accumulo artifacts. -->
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-accumulo-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+	    	</dependency>
+		<dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-accumulo-services-api-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+		</dependency>
+		<dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-accumulo-services-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+            </dependencies>
         </profile>
         <profile>
             <id>rpm</id>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/README.md b/nifi-nar-bundles/nifi-accumulo-bundle/README.md
new file mode 100644
index 0000000..e431586
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/README.md
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# nifi-accumulo
+
+This is a basic NiFi->Accumulo integration. Running `mvn install` will create your NAR, which can be added
+to Apache NiFi. The bundle is intended to be built against Apache Accumulo 2.x.
+
+The resulting NAR will be named 'nifi-accumulo-nar'
+
+
+Note that some of this code was modeled after the HBase work.
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
new file mode 100644
index 0000000..b128110
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-nar/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-nar</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>false</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-processors</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api-nar</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
new file mode 100644
index 0000000..23e9902
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+	    <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <version>${accumulo.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-minicluster</artifactId>
+            <version>${accumulo.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
new file mode 100644
index 0000000..164f9d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/AccumuloRecordConfiguration.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.data;
+
+/**
+ * Encapsulates configuring the session with some required parameters.
+ *
+ * Justification: Generally not a fan of this fluent API to configure other objects, but there is a lot encapsulated here
+ * so it helps minimize what we pass between the current set of classes and the upcoming features.
+ */
+public class AccumuloRecordConfiguration {
+    private final String tableName;
+    private final String rowFieldName;
+    private final String columnFamily;
+    private final String columnFamilyField;
+    private final String timestampField;
+    private final String fieldDelimiter;
+    private final boolean encodeFieldDelimiter;
+    private final boolean qualifierInKey;
+    private final boolean deleteKeys;
+
+    /** Stores every setting exactly as given; instances are normally produced by {@link Builder#build()}. */
+    protected AccumuloRecordConfiguration(final String tableName, final String rowFieldName, final String columnFamily,
+            final String columnFamilyField, final String timestampField, final String fieldDelimiter,
+            final boolean encodeFieldDelimiter, final boolean qualifierInKey, final boolean deleteKeys) {
+        this.tableName = tableName;
+        this.rowFieldName = rowFieldName;
+        this.columnFamily = columnFamily;
+        this.columnFamilyField = columnFamilyField;
+        this.timestampField = timestampField;
+        this.fieldDelimiter = fieldDelimiter;
+        this.encodeFieldDelimiter = encodeFieldDelimiter;
+        this.qualifierInKey = qualifierInKey;
+        this.deleteKeys = deleteKeys;
+    }
+
+    // Read-only accessors; each one returns the value captured by the constructor.
+    public String getTableName() {
+        return tableName;
+    }
+
+    public String getRowField() {
+        return rowFieldName;
+    }
+
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    public String getColumnFamilyField() {
+        return columnFamilyField;
+    }
+
+    public String getTimestampField() {
+        return timestampField;
+    }
+
+    public String getFieldDelimiter() {
+        return fieldDelimiter;
+    }
+
+    public boolean getEncodeDelimiter() {
+        return encodeFieldDelimiter;
+    }
+
+    public boolean getQualifierInKey() {
+        return qualifierInKey;
+    }
+
+    public boolean isDeleteKeys() {
+        return deleteKeys;
+    }
+
+    /**
+     * Fluent builder for {@link AccumuloRecordConfiguration}; unset strings stay null and unset flags stay false.
+     * Every setter returns this builder so that calls can be chained.
+     */
+    public static class Builder {
+        private String tableName;
+        private String rowFieldName;
+        private String columnFamily;
+        private String columnFamilyField;
+        private String timestampField;
+        private String fieldDelimiter;
+        private boolean encodeFieldDelimiter;
+        private boolean qualifierInKey;
+        private boolean deleteKeys;
+
+        /** Creates a builder with nothing set. */
+        public static final Builder newBuilder() {
+            return new Builder();
+        }
+
+        public Builder setTableName(final String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Builder setRowField(final String rowFieldName) {
+            this.rowFieldName = rowFieldName;
+            return this;
+        }
+
+        public Builder setColumnFamily(final String columnFamily) {
+            this.columnFamily = columnFamily;
+            return this;
+        }
+
+        public Builder setColumnFamilyField(final String columnFamilyField) {
+            this.columnFamilyField = columnFamilyField;
+            return this;
+        }
+
+        public Builder setTimestampField(final String timestampField) {
+            this.timestampField = timestampField;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(final String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder setEncodeFieldDelimiter(final boolean encodeFieldDelimiter) {
+            this.encodeFieldDelimiter = encodeFieldDelimiter;
+            return this;
+        }
+
+        public Builder setQualifierInKey(final boolean qualifierInKey) {
+            this.qualifierInKey = qualifierInKey;
+            return this;
+        }
+
+        public Builder setDelete(final boolean deleteKeys) {
+            this.deleteKeys = deleteKeys;
+            return this;
+        }
+
+        /** Produces a configuration holding the values set on this builder so far. */
+        public AccumuloRecordConfiguration build() {
+            return new AccumuloRecordConfiguration(tableName, rowFieldName, columnFamily, columnFamilyField, timestampField, fieldDelimiter, encodeFieldDelimiter, qualifierInKey, deleteKeys);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
new file mode 100644
index 0000000..7ac74b8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/data/KeySchema.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.data;
+
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Fixed {@link RecordSchema} whose fields are row, columnFamily, columnQualifier, columnVisibility and timestamp. */
+public class KeySchema implements RecordSchema {
+    private static final List<RecordField> KEY_FIELDS = new ArrayList<>();
+    // NOTE(review): holds only the two distinct types, not one entry per field - confirm this is what callers expect.
+    private static final List<DataType> DATA_TYPES = new ArrayList<>();
+    private static final List<String> FIELD_NAMES = new ArrayList<>();
+    static {
+        KEY_FIELDS.add(new RecordField("row", RecordFieldType.STRING.getDataType(),false));
+        KEY_FIELDS.add(new RecordField("columnFamily",RecordFieldType.STRING.getDataType(),true));
+        KEY_FIELDS.add(new RecordField("columnQualifier",RecordFieldType.STRING.getDataType(),true));
+        KEY_FIELDS.add(new RecordField("columnVisibility",RecordFieldType.STRING.getDataType(),true));
+        KEY_FIELDS.add(new RecordField("timestamp",RecordFieldType.LONG.getDataType(),true));
+        DATA_TYPES.add(RecordFieldType.STRING.getDataType());
+        DATA_TYPES.add(RecordFieldType.LONG.getDataType());
+        FIELD_NAMES.addAll(KEY_FIELDS.stream().map(RecordField::getFieldName).collect(Collectors.toList()));
+    }
+    @Override
+    public List<RecordField> getFields() {
+        return KEY_FIELDS;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return KEY_FIELDS.size();
+    }
+
+    @Override
+    public RecordField getField(int i) {
+        return KEY_FIELDS.get(i);
+    }
+
+    @Override
+    public List<DataType> getDataTypes() {
+        return DATA_TYPES;
+    }
+
+    @Override
+    public List<String> getFieldNames() {
+        return FIELD_NAMES;
+    }
+
+    /** Returns LONG for "timestamp", STRING for any other field of this schema, else empty; matching ignores case and null yields empty. */
+    @Override
+    public Optional<DataType> getDataType(String s) {
+        if ("timestamp".equalsIgnoreCase(s)) {
+            return Optional.of(RecordFieldType.LONG.getDataType());
+        }
+        // Test each schema field name against the argument; comparing the argument with itself matched every name.
+        if (FIELD_NAMES.stream().anyMatch(x -> x.equalsIgnoreCase(s))) {
+            return Optional.of(RecordFieldType.STRING.getDataType());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> getSchemaText() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<String> getSchemaFormat() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<RecordField> getField(final String s) {
+        return KEY_FIELDS.stream().filter(x -> x.getFieldName().equalsIgnoreCase(s)).findFirst();
+    }
+
+    @Override
+    public SchemaIdentifier getIdentifier() {
+        return SchemaIdentifier.builder().name("AccumuloKeySchema").version(1).branch("nifi-accumulo").build();
+    }
+
+    @Override
+    public Optional<String> getSchemaName() {
+        return Optional.of("AccumuloKeySchema");
+    }
+
+    @Override
+    public Optional<String> getSchemaNamespace() {
+        return Optional.of("nifi-accumulo");
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
new file mode 100644
index 0000000..d0888ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/BaseAccumuloProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/**
+ * Base Accumulo class that provides connector services, table name, and thread
+ * properties
+ * <p>
+ * Declares the property descriptors shared by the Accumulo processors:
+ * <ul>
+ *   <li>ACCUMULO_CONNECTOR_SERVICE - required controller service implementing BaseAccumuloService</li>
+ *   <li>TABLE_NAME - required, non-empty table name; supports expression language over flow file attributes</li>
+ *   <li>CREATE_TABLE - required True/False flag, defaulting to False</li>
+ *   <li>THREADS - optional positive integer, defaulting to 10</li>
+ * </ul>
+ * The class declares only these constants; it adds no methods of its own.
+ */
+public abstract class BaseAccumuloProcessor extends AbstractProcessor {
+
+    protected static final PropertyDescriptor ACCUMULO_CONNECTOR_SERVICE = new PropertyDescriptor.Builder()
+            .name("accumulo-connector-service")
+            .displayName("Accumulo Connector Service")
+            .description("Specifies the Controller Service to use for accessing Accumulo.")
+            .required(true)
+            .identifiesControllerService(BaseAccumuloService.class)
+            .build();
+
+
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the Accumulo Table into which data will be placed")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
+            .name("Create Table")
+            .description("Creates a table if it does not exist. This property will only be used when EL is not present in 'Table Name'")
+            .required(true)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+
+    protected static final PropertyDescriptor THREADS = new PropertyDescriptor.Builder()
+            .name("Threads")
+            .description("Number of threads used for reading and writing")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10")
+            .build();
+
+    /**
+     * Implementations can decide to include all base properties or individually include them. The list is
+     * immutable so that implementations must construct their own lists knowingly, for example by copying
+     * this list into a new mutable list and appending their own descriptors.
+     */
+
+    protected static final ImmutableList<PropertyDescriptor> baseProperties = ImmutableList.of(ACCUMULO_CONNECTOR_SERVICE,TABLE_NAME,CREATE_TABLE,THREADS);
+
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
new file mode 100644
index 0000000..dbcc3e7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/PutAccumuloRecord.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration;
+
+import javax.xml.bind.DatatypeConverter;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "accumulo", "put", "record"})
+@DynamicProperties({
+        @DynamicProperty(name = "visibility.<COLUMN FAMILY>", description = "Visibility label for everything under that column family " +
+                "when a specific label for a particular column qualifier is not available.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+                value = "visibility label for <COLUMN FAMILY>"
+        ),
+        @DynamicProperty(name = "visibility.<COLUMN FAMILY>.<COLUMN QUALIFIER>", description = "Visibility label for the specified column qualifier " +
+                "qualified by a configured column family.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+                value = "visibility label for <COLUMN FAMILY>:<COLUMN QUALIFIER>."
+        )
+})
+/**
+ * Purpose and Design: Requires a connector be defined by way of an AccumuloService object. This class
+ * simply extends BaseAccumuloProcessor to extract records from a flow file. The location of a record field value can be
+ * placed into the value or part of the column qualifier ( this can/may change )
+ *
+ * Supports deletes. If the delete flag is used we'll delete keys found within that flow file.
+ */
+public class PutAccumuloRecord extends BaseAccumuloProcessor {
+
+    protected static final PropertyDescriptor MEMORY_SIZE = new PropertyDescriptor.Builder() // read as a byte count in onScheduled and passed to BatchWriterConfig.setMaxMemory
+            .name("Memory Size")
+            .description("The maximum memory size Accumulo at any one time from the record set.")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("10 MB")
+            .build();
+    /**
+     * Column family applied to the cells written from a flow file; evaluated against the flow file
+     * attributes in onTrigger. customValidate is written to reject configurations in which neither or
+     * both of this property and COLUMN_FAMILY_FIELD are set.
+     */
+    protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
+            .name("Column Family")
+            .description("The Column Family to use when inserting data into Accumulo")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+    /**
+     * Name of the record field whose value supplies the column family when COLUMN_FAMILY is blank;
+     * createMutation then excludes that field from the cells it writes.
+     */
+    protected static final PropertyDescriptor COLUMN_FAMILY_FIELD = new PropertyDescriptor.Builder()
+            .name("Column Family Field")
+            .description("Field name used as the column family if one is not specified above.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+    /**
+     * Evaluated as a boolean per flow file in onTrigger and stored in the record configuration;
+     * presumably this is the flag createMutation consults to call putDelete instead of put.
+     */
+    protected static final PropertyDescriptor DELETE_KEY = new PropertyDescriptor.Builder()
+            .name("delete-key")
+            .displayName("Delete Key")
+            .description("Deletes the key")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /**
+     * When true, createMutation appends the record value to the column qualifier and writes an
+     * empty Value; when false the record value becomes the cell value.
+     */
+    protected static final PropertyDescriptor RECORD_IN_QUALIFIER = new PropertyDescriptor.Builder()
+            .name("record-value-in-qualifier")
+            .displayName("Record Value In Qualifier")
+            .description("Places the record value into the column qualifier instead of the value.")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    /**
+     * Read once in onScheduled; when true onTrigger calls flush() on the table writer after each
+     * flow file.
+     */
+    protected static final PropertyDescriptor FLUSH_ON_FLOWFILE = new PropertyDescriptor.Builder()
+            .name("flush-on-flow-file")
+            .displayName("Flush Every FlowFile")
+            .description("Flushes the table writer on every flow file.")
+            .required(true)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    /**
+     * When true, createMutation decodes FIELD_DELIMITER with DatatypeConverter.parseHexBinary before
+     * appending it to the qualifier.
+     * NOTE(review): parseHexBinary expects bare hex digits, so a value with a leading "0x" as shown in
+     * the description would presumably be rejected - confirm.
+     */
+    protected static final PropertyDescriptor FIELD_DELIMITER_AS_HEX = new PropertyDescriptor.Builder()
+            .name("field-delimiter-as-hex")
+            .displayName("Hex Encode Field Delimiter")
+            .description("Allows you to hex encode the delimiter as a character. So 0x00 places a null character between the record name and value.")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    /**
+     * Delimiter appended between the field name and the record value inside the column qualifier;
+     * createMutation only uses it when the record value is placed in the qualifier.
+     */
+    protected static final PropertyDescriptor FIELD_DELIMITER = new PropertyDescriptor.Builder()
+            .name("field-delimiter")
+            .displayName("Field Delimiter")
+            .description("Delimiter between the record value and name. ")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * RecordReaderFactory that onTrigger uses to create the RecordReader for each flow file.
+     */
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+    /**
+     * Row identifier source: getRow uses the value of the record field with this name when the schema
+     * contains it, otherwise the evaluated text itself becomes the row id.
+     */
+    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("Row Identifier Field Name")
+            .description("Specifies the name of a record field whose value should be used as the row id for the given record." +
+                    " If EL defines a value that is not a field name that will be used as the row identifier.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Record field read with getAsLong in createMutation; when the property is blank or the field value
+     * is null, cells are written without an explicit timestamp.
+     */
+    protected static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder()
+            .name("timestamp-field")
+            .displayName("Timestamp Field")
+            .description("Specifies the name of a record field whose value should be used as the timestamp. If empty a timestamp will be recorded as the time of insertion")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Record path text that onTrigger compiles through recordPathCache (only while that cache is
+     * non-null); createMutation casts the first selected value to a Map keyed by field name.
+     */
+    protected static final PropertyDescriptor VISIBILITY_PATH = new PropertyDescriptor.Builder()
+            .name("visibility-path")
+            .displayName("Visibility String Record Path Root")
+            .description("A record path that points to part of the record which contains a path to a mapping of visibility strings to record paths")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+    /**
+     * Visibility expression createMutation falls back to for fields that have no entry in the
+     * visibility map.
+     */
+    protected static final PropertyDescriptor DEFAULT_VISIBILITY = new PropertyDescriptor.Builder()
+            .name("default-visibility")
+            .displayName("Default Visibility")
+            .description("Default visibility when VISIBILITY_PATH is not defined. ")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+    /**
+     * Relationships used by onTrigger: a flow file goes to REL_SUCCESS when no exception was caught while
+     * reading its records and queuing mutations; otherwise it is penalized and sent to REL_FAILURE, which
+     * is declared directly below REL_SUCCESS.
+     */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it has been successfully stored in Accumulo")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it cannot be sent to Accumulo")
+            .build();
+
+
+    /**
+     * Connector service which provides us a connector if the configuration is correct.
+     * Resolved from ACCUMULO_CONNECTOR_SERVICE in onScheduled.
+     */
+    protected BaseAccumuloService accumuloConnectorService;
+
+    /**
+     * Connector that we need to persist while we are operational.
+     * Obtained from accumuloConnectorService.getClient() in onScheduled.
+     */
+    protected AccumuloClient client;
+
+    /**
+     * Multi-table writer created in onScheduled; shutdown closes it and resets the reference to null.
+     */
+    private MultiTableBatchWriter tableWriter = null;
+
+    /**
+     * Record path cache used by onTrigger to compile the visibility record path. While this field is
+     * null, onTrigger skips compilation and no record path is applied.
+     */
+    protected RecordPathCache recordPathCache;
+
+
+    /**
+     * Flushes the tableWriter on every flow file if true. Set from FLUSH_ON_FLOWFILE in onScheduled.
+     */
+    protected boolean flushOnEveryFlow;
+
+    /**
+     * @return a new mutable set holding the success and failure relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        Collections.addAll(relationships, REL_SUCCESS, REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> set = Collections.emptySet();
+        if (!validationContext.getProperty(COLUMN_FAMILY).isSet() && !validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
+            set.add(new ValidationResult.Builder().explanation("Column Family OR Column family field name must be defined").build());
+        else if (validationContext.getProperty(COLUMN_FAMILY).isSet() && validationContext.getProperty(COLUMN_FAMILY_FIELD).isSet())
+            set.add(new ValidationResult.Builder().explanation("Column Family OR Column family field name must be defined, but not both").build());
+        return set;
+    }
+
+    /**
+     * Prepares the processor for execution: resolves the Accumulo client, builds the shared multi-table
+     * batch writer and, when 'Create Table' is true and 'Table Name' contains no expression language,
+     * creates the table if it does not exist yet.
+     *
+     * @param context process context providing the configured property values
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        accumuloConnectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE).asControllerService(BaseAccumuloService.class);
+        final Double maxBytes = context.getProperty(MEMORY_SIZE).asDataSize(DataUnit.B);
+        this.client = accumuloConnectorService.getClient();
+        flushOnEveryFlow = context.getProperty(FLUSH_ON_FLOWFILE).asBoolean();
+
+        final BatchWriterConfig writerConfig = new BatchWriterConfig();
+        writerConfig.setMaxWriteThreads(context.getProperty(THREADS).asInteger());
+        writerConfig.setMaxMemory(maxBytes.longValue());
+        if (!flushOnEveryFlow){
+            writerConfig.setMaxLatency(60, TimeUnit.SECONDS);
+        }
+        // The config has to be complete before the writer is created from it; settings applied to the
+        // config object afterwards are not picked up by the already-constructed writer.
+        tableWriter = client.createMultiTableBatchWriter(writerConfig);
+
+        if (context.getProperty(CREATE_TABLE).asBoolean() && !context.getProperty(TABLE_NAME).isExpressionLanguagePresent()) {
+            // No expression language is present, so an empty attribute map is enough to obtain the value.
+            final Map<String, String> flowAttributes = new HashMap<>();
+            final String table = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
+            final TableOperations tableOps = this.client.tableOperations();
+            if (!tableOps.exists(table)) {
+                getLogger().info("Creating " + table + " table.");
+                try {
+                    tableOps.create(table);
+                } catch (TableExistsException te) {
+                    // can safely ignore
+                } catch (AccumuloSecurityException | AccumuloException e) {
+                    // Best effort only: scheduling continues even if the table could not be created.
+                    getLogger().info("Accumulo or Security error creating. Continuing... " + table + ". ", e);
+                }
+            }
+        }
+    }
+
+
+    @OnUnscheduled
+    @OnDisabled
+    public synchronized void shutdown(){
+        /**
+         * Close the writer when we are shut down.
+         */
+        if (null != tableWriter){
+            try {
+                tableWriter.close();
+            } catch (MutationsRejectedException e) {
+                getLogger().error("Mutations were rejected",e);
+            }
+            tableWriter = null;
+        }
+    }
+
+    /**
+     * Builds the list of supported properties: the shared base properties followed by the properties
+     * specific to this processor. Each descriptor appears exactly once.
+     *
+     * @return a new mutable list of supported property descriptors
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(baseProperties);
+        properties.add(RECORD_READER_FACTORY);
+        // ROW_FIELD_NAME used to be registered twice; a single entry is sufficient.
+        properties.add(ROW_FIELD_NAME);
+        properties.add(COLUMN_FAMILY);
+        properties.add(COLUMN_FAMILY_FIELD);
+        properties.add(DELETE_KEY);
+        properties.add(FLUSH_ON_FLOWFILE);
+        properties.add(FIELD_DELIMITER);
+        properties.add(FIELD_DELIMITER_AS_HEX);
+        properties.add(MEMORY_SIZE);
+        properties.add(RECORD_IN_QUALIFIER);
+        properties.add(TIMESTAMP_FIELD);
+        properties.add(VISIBILITY_PATH);
+        properties.add(DEFAULT_VISIBILITY);
+        return properties;
+    }
+
+
+    /**
+     * Reads every record of the incoming flow file, converts the records into Accumulo mutations and
+     * hands them to the shared table writer.
+     * <p>
+     * createMutation reuses the previous mutation while consecutive records share the same row and queues
+     * it once the row changes; the last pending mutation is queued here after the reader is exhausted.
+     * Any exception raised while reading or queuing is logged and routes the penalized flow file to
+     * REL_FAILURE, otherwise the flow file goes to REL_SUCCESS.
+     *
+     * @param processContext context providing the processor configuration
+     * @param processSession session supplying the flow file
+     * @throws ProcessException if the table cannot be created or the writer rejects mutations on flush
+     */
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+        final FlowFile flowFile = processSession.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory recordParserFactory = processContext.getProperty(RECORD_READER_FACTORY)
+                .asControllerService(RecordReaderFactory.class);
+
+        final String recordPathText = processContext.getProperty(VISIBILITY_PATH).getValue();
+        final String defaultVisibility = processContext.getProperty(DEFAULT_VISIBILITY).isSet() ? processContext.getProperty(DEFAULT_VISIBILITY).getValue() : null;
+
+        final String tableName = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        // create the table if EL is present, create table is true and the table does not exist.
+        if (processContext.getProperty(TABLE_NAME).isExpressionLanguagePresent() && processContext.getProperty(CREATE_TABLE).asBoolean()) {
+            final TableOperations tableOps = this.client.tableOperations();
+            if (!tableOps.exists(tableName)) {
+                getLogger().info("Creating " + tableName + " table.");
+                try {
+                    tableOps.create(tableName);
+                } catch (TableExistsException te) {
+                    // can safely ignore, though we shouldn't arrive here due to table.exists called, but it's possible
+                    // that with multiple threads two could attempt table creation concurrently. We don't want that
+                    // to be a failure.
+                } catch (AccumuloSecurityException | AccumuloException e) {
+                    throw new ProcessException("Accumulo or Security error creating. Continuing... " + tableName + ". ",e);
+                }
+            }
+        }
+
+        AccumuloRecordConfiguration builder = AccumuloRecordConfiguration.Builder.newBuilder()
+                .setTableName(tableName)
+                .setColumnFamily(processContext.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue())
+                .setColumnFamilyField(processContext.getProperty(COLUMN_FAMILY_FIELD).evaluateAttributeExpressions(flowFile).getValue())
+                .setRowField(processContext.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue())
+                .setEncodeFieldDelimiter(processContext.getProperty(FIELD_DELIMITER_AS_HEX).asBoolean())
+                .setFieldDelimiter(processContext.getProperty(FIELD_DELIMITER).isSet() ? processContext.getProperty(FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue() : "")
+                .setQualifierInKey(processContext.getProperty(RECORD_IN_QUALIFIER).isSet() ? processContext.getProperty(RECORD_IN_QUALIFIER).asBoolean() : false)
+                .setDelete(processContext.getProperty(DELETE_KEY).isSet() ? processContext.getProperty(DELETE_KEY).evaluateAttributeExpressions(flowFile).asBoolean() : false)
+                .setTimestampField(processContext.getProperty(TIMESTAMP_FIELD).evaluateAttributeExpressions(flowFile).getValue()).build();
+
+
+        RecordPath recordPath = null;
+        if (recordPathCache != null && !StringUtils.isEmpty(recordPathText)) {
+            recordPath = recordPathCache.getCompiled(recordPathText);
+        }
+
+        boolean failed = false;
+        Mutation prevMutation=null;
+        try (final InputStream in = processSession.read(flowFile);
+             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
+            Record record;
+            /**
+             * HBase supports a restart point. This may be something that we can/should add if needed.
+             */
+            while ((record = reader.nextRecord()) != null) {
+                prevMutation = createMutation(prevMutation, processContext, record, reader.getSchema(), recordPath, flowFile,defaultVisibility,  builder);
+
+            }
+            // A flow file without any records leaves no pending mutation; handing null to the writer
+            // would raise and wrongly route an empty (but valid) flow file to failure.
+            if (prevMutation != null) {
+                addMutation(builder.getTableName(),prevMutation);
+            }
+        } catch (Exception ex) {
+            getLogger().error("Failed to put records to Accumulo.", ex);
+            failed = true;
+        }
+
+        if (flushOnEveryFlow){
+            try {
+                tableWriter.flush();
+            } catch (MutationsRejectedException e) {
+                throw new ProcessException(e);
+            }
+        }
+
+
+        if (!failed) {
+            processSession.transfer(flowFile, REL_SUCCESS);
+        } else {
+            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
+        }
+
+        processSession.commit();
+    }
+
+    /**
+     * Adapted from HBASEUtils. Their approach seemed ideal for what our intent is here.
+     * <p>
+     * Resolves a visibility label for a column in this order: the flow file attribute
+     * {@code visibility.<family>.<qualifier>}, the flow file attribute {@code visibility.<family>}, the
+     * processor property {@code visibility.<family>.<qualifier>}, and finally the processor property
+     * {@code visibility.<family>}. When no qualifier is given only the family-level keys are consulted.
+     *
+     * @param columnFamily column family from which to extract the visibility or to execute an expression against
+     * @param columnQualifier column qualifier from which to extract the visibility or to execute an expression against
+     * @param flowFile flow file being written
+     * @param context process context
+     * @return the resolved visibility label, or null when the column family is empty or nothing is configured
+     */
+    public static String produceVisibility(String columnFamily, String columnQualifier, FlowFile flowFile, ProcessContext context) {
+        // Without a column family there is nothing to look up. The previous guard was inverted and
+        // returned null for every non-empty column family.
+        if (org.apache.commons.lang3.StringUtils.isEmpty(columnFamily)) {
+            return null;
+        }
+        final boolean hasQualifier = org.apache.commons.lang3.StringUtils.isNotEmpty(columnQualifier);
+        final String familyKey = String.format("visibility.%s", columnFamily);
+        // The "." separator belongs between family and qualifier only when a qualifier is present; the
+        // previous code inserted it only when the qualifier was empty.
+        final String lookupKey = hasQualifier ? familyKey + "." + columnQualifier : familyKey;
+        String fromAttribute = flowFile.getAttribute(lookupKey);
+
+        if (fromAttribute == null && hasQualifier) {
+            // Fall back to the family-wide attribute.
+            fromAttribute = flowFile.getAttribute(familyKey);
+        }
+
+        if (fromAttribute != null) {
+            return fromAttribute;
+        }
+
+        PropertyValue descriptor = context.getProperty(lookupKey);
+        if (descriptor == null || !descriptor.isSet()) {
+            // Fall back to the family-wide processor property.
+            descriptor = context.getProperty(familyKey);
+        }
+
+        return descriptor != null ? descriptor.evaluateAttributeExpressions(flowFile).getValue() : null;
+    }
+
+    private void addMutation(final String tableName, final Mutation m) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        tableWriter.getBatchWriter(tableName).addMutation(m);
+
+    }
+
+    /**
+     * Returns the row provided the record schema
+     * @param record record against which we are evaluating
+     * @param schema Record schema
+     * @param rowOrFieldName Row identifier or field name
+     * @return Text object containing the resulting row.
+     */
+    private Text getRow(final Record record,
+                        final RecordSchema schema,
+                        final String rowOrFieldName){
+        if ( !schema.getFieldNames().contains(rowOrFieldName) ){
+            return new Text(rowOrFieldName);
+        } else{
+            return new Text(record.getAsString(rowOrFieldName));
+        }
+    }
+
+    /**
+     * Creates a mutation with the provided arguments
+     * @param prevMutation previous mutation, to append to if in the same row.
+     * @param context process context.
+     * @param record record object extracted from the flow file
+     * @param schema schema for this record
+     * @param recordPath record path for visibility extraction
+     * @param flowFile flow file
+     * @param defaultVisibility default visibility
+     * @param config configuration of this instance.
+     * @return Returns the Mutation to insert
+     * @throws AccumuloSecurityException Error accessing Accumulo
+     * @throws AccumuloException Non security ( or table ) related Accumulo exceptions writing to the store.
+     * @throws TableNotFoundException Table not found on the cluster
+     */
+    protected Mutation createMutation(final Mutation prevMutation,
+                                      final ProcessContext context,
+                                      final Record record,
+                                      final RecordSchema schema,
+                                      final RecordPath recordPath,
+                                      final FlowFile flowFile,
+                                      final String defaultVisibility,
+                                      AccumuloRecordConfiguration config) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        Mutation m=null;
+        if (record != null) {
+
+            final Long timestamp;
+            Set<String> fieldsToSkip = new HashSet<>();
+            if (!StringUtils.isBlank(config.getTimestampField())) {
+                try {
+                    timestamp = record.getAsLong(config.getTimestampField());
+                    fieldsToSkip.add(config.getTimestampField());
+                } catch (Exception e) {
+                    throw new AccumuloException("Could not convert " + config.getTimestampField() + " to a long", e);
+                }
+
+                if (timestamp == null) {
+                    getLogger().warn("The value of timestamp field " + config.getTimestampField() + " was null, record will be inserted with latest timestamp");
+                }
+            } else {
+                timestamp = null;
+            }
+
+
+
+            RecordField visField = null;
+            Map visSettings = null;
+            if (recordPath != null) {
+                final RecordPathResult result = recordPath.evaluate(record);
+                FieldValue fv = result.getSelectedFields().findFirst().get();
+                visField = fv.getField();
+                if (null != visField)
+                fieldsToSkip.add(visField.getFieldName());
+                visSettings = (Map)fv.getValue();
+            }
+
+
+            if (null != prevMutation){
+                Text row = new Text(prevMutation.getRow());
+                Text curRow = getRow(record,schema,config.getRowField());
+                if (row.equals(curRow)){
+                    m = prevMutation;
+                } else{
+                    m = new Mutation(curRow);
+                    addMutation(config.getTableName(),prevMutation);
+                }
+            } else{
+                Text row = getRow(record,schema,config.getRowField());
+                m = new Mutation(row);
+            }
+
+            fieldsToSkip.add(config.getRowField());
+
+            String columnFamily = config.getColumnFamily();
+            if (StringUtils.isBlank(columnFamily) && !StringUtils.isBlank(config.getColumnFamilyField())) {
+                final String cfField = config.getColumnFamilyField();
+                columnFamily = record.getAsString(cfField);
+                fieldsToSkip.add(cfField);
+            } else if (StringUtils.isBlank(columnFamily) && StringUtils.isBlank(config.getColumnFamilyField())){
+                throw new IllegalArgumentException("Invalid configuration for column family " + columnFamily + " and " + config.getColumnFamilyField());
+            }
+            final Text cf = new Text(columnFamily);
+
+            for (String name : schema.getFieldNames().stream().filter(p->!fieldsToSkip.contains(p)).collect(Collectors.toList())) {
+                String visString = (visField != null && visSettings != null && visSettings.containsKey(name))
+                        ? (String)visSettings.get(name) : defaultVisibility;
+
+                Text cq = new Text(name);
+                final Value value;
+                String recordValue  = record.getAsString(name);
+                if (config.getQualifierInKey()){
+                    final String delim = config.getFieldDelimiter();
+                    if (!StringUtils.isEmpty(delim)) {
+                        if (config.getEncodeDelimiter()) {
+                            byte [] asHex = DatatypeConverter.parseHexBinary(delim);
+                            cq.append(asHex, 0, asHex.length);
+                        }else{
+                            cq.append(delim.getBytes(), 0, delim.length());
+                        }
+                    }
+                    cq.append(recordValue.getBytes(),0,recordValue.length());
+                    value = new Value();
+                } else{
+                    value = new Value(recordValue.getBytes());
+                }
+
+                if (StringUtils.isBlank(visString)) {
+                    visString = produceVisibility(cf.toString(), cq.toString(), flowFile, context);
+                }
+
+                ColumnVisibility cv = new ColumnVisibility();
+                if (StringUtils.isBlank(visString)) {
+                    if (!StringUtils.isBlank(defaultVisibility)) {
+                        cv = new ColumnVisibility(defaultVisibility);
+                    }
+                } else {
+                    cv = new ColumnVisibility(visString);
+                }
+
+                if (null != timestamp) {
+                    if (config.isDeleteKeys()) {
+                        m.putDelete(cf, cq, cv, timestamp);
+                    } else {
+                        m.put(cf, cq, cv, timestamp, value);
+                    }
+                } else{
+                    if (config.isDeleteKeys())
+                        m.putDelete(cf, cq, cv);
+                    else
+                        m.put(cf, cq, cv, value);
+                }
+            }
+
+
+
+        }
+
+        return m;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        /**
+         * Adapted from HBase puts. This is a good approach and one that we should adopt here, too.
+         */
+        if (propertyDescriptorName.startsWith("visibility.")) {
+            String[] parts = propertyDescriptorName.split("\\.");
+            String displayName;
+            String description;
+
+            if (parts.length == 2) {
+                displayName = String.format("Column Family %s Default Visibility", parts[1]);
+                description = String.format("Default visibility setting for %s", parts[1]);
+            } else if (parts.length == 3) {
+                displayName = String.format("Column Qualifier %s.%s Default Visibility", parts[1], parts[2]);
+                description = String.format("Default visibility setting for %s.%s", parts[1], parts[2]);
+            } else {
+                return null;
+            }
+
+            return new PropertyDescriptor.Builder()
+                    .name(propertyDescriptorName)
+                    .displayName(displayName)
+                    .description(description)
+                    .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                    .dynamic(true)
+                    .build();
+        }
+
+        return null;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
new file mode 100644
index 0000000..f31c15b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/java/org/apache/nifi/accumulo/processors/ScanAccumulo.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
+import org.apache.nifi.accumulo.data.KeySchema;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({"hadoop", "accumulo", "scan", "record"})
+/**
+ * Purpose and Design: Requires that a connector be defined by way of an AccumuloService object. This class
+ * simply extends BaseAccumuloProcessor to scan Accumulo based on properties and expressions evaluated against
+ * a flow file.
+ */
+public class ScanAccumulo extends BaseAccumuloProcessor {
+    /** Optional row at which the scan range starts; evaluated against flow file attributes. */
+    static final PropertyDescriptor START_KEY = new PropertyDescriptor.Builder()
+            .displayName("Start key")
+            .name("start-key")
+            .description("Start row key")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Whether the start key itself is part of the scan range; defaults to "True". */
+    static final PropertyDescriptor START_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
+            .displayName("Start key Inclusive")
+            .name("start-key-inclusive")
+            .description("Determines if the start key is inclusive ")
+            .required(false)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /** Optional row at which the scan range ends; evaluated against flow file attributes. */
+    static final PropertyDescriptor END_KEY = new PropertyDescriptor.Builder()
+            .displayName("End key")
+            .name("end-key")
+            .description("End row key for this. If not specified or empty this will be infinite")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Whether the end key itself is part of the scan range; defaults to "False". */
+    static final PropertyDescriptor END_KEY_INCLUSIVE = new PropertyDescriptor.Builder()
+            .displayName("End key Inclusive")
+            .name("end-key-inclusive")
+            .description("Determines if the end key is inclusive")
+            .required(false)
+            .defaultValue("False")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /**
+     * Comma separated authorizations handed to the scanner; evaluated against flow file attributes.
+     * Declared as required, yet any value (including a blank one) passes validation.
+     */
+    static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
+            .name("accumulo-authorizations")
+            .displayName("Authorizations")
+            .description("The comma separated list of authorizations to pass to the scanner.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /**
+     * Column family combined with the start row to form the start key. customValidate rejects a configuration
+     * in which only one of this property and the end column family is set.
+     */
+    static final PropertyDescriptor COLUMNFAMILY = new PropertyDescriptor.Builder()
+            .name("column-family")
+            .displayName("Start Column Family")
+            .description("The column family that is part of the start key. If no column key is defined only this column family will be selected")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Column family combined with the end row to form the end key. */
+    static final PropertyDescriptor COLUMNFAMILY_END = new PropertyDescriptor.Builder()
+            .name("column-family-end")
+            .displayName("End Column Family")
+            .description("The column family to select is part of end key")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Receives every flow file produced by a scan, including the zero-record flow file emitted for an empty result. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it has been successfully retrieved from Accumulo")
+            .build();
+    /** Receives the incoming flow file when scanning or record writing throws. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it cannot be retrieved from Accumulo")
+            .build();
+
+    /** Required controller service used to create the record writer for every outgoing flow file. */
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing out the records")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    /**
+     * Connector service which provides us a client if the configuration is correct.
+     * Resolved from the processor configuration each time the processor is scheduled.
+     */
+    protected BaseAccumuloService accumuloConnectorService;
+
+    /**
+     * Client that we need to persist while we are operational. Obtained from the connector service
+     * when the processor is scheduled and used to create the batch scanner.
+     */
+    protected AccumuloClient client;
+
+
+    /**
+     * @return a new set holding the success and failure relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        Collections.addAll(relationships, REL_SUCCESS, REL_FAILURE);
+        return relationships;
+    }
+
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Collection<ValidationResult> set = new ArrayList<>();
+        if ((validationContext.getProperty(COLUMNFAMILY).isSet() && !validationContext.getProperty(COLUMNFAMILY_END).isSet())
+        || !validationContext.getProperty(COLUMNFAMILY).isSet() && validationContext.getProperty(COLUMNFAMILY_END).isSet() )
+            set.add(new ValidationResult.Builder().explanation("Column Family and Column family end  must be defined").build());
+        return set;
+    }
+
+    /**
+     * Resolves the configured Accumulo connector service and keeps both the service and its client
+     * for use while the processor runs.
+     *
+     * @param context process context supplying the connector service property
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final BaseAccumuloService connectorService = context.getProperty(ACCUMULO_CONNECTOR_SERVICE)
+                .asControllerService(BaseAccumuloService.class);
+        accumuloConnectorService = connectorService;
+        client = connectorService.getClient();
+    }
+
+    private Authorizations stringToAuth(final String authorizations){
+        if (!StringUtils.isBlank(authorizations))
+            return  new Authorizations(authorizations.split(","));
+        else
+            return new Authorizations();
+    }
+
+
+    /**
+     * Scans the configured table and writes the keys found as records, at most 1000 per flow file,
+     * routing each resulting flow file to {@link #REL_SUCCESS}.
+     *
+     * <p>When the scan yields nothing, a single flow file carrying {@code record.count=0} is routed to success.
+     * After a successful scan the incoming flow file, if any, is removed from the session. This now includes
+     * the empty-result case, which previously returned early and left the incoming flow file neither
+     * transferred nor removed. On any exception the incoming flow file is routed to {@link #REL_FAILURE},
+     * and a flow file that was cloned or created for the batch in progress is removed so that it is not
+     * left dangling in the session.</p>
+     *
+     * <p>NOTE(review): batches already transferred to success before a failure stay there, so a retry of
+     * the failed flow file can emit them again.</p>
+     *
+     * @param writerFactory factory for the record writer used on every outgoing flow file
+     * @param processContext context used to evaluate the scan properties
+     * @param processSession session in which flow files are created, cloned, transferred and removed
+     * @param incomingFlowFile the triggering flow file, or empty when the processor runs without input
+     * @return number of records written, or 0 when nothing was found or the scan failed
+     */
+    protected long scanAccumulo(final RecordSetWriterFactory writerFactory, final ProcessContext processContext, final ProcessSession processSession, final Optional<FlowFile> incomingFlowFile){
+
+        final Map<String, String> flowAttributes = incomingFlowFile.isPresent() ?  incomingFlowFile.get().getAttributes() : new HashMap<>();
+        final String table = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowAttributes).getValue();
+        final String startKey = processContext.getProperty(START_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
+        final boolean startKeyInclusive = processContext.getProperty(START_KEY_INCLUSIVE).asBoolean();
+        final boolean endKeyInclusive = processContext.getProperty(END_KEY_INCLUSIVE).asBoolean();
+        final String endKey = processContext.getProperty(END_KEY).evaluateAttributeExpressions(flowAttributes).getValue();
+        final String authorizations = processContext.getProperty(AUTHORIZATIONS).isSet()
+                ? processContext.getProperty(AUTHORIZATIONS).evaluateAttributeExpressions(flowAttributes).getValue() : "";
+        final int threads = processContext.getProperty(THREADS).asInteger();
+        final String startKeyCf = processContext.getProperty(COLUMNFAMILY).evaluateAttributeExpressions(flowAttributes).getValue();
+        final String endKeyCf = processContext.getProperty(COLUMNFAMILY_END).evaluateAttributeExpressions(flowAttributes).getValue();
+
+        final Authorizations auths = stringToAuth(authorizations);
+
+        final LongAdder recordCounter = new LongAdder();
+
+        final Range lookupRange = buildRange(startKey,startKeyCf,startKeyInclusive,endKey,endKeyCf,endKeyInclusive);
+
+        final boolean cloneFlowFile = incomingFlowFile.isPresent();
+
+        // What the log statements refer to: the flow file itself rather than the Optional wrapper.
+        final Object flowFileDescription = cloneFlowFile ? incomingFlowFile.get() : "No incoming flow file";
+
+        // Upper bound on the number of keys written to a single outgoing flow file.
+        final int keysPerFlowFile = 1000;
+
+        // One schema instance serves both the writer schema lookup and every record.
+        final KeySchema keySchema = new KeySchema();
+
+        // Flow file created in this session that has not been transferred yet; removed again on failure.
+        FlowFile pendingFlowFile = null;
+
+        try (BatchScanner scanner = client.createBatchScanner(table,auths,threads)) {
+            if (!StringUtils.isBlank(startKeyCf) &&  StringUtils.isBlank(endKeyCf)) {
+                scanner.fetchColumnFamily(new Text(startKeyCf));
+            }
+            scanner.setRanges(Collections.singleton(lookupRange));
+
+            final Iterator<Map.Entry<Key,Value>> kvIter = scanner.iterator();
+            if (!kvIter.hasNext()){
+                // Create a flow file with a record count of zero.
+                pendingFlowFile = processSession.create();
+                pendingFlowFile = processSession.putAllAttributes(pendingFlowFile, Collections.singletonMap("record.count", String.valueOf(0)));
+                processSession.transfer(pendingFlowFile, REL_SUCCESS);
+                pendingFlowFile = null;
+            }
+
+            while (kvIter.hasNext()) {
+                pendingFlowFile = cloneFlowFile ? processSession.clone(incomingFlowFile.get()) : processSession.create();
+
+                final Map<String, String> attributes = new HashMap<>();
+                pendingFlowFile = processSession.write(pendingFlowFile, new StreamCallback() {
+                    @Override
+                    public void process(final InputStream in, final OutputStream out) throws IOException {
+
+                        try{
+                            final RecordSchema writeSchema = writerFactory.getSchema(flowAttributes, keySchema);
+                            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
+
+                                int written = 0;
+                                writer.beginRecordSet();
+                                while (written < keysPerFlowFile && kvIter.hasNext()) {
+
+                                    final Key key = kvIter.next().getKey();
+
+                                    final Map<String, Object> data = new HashMap<>();
+                                    data.put("row", key.getRow().toString());
+                                    data.put("columnFamily", key.getColumnFamily().toString());
+                                    data.put("columnQualifier", key.getColumnQualifier().toString());
+                                    data.put("columnVisibility", key.getColumnVisibility().toString());
+                                    data.put("timestamp", key.getTimestamp());
+
+                                    writer.write(new MapRecord(keySchema, data));
+                                    written++;
+                                }
+                                recordCounter.add(written);
+
+                                final WriteResult writeResult = writer.finishRecordSet();
+                                attributes.put("record.count", String.valueOf(written));
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                                attributes.putAll(writeResult.getAttributes());
+                            }
+                        } catch (SchemaNotFoundException e) {
+                            // Surfaces through the session write and is logged once by the enclosing handler.
+                            throw new IOException(e);
+                        }
+                    }
+
+                });
+                pendingFlowFile = processSession.putAllAttributes(pendingFlowFile,attributes);
+                processSession.transfer(pendingFlowFile, REL_SUCCESS);
+                pendingFlowFile = null;
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFileDescription, e});
+            if (pendingFlowFile != null) {
+                // The batch flow file never reached a relationship; drop it so the session stays consistent.
+                processSession.remove(pendingFlowFile);
+            }
+            if (cloneFlowFile) {
+                processSession.transfer(incomingFlowFile.get(), REL_FAILURE);
+            }
+            return 0;
+        }
+
+        if (cloneFlowFile) {
+            // Every result was written to a clone or a new flow file, so the original is no longer needed.
+            processSession.remove(incomingFlowFile.get());
+        }
+
+        getLogger().info("Successfully converted {} records for {}", new Object[] {recordCounter.longValue(), flowFileDescription});
+
+        return recordCounter.longValue();
+    }
+
+
+    /**
+     * Builds the range to scan from the optional start and end rows and column families.
+     *
+     * @param startRow start row; a blank value leaves the range without a start key
+     * @param startKeyCf column family added to the start key when not blank
+     * @param startKeyInclusive whether the start key is part of the range
+     * @param endRow end row; a blank value leaves the range without an end key
+     * @param endKeyCf column family added to the end key when not blank
+     * @param endKeyInclusive whether the end key is part of the range
+     * @return the range spanning the two keys
+     */
+    Range buildRange(final String startRow, final String startKeyCf,boolean startKeyInclusive, final String endRow, final String endKeyCf,boolean endKeyInclusive){
+        Key lower = null;
+        if (!StringUtils.isBlank(startRow)) {
+            if (StringUtils.isBlank(startKeyCf)) {
+                lower = new Key(startRow);
+            } else {
+                lower = new Key(startRow,startKeyCf);
+            }
+        }
+
+        Key upper = null;
+        if (!StringUtils.isBlank(endRow)) {
+            if (StringUtils.isBlank(endKeyCf)) {
+                upper = new Key(endRow);
+            } else {
+                upper = new Key(endRow,endKeyCf);
+            }
+        }
+
+        return new Range(lower,startKeyInclusive,upper,endKeyInclusive);
+    }
+
+    /**
+     * Runs one scan, using the next queued flow file if there is one, and adds the number of records
+     * returned by the scan to the "Records Processed" counter.
+     *
+     * @param processContext context supplying the processor configuration
+     * @param processSession session providing the incoming flow file and receiving the results
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
+        final Optional<FlowFile> incoming = Optional.ofNullable(processSession.get());
+
+        final RecordSetWriterFactory recordWriter = processContext.getProperty(RECORD_WRITER)
+                .asControllerService(RecordSetWriterFactory.class);
+
+        final long scanned = scanAccumulo(recordWriter, processContext, processSession, incoming);
+
+        processSession.adjustCounter("Records Processed", scanned, false);
+    }
+
+
+    /**
+     * @return the base properties followed by the scan-specific properties, in display order
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(baseProperties);
+        Collections.addAll(descriptors,
+                START_KEY,
+                START_KEY_INCLUSIVE,
+                END_KEY,
+                COLUMNFAMILY,
+                COLUMNFAMILY_END,
+                END_KEY_INCLUSIVE,
+                RECORD_WRITER,
+                AUTHORIZATIONS);
+        return descriptors;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..a1ce072
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.accumulo.processors.PutAccumuloRecord
+org.apache.nifi.accumulo.processors.ScanAccumulo
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java
new file mode 100644
index 0000000..4ad489a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/controllerservices/MockAccumuloService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test helper that registers a password-authenticated {@link AccumuloService} with a test runner.
+ */
+public class MockAccumuloService {
+
+
+    /**
+     * Adds an {@link AccumuloService} under the id {@code accclient}, enables it and points the runner's
+     * {@code accumulo-connector-service} property at it.
+     *
+     * @param runner test runner receiving the service
+     * @param zk value for the ZooKeeper quorum property
+     * @param instanceName value for the instance name property
+     * @param user value for the user property
+     * @param password value for the password property
+     * @return the enabled service
+     * @throws InitializationException if the runner cannot add the service
+     */
+    public static AccumuloService getService(final TestRunner runner, final String zk, final String instanceName, final String user, final String password) throws InitializationException {
+        final String serviceId = "accclient";
+
+        final Map<String,String> settings = new HashMap<>();
+        settings.put(AccumuloService.ZOOKEEPER_QUORUM.getName(), zk);
+        settings.put(AccumuloService.INSTANCE_NAME.getName(), instanceName);
+        settings.put(AccumuloService.AUTHENTICATION_TYPE.getName(), "PASSWORD");
+        settings.put(AccumuloService.ACCUMULO_USER.getName(), user);
+        settings.put(AccumuloService.ACCUMULO_PASSWORD.getName(), password);
+
+        final AccumuloService service = new AccumuloService();
+        runner.addControllerService(serviceId, service, settings);
+        runner.enableControllerService(service);
+        runner.setProperty("accumulo-connector-service", serviceId);
+        return service;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java
new file mode 100644
index 0000000..2d45a48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestPutRecord.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.nifi.accumulo.controllerservices.AccumuloService;
+import org.apache.nifi.accumulo.controllerservices.MockAccumuloService;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestPutRecord {
+
+    public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+    /**
+     * Though deprecated in 2.0 it still functions very well
+     */
+    private static MiniAccumuloCluster accumulo;
+
+    /**
+     * Creates a runner for {@link PutAccumuloRecord} with the table name and column family already set.
+     *
+     * @param table value for the table name property
+     * @param columnFamily value for the column family property
+     * @return the configured runner
+     */
+    private TestRunner getTestRunner(String table, String columnFamily) {
+        final TestRunner putRunner = TestRunners.newTestRunner(PutAccumuloRecord.class);
+        putRunner.enforceReadStreamsClosed(false);
+
+        putRunner.setProperty(PutAccumuloRecord.TABLE_NAME, table);
+        putRunner.setProperty(PutAccumuloRecord.COLUMN_FAMILY, columnFamily);
+
+        return putRunner;
+    }
+
+
+
+
+    /**
+     * Starts a mini Accumulo cluster, with the password "password", in a fresh temporary directory.
+     */
+    @BeforeClass
+    public static void setupInstance() throws IOException, InterruptedException, AccumuloSecurityException, AccumuloException, TableExistsException {
+        // JUnit and Guava supply mechanisms for creating temp directories
+        final Path clusterDirectory = Files.createTempDirectory("acc");
+        final MiniAccumuloCluster cluster = new MiniAccumuloCluster(clusterDirectory.toFile(), "password");
+        accumulo = cluster;
+        cluster.start();
+    }
+
+    /**
+     * Registers a mock record reader holding five random records (id, name, code, timestamp) and returns
+     * the keys the processor is expected to write for the "name" and "code" fields of each record.
+     *
+     * @param runner runner that receives the reader service and the record reader property
+     * @param valueincq whether the expected column qualifier carries the field value after the field name
+     * @param delim delimiter expected between field name and value in the qualifier; may be null or empty
+     * @param cv expected column visibility expression, or null for an empty visibility
+     * @return the expected keys
+     * @throws IOException if the reader service cannot be added to the runner
+     */
+    private Set<Key> generateTestData(TestRunner runner, boolean valueincq, String delim, String cv) throws IOException {
+
+        final MockRecordParser parser = new MockRecordParser();
+        try {
+            runner.addControllerService("parser", parser);
+        } catch (InitializationException e) {
+            throw new IOException(e);
+        }
+        runner.enableControllerService(parser);
+        runner.setProperty(PutAccumuloRecord.RECORD_READER_FACTORY, "parser");
+
+        final long ts = System.currentTimeMillis();
+
+        parser.addSchemaField("id", RecordFieldType.STRING);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("code", RecordFieldType.STRING);
+        parser.addSchemaField("timestamp", RecordFieldType.LONG);
+
+        final Set<Key> expectedKeys = new HashSet<>();
+        final ColumnVisibility colViz = (null != cv) ? new ColumnVisibility(cv) : new ColumnVisibility();
+        Random random = new Random();
+        for (int x = 0; x < 5; x++) {
+            //final int row = random.nextInt(10000000);
+            final String row = UUID.randomUUID().toString();
+            // Random values stored in the "name" and "code" record fields.
+            final String nameValue = UUID.randomUUID().toString();
+            final String codeValue = UUID.randomUUID().toString();
+
+            expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), expectedQualifier("name", nameValue, valueincq, delim), colViz, ts));
+            expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), expectedQualifier("code", codeValue, valueincq, delim), colViz, ts));
+            parser.addRecord(row, nameValue, codeValue, ts);
+        }
+
+        return expectedKeys;
+    }
+
+    /**
+     * Builds the column qualifier expected for one field: the field name, optionally followed by the
+     * delimiter and the field value. Lengths are taken from the encoded byte arrays, not from the
+     * character count of the strings.
+     */
+    private static Text expectedQualifier(String fieldName, String fieldValue, boolean valueincq, String delim) {
+        final Text qualifier = new Text(fieldName);
+        if (valueincq) {
+            if (null != delim && !delim.isEmpty()) {
+                final byte[] delimBytes = delim.getBytes();
+                qualifier.append(delimBytes, 0, delimBytes.length);
+            }
+            final byte[] valueBytes = fieldValue.getBytes();
+            qualifier.append(valueBytes, 0, valueBytes.length);
+        }
+        return qualifier;
+    }
+
+    void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        if (null == auths)
+            auths = new Authorizations();
+        try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
+            List<Range> ranges = new ArrayList<>();
+            ranges.add(new Range());
+            scanner.setRanges(ranges);
+            for (Map.Entry<Key, Value> kv : scanner) {
+                Assert.assertTrue(kv.getKey() + " not in expected keys",expectedKeys.remove(kv.getKey()));
+            }
+        }
+        Assert.assertEquals(0, expectedKeys.size());
+
+    }
+
+    /**
+     * Runs the put scenario without delimiter, visibility or delete pass.
+     *
+     * @param valueincq whether field values are expected in the column qualifier
+     */
+    private void basicPutSetup(boolean valueincq) throws Exception {
+        basicPutSetup(valueincq,null,null,null,false);
+    }
+
+    /**
+     * Runs the put scenario with a field delimiter but without visibility or delete pass.
+     *
+     * @param valueincq whether field values are expected in the column qualifier
+     * @param delim delimiter placed between field name and value in the qualifier
+     */
+    private void basicPutSetup(boolean valueincq, final String delim) throws Exception {
+        basicPutSetup(valueincq,delim,null,null,false);
+    }
+
+    private void basicPutSetup(boolean valueincq,String delim, String auths, Authorizations defaultVis, boolean deletes) throws Exception {
+        String tableName = UUID.randomUUID().toString();
+        tableName=tableName.replace("-","a");
+        if (null != defaultVis)
+        accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis);
+        TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
+        runner.setProperty(PutAccumuloRecord.CREATE_TABLE, "True");
+        runner.setProperty(PutAccumuloRecord.ROW_FIELD_NAME, "id");
+        runner.setProperty(PutAccumuloRecord.COLUMN_FAMILY, DEFAULT_COLUMN_FAMILY);
+        runner.setProperty(PutAccumuloRecord.TIMESTAMP_FIELD, "timestamp");
+        if (valueincq) {
+            if (null != delim){
+                runner.setProperty(PutAccumuloRecord.FIELD_DELIMITER, delim);
+            }
+            runner.setProperty(PutAccumuloRecord.RECORD_IN_QUALIFIER, "True");
+        }
+        if (null != defaultVis){
+            runner.setProperty(PutAccumuloRecord.DEFAULT_VISIBILITY, auths);
+        }
+        AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
+        Set<Key> expectedKeys = generateTestData(runner,valueincq,delim, auths);
+        runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+        runner.run();
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutAccumuloRecord.REL_SUCCESS);
+        Assert.assertTrue("Wrong count", results.size() == 1);
+        verifyKey(tableName, expectedKeys, defaultVis);
+        if (deletes){
+            runner.setProperty(PutAccumuloRecord.DELETE_KEY, "true");
+            runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+            runner.run();
+            runner.getFlowFilesForRelationship(PutAccumuloRecord.REL_SUCCESS);
+            verifyKey(tableName, new HashSet<>(), defaultVis);
+        }
+
+    }
+
+
+
+
+    /** Puts records and expects each column qualifier to be the plain field name. */
+    @Test
+    public void testByteEncodedPut() throws Exception {
+        basicPutSetup(false);
+    }
+
+    /**
+     * Puts records with values in the qualifier under visibility "A&B", then runs the delete pass and
+     * expects no keys to remain.
+     */
+    @Test
+    public void testByteEncodedPutThenDelete() throws Exception {
+        basicPutSetup(true,null,"A&B",new Authorizations("A","B"),true);
+    }
+
+
+    /** Puts records and expects the field value appended directly to the field name in the qualifier. */
+    @Test
+    public void testByteEncodedPutCq() throws Exception {
+        basicPutSetup(true);
+    }
+
+    /** Same as the qualifier test, with a NUL character separating field name and value. */
+    @Test
+    public void testByteEncodedPutCqDelim() throws Exception {
+        basicPutSetup(true,"\u0000");
+    }
+
+    /** Puts records with values in the qualifier and expects the keys to carry visibility "A&B". */
+    @Test
+    public void testByteEncodedPutCqWithVis() throws Exception {
+        basicPutSetup(true,null,"A&B",new Authorizations("A","B"),false);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java
new file mode 100644
index 0000000..3be8c72
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-processors/src/test/java/org/apache/nifi/accumulo/processors/TestScanAccumulo.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.processors;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.nifi.accumulo.controllerservices.AccumuloService;
+import org.apache.nifi.accumulo.controllerservices.MockAccumuloService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestScanAccumulo {
+
+    public static final String DEFAULT_COLUMN_FAMILY = "family1";
+
+    /**
+     * Though deprecated in 2.0 it still functions very well
+     */
+    private static MiniAccumuloCluster accumulo;
+
+    // Builds a ScanAccumulo runner bound to the given table, with read-stream-closed enforcement switched off.
+    // NOTE(review): columnFamily is accepted but never applied to the runner.
+    private TestRunner getTestRunner(String table, String columnFamily) {
+        final TestRunner scanRunner = TestRunners.newTestRunner(ScanAccumulo.class);
+        scanRunner.enforceReadStreamsClosed(false);
+        scanRunner.setProperty(ScanAccumulo.TABLE_NAME, table);
+        return scanRunner;
+    }
+
+    // Starts the shared MiniAccumuloCluster once per class. NOTE(review): nothing in this class stops it or removes the temp directory.
+    @BeforeClass
+    public static void setupInstance() throws IOException, InterruptedException, AccumuloSecurityException, AccumuloException, TableExistsException {
+        Path tempDirectory = Files.createTempDirectory("acc"); // JUnit and Guava supply mechanisms for creating temp directories
+        accumulo = new MiniAccumuloCluster(tempDirectory.toFile(), "password");
+        accumulo.start();
+    }
+
+    // Registers the record writer on the runner and writes five mutations (one "code" entry each) to the given table.
+    // NOTE(review): the returned set also lists a "name" key per row that is never written; no caller in this class reads the set.
+    private Set<Key> generateTestData(TestRunner runner, String definedRow, String table, boolean valueincq, String delim, String cv)
+            throws IOException, AccumuloSecurityException, AccumuloException, TableNotFoundException {
+
+        BatchWriterConfig writerConfig = new BatchWriterConfig();
+        writerConfig.setMaxWriteThreads(2);
+        writerConfig.setMaxMemory(1024*1024);
+        MultiTableBatchWriter writer  = accumulo.getConnector("root","password").createMultiTableBatchWriter(writerConfig);
+        // close() flushes pending mutations and releases the writer even when a step below throws
+        try {
+            long ts = System.currentTimeMillis();
+
+            final MockRecordWriter parser = new MockRecordWriter();
+            try {
+                runner.addControllerService("parser", parser);
+            } catch (InitializationException e) {
+                throw new IOException(e);
+            }
+            runner.enableControllerService(parser);
+            runner.setProperty(ScanAccumulo.RECORD_WRITER,"parser");
+
+            Set<Key> expectedKeys = new HashSet<>();
+            final ColumnVisibility colViz = null != cv ? new ColumnVisibility(cv) : new ColumnVisibility();
+            Random random = new Random();
+            // Text.append takes a byte count, so measure the encoded bytes instead of String.length()
+            final byte[] delimBytes = null == delim ? new byte[0] : delim.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            for (int x = 0; x < 5; x++) {
+                final String row = definedRow.isEmpty() ? UUID.randomUUID().toString() : definedRow;
+                final byte[] cf = UUID.randomUUID().toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                final byte[] cq = UUID.randomUUID().toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                Text keyCq = new Text("name");
+                if (valueincq){
+                    keyCq.append(delimBytes,0,delimBytes.length);
+                    keyCq.append(cf,0,cf.length);
+                }
+                expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz,ts));
+                keyCq = new Text("code");
+                if (valueincq){
+                    keyCq.append(delimBytes,0,delimBytes.length);
+                    keyCq.append(cq,0,cq.length);
+                }
+                expectedKeys.add(new Key(new Text(row), new Text(DEFAULT_COLUMN_FAMILY), keyCq, colViz, ts));
+                Mutation m = new Mutation(row);
+                m.put(new Text(DEFAULT_COLUMN_FAMILY),new Text(keyCq),colViz,ts, new Value());
+                writer.getBatchWriter(table).addMutation(m);
+            }
+            return expectedKeys;
+        } finally {
+            writer.close();
+        }
+    }
+
+    void verifyKey(String tableName, Set<Key> expectedKeys, Authorizations auths) throws AccumuloSecurityException, AccumuloException, TableNotFoundException {
+        if (null == auths)
+            auths = new Authorizations();
+        try(BatchScanner scanner = accumulo.getConnector("root","password").createBatchScanner(tableName,auths,1)) {
+            List<Range> ranges = new ArrayList<>();
+            ranges.add(new Range());
+            scanner.setRanges(ranges);
+            for (Map.Entry<Key, Value> kv : scanner) {
+                Assert.assertTrue(kv.getKey() + " not in expected keys",expectedKeys.remove(kv.getKey()));
+            }
+        }
+        Assert.assertEquals(0, expectedKeys.size());
+
+    }
+
+    private void basicPutSetup(boolean sendFlowFile, boolean valueincq) throws Exception {
+        basicPutSetup(sendFlowFile,"","","","",valueincq,null,"",null,false,5);
+    }
+
+    private void basicPutSetup(boolean sendFlowFile, boolean valueincq, final String delim) throws Exception {
+        basicPutSetup(sendFlowFile,"","","","",valueincq,delim,"",null,false,5);
+    }
+
+    private void basicPutSetup(boolean sendFlowFile,String row,String endrow, String cf,String endcf, boolean valueincq,String delim,
+                               String auths, Authorizations defaultVis, boolean deletes, int expected) throws Exception {
+        String tableName = UUID.randomUUID().toString();
+        tableName=tableName.replace("-","a");
+        accumulo.getConnector("root","password").tableOperations().create(tableName);
+        if (null != defaultVis)
+        accumulo.getConnector("root","password").securityOperations().changeUserAuthorizations("root",defaultVis);
+        TestRunner runner = getTestRunner(tableName, DEFAULT_COLUMN_FAMILY);
+        runner.setProperty(ScanAccumulo.START_KEY, row);
+        if (!cf.isEmpty())
+        runner.setProperty(ScanAccumulo.COLUMNFAMILY, cf);
+        if (!endcf.isEmpty())
+        runner.setProperty(ScanAccumulo.COLUMNFAMILY_END, endcf);
+        runner.setProperty(ScanAccumulo.AUTHORIZATIONS, auths);
+        runner.setProperty(ScanAccumulo.END_KEY, endrow);
+
+        AccumuloService client = MockAccumuloService.getService(runner,accumulo.getZooKeepers(),accumulo.getInstanceName(),"root","password");
+        Set<Key> expectedKeys = generateTestData(runner,row,tableName,valueincq,delim, auths);
+        if (sendFlowFile) {
+            runner.enqueue("Test".getBytes("UTF-8")); // This is to coax the processor into reading the data in the reader.l
+        }
+        runner.run();
+
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(ScanAccumulo.REL_SUCCESS);
+        for(MockFlowFile ff : results){
+            String attr = ff.getAttribute("record.count");
+            Assert.assertEquals(expected,Integer.valueOf(attr).intValue());
+        }
+        Assert.assertTrue("Wrong count, received " + results.size(), results.size() == 1);
+    }
+
+
+
+
+    @Test
+    public void testPullDatWithFlowFile() throws Exception {
+        basicPutSetup(true,false); // a flow file is enqueued before the run; plain qualifiers
+    }
+
+    @Test
+    public void testPullDatWithOutFlowFile() throws Exception {
+        basicPutSetup(false,false); // the processor runs with nothing enqueued
+    }
+
+    @Test
+    public void testSameRowCf() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family2",false,null,"",null,false,1); // all five writes share one key; expects 1
+    }
+
+    @Test
+    public void testSameRowCfValueInCq() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family2",true,null,"",null,false,5); // random qualifier suffixes; expects 5
+    }
+
+    @Test
+    public void testSameRowCfValueInCqWithAuths() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family2",true,null,"abcd",new Authorizations("abcd"),false,5); // visibility abcd, root granted abcd
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testSameRowCfValueInCqErrorCfEnd() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","",true,null,"",null,false,5); // start family only; presumably fails processor validation - confirm
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testSameRowCfValueInCqErrorCf() throws Exception {
+        basicPutSetup(false,"2019","2019","","family2",true,null,"",null,false,5); // end family only; presumably fails processor validation - confirm
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testSameRowCfValueInCqErrorNotLess() throws Exception {
+        basicPutSetup(false,"2019","2019","family1","family1",true,null,"",null,false,5); // start equals end family; presumably fails validation - confirm
+    }
+
+
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
new file mode 100644
index 0000000..c1a86ef
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api-nar/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services-api-nar</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>false</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
new file mode 100644
index 0000000..6fe8641
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+	    <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+	<dependency>
+	    <groupId>org.apache.accumulo</groupId>
+	    <artifactId>accumulo-core</artifactId>
+	    <version>${accumulo.version}</version>
+	</dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
new file mode 100644
index 0000000..d92b152
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-api/src/main/java/org/apache/nifi/accumulo/controllerservices/BaseAccumuloService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+
+@Tags({"accumulo", "client", "service"})
+@CapabilityDescription("Provides a basic connector to Accumulo services")
+public interface BaseAccumuloService extends ControllerService {
+
+    /** Returns the AccumuloClient supplied by the implementing service. */
+    AccumuloClient getClient();
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
new file mode 100644
index 0000000..c19e8ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services-nar/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services-nar</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>false</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+           <artifactId>nifi-accumulo-services-api-nar</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
new file mode 100644
index 0000000..f6985c7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+	    <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-accumulo-bundle</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-accumulo-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+	<dependency>
+	    <groupId>org.apache.accumulo</groupId>
+	    <artifactId>accumulo-core</artifactId>
+	    <version>${accumulo.version}</version>
+	</dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-accumulo-services-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
new file mode 100644
index 0000000..91da7fe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/java/org/apache/nifi/accumulo/controllerservices/AccumuloService.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.accumulo.controllerservices;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Purpose: Controller service that provides us a configured connector. Note that we don't need to close this
+ *
+ * Justification: Centralizes the configuration of the connecting accumulo code. This also will be used
+ * for any kerberos integration.
+ */
+@RequiresInstanceClassLoading
+@Tags({"accumulo", "client", "service"})
+@CapabilityDescription("A controller service for accessing an HBase client.")
+public class AccumuloService extends AbstractControllerService implements BaseAccumuloService {
+
+    private enum AuthenticationType{
+        PASSWORD,
+        NONE
+    }
+
+    protected static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
+            .name("ZooKeeper Quorum")
+            .description("Comma-separated list of ZooKeeper hosts for Accumulo.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final PropertyDescriptor INSTANCE_NAME = new PropertyDescriptor.Builder()
+            .name("Instance Name")
+            .description("Instance name of the Accumulo cluster")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+
+    protected static final PropertyDescriptor ACCUMULO_USER = new PropertyDescriptor.Builder()
+            .name("Accumulo User")
+            .description("Connecting user for Accumulo")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final PropertyDescriptor ACCUMULO_PASSWORD = new PropertyDescriptor.Builder()
+            .name("Accumulo Password")
+            .description("Connecting user's password when using the PASSWORD Authentication type")
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
+            .name("Authentication Type")
+            .description("Authentication Type")
+            .allowableValues(AuthenticationType.values())
+            .defaultValue(AuthenticationType.PASSWORD.toString())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    /**
+     * Reference to the accumulo client.
+     */
+    AccumuloClient client;
+
+    /**
+     * properties
+     */
+    private List<PropertyDescriptor> properties;
+
+    @Override
+    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ZOOKEEPER_QUORUM);
+        props.add(INSTANCE_NAME);
+        props.add(ACCUMULO_USER);
+        props.add(AUTHENTICATION_TYPE);
+        props.add(ACCUMULO_PASSWORD);
+        properties = Collections.unmodifiableList(props);
+    }
+
+    private AuthenticationToken getToken(final AuthenticationType type, final ConfigurationContext context){
+        switch(type){
+            case PASSWORD:
+                return new PasswordToken(context.getProperty(ACCUMULO_PASSWORD).getValue());
+            default:
+                return null;
+        }
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(INSTANCE_NAME);
+        properties.add(ZOOKEEPER_QUORUM);
+        properties.add(ACCUMULO_USER);
+        properties.add(AUTHENTICATION_TYPE);
+        properties.add(ACCUMULO_PASSWORD);
+        return properties;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (!validationContext.getProperty(INSTANCE_NAME).isSet()){
+            problems.add(new ValidationResult.Builder().valid(false).subject(INSTANCE_NAME.getName()).explanation("Instance name must be supplied").build());
+        }
+
+        if (!validationContext.getProperty(ZOOKEEPER_QUORUM).isSet()){
+            problems.add(new ValidationResult.Builder().valid(false).subject(ZOOKEEPER_QUORUM.getName()).explanation("Zookeepers must be supplied").build());
+        }
+
+        if (!validationContext.getProperty(ACCUMULO_USER).isSet()){
+            problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_USER.getName()).explanation("Accumulo user must be supplied").build());
+        }
+
+        final AuthenticationType type = validationContext.getProperty(
+                AUTHENTICATION_TYPE).isSet() ? AuthenticationType.valueOf( validationContext.getProperty(AUTHENTICATION_TYPE).getValue() ) : AuthenticationType.PASSWORD;
+
+        switch(type){
+            case PASSWORD:
+                if (!validationContext.getProperty(ACCUMULO_PASSWORD).isSet()){
+                    problems.add(
+                            new ValidationResult.Builder().valid(false).subject(AUTHENTICATION_TYPE.getName()).explanation("Password must be supplied for the Password Authentication type").build());
+                }
+                break;
+            default:
+                problems.add(new ValidationResult.Builder().valid(false).subject(ACCUMULO_PASSWORD.getName()).explanation("Non supported Authentication type").build());
+        }
+
+        return problems;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
+        if (!context.getProperty(INSTANCE_NAME).isSet() || !context.getProperty(ZOOKEEPER_QUORUM).isSet() || !context.getProperty(ACCUMULO_USER).isSet()){
+            throw new InitializationException("Instance name and Zookeeper Quorum must be specified");
+        }
+
+
+
+        final String instanceName = context.getProperty(INSTANCE_NAME).evaluateAttributeExpressions().getValue();
+        final String zookeepers = context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue();
+        final String accumuloUser = context.getProperty(ACCUMULO_USER).evaluateAttributeExpressions().getValue();
+
+        final AuthenticationType type = AuthenticationType.valueOf( context.getProperty(AUTHENTICATION_TYPE).getValue() );
+
+
+
+        final AuthenticationToken token = getToken(type,context);
+
+        this.client = Accumulo.newClient().to(instanceName,zookeepers).as(accumuloUser,token).build();
+
+        if (null == token){
+            throw new InitializationException("Feature not implemented");
+        }
+
+    }
+
+    @Override
+    public AccumuloClient getClient(){
+        return client;
+    }
+
+    @OnDisabled
+    public void shutdown() {
+        client.close();
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..0e27be4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/nifi-accumulo-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.accumulo.controllerservices.AccumuloService
diff --git a/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml b/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml
new file mode 100644
index 0000000..a43c010
--- /dev/null
+++ b/nifi-nar-bundles/nifi-accumulo-bundle/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <properties>
+        <accumulo.version>2.0.0</accumulo.version>
+    </properties>
+
+    <artifactId>nifi-accumulo-bundle</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-accumulo-services-api</module>
+        <module>nifi-accumulo-services-api-nar</module>
+        <module>nifi-accumulo-services</module>
+        <module>nifi-accumulo-services-nar</module>
+        <module>nifi-accumulo-processors</module>
+        <module>nifi-accumulo-nar</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-accumulo-processors</artifactId>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-services</artifactId>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
index 1c0704e..3747571 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
@@ -139,7 +139,7 @@ public class DataDogReportingTask extends AbstractReportingTask {
         try {
             updateDataDogTransport(context);
         } catch (IOException e) {
-            e.printStackTrace();
+            logger.warn("Unable to update data dog transport", e);
         }
         updateAllMetricGroups(status);
         ddMetricRegistryBuilder.getDatadogReporter().report();
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index d23aa21..522cdb4 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -99,7 +99,8 @@
         <module>nifi-prometheus-bundle</module>
         <module>nifi-easyrules-bundle</module>
         <module>nifi-sql-reporting-bundle</module>
-        <module>nifi-rules-action-handler-bundle</module>
+	<module>nifi-rules-action-handler-bundle</module>
+	<module>nifi-accumulo-bundle</module>
     </modules>
 
     <build>